security.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. """Checks the commands' security"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  15. # █▀█ █ █ █ █▀█ █▀▄ █
  16. # © Copyright 2022
  17. # https://t.me/hikariatama
  18. #
  19. # 🔒 Licensed under the GNU AGPLv3
  20. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  21. import logging
  22. import time
  23. from typing import Optional
  24. from telethon.tl.functions.messages import GetFullChatRequest
  25. from telethon.tl.types import ChatParticipantAdmin, ChatParticipantCreator, Message
  26. from . import main, utils
  27. logger = logging.getLogger(__name__)
  28. OWNER = 1 << 0
  29. SUDO = 1 << 1
  30. SUPPORT = 1 << 2
  31. GROUP_OWNER = 1 << 3
  32. GROUP_ADMIN_ADD_ADMINS = 1 << 4
  33. GROUP_ADMIN_CHANGE_INFO = 1 << 5
  34. GROUP_ADMIN_BAN_USERS = 1 << 6
  35. GROUP_ADMIN_DELETE_MESSAGES = 1 << 7
  36. GROUP_ADMIN_PIN_MESSAGES = 1 << 8
  37. GROUP_ADMIN_INVITE_USERS = 1 << 9
  38. GROUP_ADMIN = 1 << 10
  39. GROUP_MEMBER = 1 << 11
  40. PM = 1 << 12
  41. EVERYONE = 1 << 13
  42. BITMAP = {
  43. "OWNER": OWNER,
  44. "SUDO": SUDO,
  45. "SUPPORT": SUPPORT,
  46. "GROUP_OWNER": GROUP_OWNER,
  47. "GROUP_ADMIN_ADD_ADMINS": GROUP_ADMIN_ADD_ADMINS,
  48. "GROUP_ADMIN_CHANGE_INFO": GROUP_ADMIN_CHANGE_INFO,
  49. "GROUP_ADMIN_BAN_USERS": GROUP_ADMIN_BAN_USERS,
  50. "GROUP_ADMIN_DELETE_MESSAGES": GROUP_ADMIN_DELETE_MESSAGES,
  51. "GROUP_ADMIN_PIN_MESSAGES": GROUP_ADMIN_PIN_MESSAGES,
  52. "GROUP_ADMIN_INVITE_USERS": GROUP_ADMIN_INVITE_USERS,
  53. "GROUP_ADMIN": GROUP_ADMIN,
  54. "GROUP_MEMBER": GROUP_MEMBER,
  55. "PM": PM,
  56. "EVERYONE": EVERYONE,
  57. }
  58. GROUP_ADMIN_ANY = (
  59. GROUP_ADMIN_ADD_ADMINS
  60. | GROUP_ADMIN_CHANGE_INFO
  61. | GROUP_ADMIN_BAN_USERS
  62. | GROUP_ADMIN_DELETE_MESSAGES
  63. | GROUP_ADMIN_PIN_MESSAGES
  64. | GROUP_ADMIN_INVITE_USERS
  65. | GROUP_ADMIN
  66. )
  67. DEFAULT_PERMISSIONS = OWNER | SUDO
  68. PUBLIC_PERMISSIONS = GROUP_OWNER | GROUP_ADMIN_ANY | GROUP_MEMBER | PM
  69. ALL = (1 << 13) - 1
  70. def owner(func: callable) -> callable:
  71. return _sec(func, OWNER)
  72. def sudo(func: callable) -> callable:
  73. return _sec(func, SUDO)
  74. def support(func: callable) -> callable:
  75. return _sec(func, SUDO | SUPPORT)
  76. def group_owner(func: callable) -> callable:
  77. return _sec(func, SUDO | GROUP_OWNER)
  78. def group_admin_add_admins(func: callable) -> callable:
  79. return _sec(func, SUDO | GROUP_ADMIN_ADD_ADMINS)
  80. def group_admin_change_info(func: callable) -> callable:
  81. return _sec(func, SUDO | GROUP_ADMIN_CHANGE_INFO)
  82. def group_admin_ban_users(func: callable) -> callable:
  83. return _sec(func, SUDO | GROUP_ADMIN_BAN_USERS)
  84. def group_admin_delete_messages(func: callable) -> callable:
  85. return _sec(func, SUDO | GROUP_ADMIN_DELETE_MESSAGES)
  86. def group_admin_pin_messages(func: callable) -> callable:
  87. return _sec(func, SUDO | GROUP_ADMIN_PIN_MESSAGES)
  88. def group_admin_invite_users(func: callable) -> callable:
  89. return _sec(func, SUDO | GROUP_ADMIN_INVITE_USERS)
  90. def group_admin(func: callable) -> callable:
  91. return _sec(func, SUDO | GROUP_ADMIN)
  92. def group_member(func: callable) -> callable:
  93. return _sec(func, SUDO | GROUP_MEMBER)
  94. def pm(func: callable) -> callable:
  95. return _sec(func, SUDO | PM)
  96. def unrestricted(func: callable) -> callable:
  97. return _sec(func, ALL)
  98. def inline_everyone(func: callable) -> callable:
  99. return _sec(func, EVERYONE)
  100. def _sec(func: callable, flags: int) -> callable:
  101. prev = getattr(func, "security", 0)
  102. func.security = prev | OWNER | flags
  103. return func
  104. class SecurityManager:
  105. def __init__(self, db):
  106. self._any_admin = db.get(__name__, "any_admin", False)
  107. self._default = db.get(__name__, "default", DEFAULT_PERMISSIONS)
  108. self._db = db
  109. self._reload_rights()
  110. self._cache = {}
  111. def _reload_rights(self):
  112. self._owner = list(
  113. set(
  114. self._db.get(__name__, "owner", []).copy()
  115. + ([self._me] if hasattr(self, "_me") else [])
  116. )
  117. )
  118. self._sudo = list(set(self._db.get(__name__, "sudo", []).copy()))
  119. self._support = list(set(self._db.get(__name__, "support", []).copy()))
  120. async def init(self, client):
  121. self._client = client
  122. self._me = client._tg_id
  123. def get_flags(self, func: callable) -> int:
  124. if isinstance(func, int):
  125. config = func
  126. else:
  127. # Return masks there so user don't need to reboot
  128. # every time he changes permissions. It doesn't
  129. # decrease security at all, bc user anyway can
  130. # access this attribute
  131. config = self._db.get(__name__, "masks", {}).get(
  132. f"{func.__module__}.{func.__name__}",
  133. getattr(func, "security", self._default),
  134. )
  135. if config & ~ALL and not config & EVERYONE:
  136. logger.error("Security config contains unknown bits")
  137. return False
  138. return config & self._db.get(__name__, "bounding_mask", DEFAULT_PERMISSIONS)
  139. async def _check(
  140. self,
  141. message: Message,
  142. func: callable,
  143. user: Optional[int] = None,
  144. ) -> bool:
  145. """Checks if message sender is permitted to execute certain function"""
  146. self._reload_rights()
  147. if not (config := self.get_flags(func)):
  148. return False
  149. if not user:
  150. user = message.sender_id
  151. if user == self._me or getattr(message, "out", False):
  152. return True
  153. logger.debug(f"Checking security match for {config}")
  154. f_owner = config & OWNER
  155. f_sudo = config & SUDO
  156. f_support = config & SUPPORT
  157. f_group_owner = config & GROUP_OWNER
  158. f_group_admin_add_admins = config & GROUP_ADMIN_ADD_ADMINS
  159. f_group_admin_change_info = config & GROUP_ADMIN_CHANGE_INFO
  160. f_group_admin_ban_users = config & GROUP_ADMIN_BAN_USERS
  161. f_group_admin_delete_messages = config & GROUP_ADMIN_DELETE_MESSAGES
  162. f_group_admin_pin_messages = config & GROUP_ADMIN_PIN_MESSAGES
  163. f_group_admin_invite_users = config & GROUP_ADMIN_INVITE_USERS
  164. f_group_admin = config & GROUP_ADMIN
  165. f_group_member = config & GROUP_MEMBER
  166. f_pm = config & PM
  167. f_group_admin_any = (
  168. f_group_admin_add_admins
  169. or f_group_admin_change_info
  170. or f_group_admin_ban_users
  171. or f_group_admin_delete_messages
  172. or f_group_admin_pin_messages
  173. or f_group_admin_invite_users
  174. or f_group_admin
  175. )
  176. if (
  177. f_owner
  178. and user in self._owner
  179. or f_sudo
  180. and user in self._sudo
  181. or f_support
  182. and user in self._support
  183. ):
  184. return True
  185. if user in self._db.get(main.__name__, "blacklist_users", []):
  186. return False
  187. if message is None: # In case of checking inline query security map
  188. return bool(config & EVERYONE)
  189. if f_group_member and message.is_group or f_pm and message.is_private:
  190. return True
  191. if message.is_channel:
  192. if not message.is_group:
  193. if message.edit_date:
  194. return False
  195. chat_id = utils.get_chat_id(message)
  196. if (
  197. chat_id in self._cache
  198. and self._cache[chat_id]["exp"] >= time.time()
  199. ):
  200. chat = self._cache[chat_id]["chat"]
  201. else:
  202. chat = await message.get_chat()
  203. self._cache[chat_id] = {"chat": chat, "exp": time.time() + 5 * 60}
  204. if (
  205. not chat.creator
  206. and not chat.admin_rights
  207. or not chat.creator
  208. and not chat.admin_rights.post_messages
  209. ):
  210. return False
  211. if self._any_admin and f_group_admin_any or f_group_admin:
  212. return True
  213. elif f_group_admin_any or f_group_owner:
  214. chat_id = utils.get_chat_id(message)
  215. cache_obj = f"{chat_id}/{user}"
  216. if (
  217. cache_obj in self._cache
  218. and self._cache[cache_obj]["exp"] >= time.time()
  219. ):
  220. participant = self._cache[cache_obj]["user"]
  221. else:
  222. participant = await message.client.get_permissions(
  223. message.peer_id,
  224. user,
  225. )
  226. self._cache[cache_obj] = {
  227. "user": participant,
  228. "exp": time.time() + 5 * 60,
  229. }
  230. if (
  231. participant.is_creator
  232. or participant.is_admin
  233. and (
  234. self._any_admin
  235. and f_group_admin_any
  236. or f_group_admin
  237. or f_group_admin_add_admins
  238. and participant.add_admins
  239. or f_group_admin_change_info
  240. and participant.change_info
  241. or f_group_admin_ban_users
  242. and participant.ban_users
  243. or f_group_admin_delete_messages
  244. and participant.delete_messages
  245. or f_group_admin_pin_messages
  246. and participant.pin_messages
  247. or f_group_admin_invite_users
  248. and participant.invite_users
  249. )
  250. ):
  251. return True
  252. return False
  253. if message.is_group and (f_group_admin_any or f_group_owner):
  254. chat_id = utils.get_chat_id(message)
  255. cache_obj = f"{chat_id}/{user}"
  256. if (
  257. cache_obj in self._cache
  258. and self._cache[cache_obj]["exp"] >= time.time()
  259. ):
  260. participant = self._cache[cache_obj]["user"]
  261. else:
  262. full_chat = await message.client(GetFullChatRequest(message.chat_id))
  263. participants = full_chat.full_chat.participants.participants
  264. participant = next(
  265. (
  266. possible_participant
  267. for possible_participant in participants
  268. if possible_participant.user_id == message.sender_id
  269. ),
  270. None,
  271. )
  272. self._cache[cache_obj] = {
  273. "user": participant,
  274. "exp": time.time() + 5 * 60,
  275. }
  276. if not participant:
  277. return
  278. if (
  279. isinstance(participant, ChatParticipantCreator)
  280. or isinstance(participant, ChatParticipantAdmin)
  281. and f_group_admin_any
  282. ):
  283. return True
  284. return False
  285. check = _check