security.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. """Checks the commands' security"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  15. # █▀█ █ █ █ █▀█ █▀▄ █
  16. # © Copyright 2022
  17. # https://t.me/hikariatama
  18. #
  19. # 🔒 Licensed under the GNU AGPLv3
  20. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  21. import logging
  22. import time
  23. from typing import Optional
  24. from telethon.tl.functions.messages import GetFullChatRequest
  25. from telethon.tl.types import ChatParticipantAdmin, ChatParticipantCreator, Message
  26. from . import main, utils
  27. logger = logging.getLogger(__name__)
  28. OWNER = 1 << 0
  29. SUDO = 1 << 1
  30. SUPPORT = 1 << 2
  31. GROUP_OWNER = 1 << 3
  32. GROUP_ADMIN_ADD_ADMINS = 1 << 4
  33. GROUP_ADMIN_CHANGE_INFO = 1 << 5
  34. GROUP_ADMIN_BAN_USERS = 1 << 6
  35. GROUP_ADMIN_DELETE_MESSAGES = 1 << 7
  36. GROUP_ADMIN_PIN_MESSAGES = 1 << 8
  37. GROUP_ADMIN_INVITE_USERS = 1 << 9
  38. GROUP_ADMIN = 1 << 10
  39. GROUP_MEMBER = 1 << 11
  40. PM = 1 << 12
  41. EVERYONE = 1 << 13
  42. BITMAP = {
  43. "OWNER": OWNER,
  44. "SUDO": SUDO,
  45. "SUPPORT": SUPPORT,
  46. "GROUP_OWNER": GROUP_OWNER,
  47. "GROUP_ADMIN_ADD_ADMINS": GROUP_ADMIN_ADD_ADMINS,
  48. "GROUP_ADMIN_CHANGE_INFO": GROUP_ADMIN_CHANGE_INFO,
  49. "GROUP_ADMIN_BAN_USERS": GROUP_ADMIN_BAN_USERS,
  50. "GROUP_ADMIN_DELETE_MESSAGES": GROUP_ADMIN_DELETE_MESSAGES,
  51. "GROUP_ADMIN_PIN_MESSAGES": GROUP_ADMIN_PIN_MESSAGES,
  52. "GROUP_ADMIN_INVITE_USERS": GROUP_ADMIN_INVITE_USERS,
  53. "GROUP_ADMIN": GROUP_ADMIN,
  54. "GROUP_MEMBER": GROUP_MEMBER,
  55. "PM": PM,
  56. "EVERYONE": EVERYONE,
  57. }
  58. GROUP_ADMIN_ANY = (
  59. GROUP_ADMIN_ADD_ADMINS
  60. | GROUP_ADMIN_CHANGE_INFO
  61. | GROUP_ADMIN_BAN_USERS
  62. | GROUP_ADMIN_DELETE_MESSAGES
  63. | GROUP_ADMIN_PIN_MESSAGES
  64. | GROUP_ADMIN_INVITE_USERS
  65. | GROUP_ADMIN
  66. )
  67. DEFAULT_PERMISSIONS = OWNER | SUDO
  68. PUBLIC_PERMISSIONS = GROUP_OWNER | GROUP_ADMIN_ANY | GROUP_MEMBER | PM
  69. ALL = (1 << 13) - 1
  70. def owner(func: callable) -> callable:
  71. return _sec(func, OWNER)
  72. def sudo(func: callable) -> callable:
  73. return _sec(func, SUDO)
  74. def support(func: callable) -> callable:
  75. return _sec(func, SUDO | SUPPORT)
  76. def group_owner(func: callable) -> callable:
  77. return _sec(func, SUDO | GROUP_OWNER)
  78. def group_admin_add_admins(func: callable) -> callable:
  79. return _sec(func, SUDO | GROUP_ADMIN_ADD_ADMINS)
  80. def group_admin_change_info(func: callable) -> callable:
  81. return _sec(func, SUDO | GROUP_ADMIN_CHANGE_INFO)
  82. def group_admin_ban_users(func: callable) -> callable:
  83. return _sec(func, SUDO | GROUP_ADMIN_BAN_USERS)
  84. def group_admin_delete_messages(func: callable) -> callable:
  85. return _sec(func, SUDO | GROUP_ADMIN_DELETE_MESSAGES)
  86. def group_admin_pin_messages(func: callable) -> callable:
  87. return _sec(func, SUDO | GROUP_ADMIN_PIN_MESSAGES)
  88. def group_admin_invite_users(func: callable) -> callable:
  89. return _sec(func, SUDO | GROUP_ADMIN_INVITE_USERS)
  90. def group_admin(func: callable) -> callable:
  91. return _sec(func, SUDO | GROUP_ADMIN)
  92. def group_member(func: callable) -> callable:
  93. return _sec(func, SUDO | GROUP_MEMBER)
  94. def pm(func: callable) -> callable:
  95. return _sec(func, SUDO | PM)
  96. def unrestricted(func: callable) -> callable:
  97. return _sec(func, ALL)
  98. def inline_everyone(func: callable) -> callable:
  99. return _sec(func, EVERYONE)
  100. def _sec(func: callable, flags: int) -> callable:
  101. prev = getattr(func, "security", 0)
  102. func.security = prev | OWNER | flags
  103. return func
  104. class SecurityManager:
  105. def __init__(self, db):
  106. self._any_admin = db.get(__name__, "any_admin", False)
  107. self._default = db.get(__name__, "default", DEFAULT_PERMISSIONS)
  108. self._db = db
  109. self._reload_rights()
  110. self._cache = {}
  111. def _reload_rights(self):
  112. self._owner = list(
  113. set(
  114. self._db.get(__name__, "owner", []).copy()
  115. + ([self._client.tg_id] if hasattr(self, "_client") else [])
  116. )
  117. )
  118. self._sudo = list(set(self._db.get(__name__, "sudo", []).copy()))
  119. self._support = list(set(self._db.get(__name__, "support", []).copy()))
  120. async def init(self, client):
  121. self._client = client
  122. def get_flags(self, func: callable) -> int:
  123. if isinstance(func, int):
  124. config = func
  125. else:
  126. # Return masks there so user don't need to reboot
  127. # every time he changes permissions. It doesn't
  128. # decrease security at all, bc user anyway can
  129. # access this attribute
  130. config = self._db.get(__name__, "masks", {}).get(
  131. f"{func.__module__}.{func.__name__}",
  132. getattr(func, "security", self._default),
  133. )
  134. if config & ~ALL and not config & EVERYONE:
  135. logger.error("Security config contains unknown bits")
  136. return False
  137. return config & self._db.get(__name__, "bounding_mask", DEFAULT_PERMISSIONS)
  138. async def _check(
  139. self,
  140. message: Message,
  141. func: callable,
  142. user: Optional[int] = None,
  143. ) -> bool:
  144. """Checks if message sender is permitted to execute certain function"""
  145. self._reload_rights()
  146. if not (config := self.get_flags(func)):
  147. return False
  148. if not user:
  149. user = message.sender_id
  150. if user == self._client.tg_id or getattr(message, "out", False):
  151. return True
  152. logger.debug(f"Checking security match for {config}")
  153. f_owner = config & OWNER
  154. f_sudo = config & SUDO
  155. f_support = config & SUPPORT
  156. f_group_owner = config & GROUP_OWNER
  157. f_group_admin_add_admins = config & GROUP_ADMIN_ADD_ADMINS
  158. f_group_admin_change_info = config & GROUP_ADMIN_CHANGE_INFO
  159. f_group_admin_ban_users = config & GROUP_ADMIN_BAN_USERS
  160. f_group_admin_delete_messages = config & GROUP_ADMIN_DELETE_MESSAGES
  161. f_group_admin_pin_messages = config & GROUP_ADMIN_PIN_MESSAGES
  162. f_group_admin_invite_users = config & GROUP_ADMIN_INVITE_USERS
  163. f_group_admin = config & GROUP_ADMIN
  164. f_group_member = config & GROUP_MEMBER
  165. f_pm = config & PM
  166. f_group_admin_any = (
  167. f_group_admin_add_admins
  168. or f_group_admin_change_info
  169. or f_group_admin_ban_users
  170. or f_group_admin_delete_messages
  171. or f_group_admin_pin_messages
  172. or f_group_admin_invite_users
  173. or f_group_admin
  174. )
  175. if (
  176. f_owner
  177. and user in self._owner
  178. or f_sudo
  179. and user in self._sudo
  180. or f_support
  181. and user in self._support
  182. ):
  183. return True
  184. if user in self._db.get(main.__name__, "blacklist_users", []):
  185. return False
  186. if message is None: # In case of checking inline query security map
  187. return bool(config & EVERYONE)
  188. if f_group_member and message.is_group or f_pm and message.is_private:
  189. return True
  190. if message.is_channel:
  191. if not message.is_group:
  192. if message.edit_date:
  193. return False
  194. chat_id = utils.get_chat_id(message)
  195. if (
  196. chat_id in self._cache
  197. and self._cache[chat_id]["exp"] >= time.time()
  198. ):
  199. chat = self._cache[chat_id]["chat"]
  200. else:
  201. chat = await message.get_chat()
  202. self._cache[chat_id] = {"chat": chat, "exp": time.time() + 5 * 60}
  203. if (
  204. not chat.creator
  205. and not chat.admin_rights
  206. or not chat.creator
  207. and not chat.admin_rights.post_messages
  208. ):
  209. return False
  210. if self._any_admin and f_group_admin_any or f_group_admin:
  211. return True
  212. elif f_group_admin_any or f_group_owner:
  213. chat_id = utils.get_chat_id(message)
  214. cache_obj = f"{chat_id}/{user}"
  215. if (
  216. cache_obj in self._cache
  217. and self._cache[cache_obj]["exp"] >= time.time()
  218. ):
  219. participant = self._cache[cache_obj]["user"]
  220. else:
  221. participant = await message.client.get_permissions(
  222. message.peer_id,
  223. user,
  224. )
  225. self._cache[cache_obj] = {
  226. "user": participant,
  227. "exp": time.time() + 5 * 60,
  228. }
  229. if (
  230. participant.is_creator
  231. or participant.is_admin
  232. and (
  233. self._any_admin
  234. and f_group_admin_any
  235. or f_group_admin
  236. or f_group_admin_add_admins
  237. and participant.add_admins
  238. or f_group_admin_change_info
  239. and participant.change_info
  240. or f_group_admin_ban_users
  241. and participant.ban_users
  242. or f_group_admin_delete_messages
  243. and participant.delete_messages
  244. or f_group_admin_pin_messages
  245. and participant.pin_messages
  246. or f_group_admin_invite_users
  247. and participant.invite_users
  248. )
  249. ):
  250. return True
  251. return False
  252. if message.is_group and (f_group_admin_any or f_group_owner):
  253. chat_id = utils.get_chat_id(message)
  254. cache_obj = f"{chat_id}/{user}"
  255. if (
  256. cache_obj in self._cache
  257. and self._cache[cache_obj]["exp"] >= time.time()
  258. ):
  259. participant = self._cache[cache_obj]["user"]
  260. else:
  261. full_chat = await message.client(GetFullChatRequest(message.chat_id))
  262. participants = full_chat.full_chat.participants.participants
  263. participant = next(
  264. (
  265. possible_participant
  266. for possible_participant in participants
  267. if possible_participant.user_id == message.sender_id
  268. ),
  269. None,
  270. )
  271. self._cache[cache_obj] = {
  272. "user": participant,
  273. "exp": time.time() + 5 * 60,
  274. }
  275. if not participant:
  276. return
  277. if (
  278. isinstance(participant, ChatParticipantCreator)
  279. or isinstance(participant, ChatParticipantAdmin)
  280. and f_group_admin_any
  281. ):
  282. return True
  283. return False
  284. check = _check