security.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. """Checks the commands' security"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  15. # █▀█ █ █ █ █▀█ █▀▄ █
  16. # © Copyright 2022
  17. # https://t.me/hikariatama
  18. #
  19. # 🔒 Licensed under the GNU AGPLv3
  20. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  21. import logging
  22. import time
  23. import typing
  24. from telethon.hints import EntityLike
  25. from telethon.utils import get_display_name
  26. from telethon.tl.functions.messages import GetFullChatRequest
  27. from telethon.tl.types import ChatParticipantAdmin, ChatParticipantCreator, Message
  28. from . import main, utils
  29. from .database import Database
  30. from .tl_cache import CustomTelegramClient
  31. logger = logging.getLogger(__name__)
  32. OWNER = 1 << 0
  33. SUDO = 1 << 1
  34. SUPPORT = 1 << 2
  35. GROUP_OWNER = 1 << 3
  36. GROUP_ADMIN_ADD_ADMINS = 1 << 4
  37. GROUP_ADMIN_CHANGE_INFO = 1 << 5
  38. GROUP_ADMIN_BAN_USERS = 1 << 6
  39. GROUP_ADMIN_DELETE_MESSAGES = 1 << 7
  40. GROUP_ADMIN_PIN_MESSAGES = 1 << 8
  41. GROUP_ADMIN_INVITE_USERS = 1 << 9
  42. GROUP_ADMIN = 1 << 10
  43. GROUP_MEMBER = 1 << 11
  44. PM = 1 << 12
  45. EVERYONE = 1 << 13
  46. BITMAP = {
  47. "OWNER": OWNER,
  48. "SUDO": SUDO,
  49. "SUPPORT": SUPPORT,
  50. "GROUP_OWNER": GROUP_OWNER,
  51. "GROUP_ADMIN_ADD_ADMINS": GROUP_ADMIN_ADD_ADMINS,
  52. "GROUP_ADMIN_CHANGE_INFO": GROUP_ADMIN_CHANGE_INFO,
  53. "GROUP_ADMIN_BAN_USERS": GROUP_ADMIN_BAN_USERS,
  54. "GROUP_ADMIN_DELETE_MESSAGES": GROUP_ADMIN_DELETE_MESSAGES,
  55. "GROUP_ADMIN_PIN_MESSAGES": GROUP_ADMIN_PIN_MESSAGES,
  56. "GROUP_ADMIN_INVITE_USERS": GROUP_ADMIN_INVITE_USERS,
  57. "GROUP_ADMIN": GROUP_ADMIN,
  58. "GROUP_MEMBER": GROUP_MEMBER,
  59. "PM": PM,
  60. "EVERYONE": EVERYONE,
  61. }
  62. GROUP_ADMIN_ANY = (
  63. GROUP_ADMIN_ADD_ADMINS
  64. | GROUP_ADMIN_CHANGE_INFO
  65. | GROUP_ADMIN_BAN_USERS
  66. | GROUP_ADMIN_DELETE_MESSAGES
  67. | GROUP_ADMIN_PIN_MESSAGES
  68. | GROUP_ADMIN_INVITE_USERS
  69. | GROUP_ADMIN
  70. )
  71. DEFAULT_PERMISSIONS = OWNER | SUDO
  72. PUBLIC_PERMISSIONS = GROUP_OWNER | GROUP_ADMIN_ANY | GROUP_MEMBER | PM
  73. ALL = (1 << 13) - 1
  74. def owner(func: callable) -> callable:
  75. return _sec(func, OWNER)
  76. def sudo(func: callable) -> callable:
  77. return _sec(func, SUDO)
  78. def support(func: callable) -> callable:
  79. return _sec(func, SUDO | SUPPORT)
  80. def group_owner(func: callable) -> callable:
  81. return _sec(func, SUDO | GROUP_OWNER)
  82. def group_admin_add_admins(func: callable) -> callable:
  83. return _sec(func, SUDO | GROUP_ADMIN_ADD_ADMINS)
  84. def group_admin_change_info(func: callable) -> callable:
  85. return _sec(func, SUDO | GROUP_ADMIN_CHANGE_INFO)
  86. def group_admin_ban_users(func: callable) -> callable:
  87. return _sec(func, SUDO | GROUP_ADMIN_BAN_USERS)
  88. def group_admin_delete_messages(func: callable) -> callable:
  89. return _sec(func, SUDO | GROUP_ADMIN_DELETE_MESSAGES)
  90. def group_admin_pin_messages(func: callable) -> callable:
  91. return _sec(func, SUDO | GROUP_ADMIN_PIN_MESSAGES)
  92. def group_admin_invite_users(func: callable) -> callable:
  93. return _sec(func, SUDO | GROUP_ADMIN_INVITE_USERS)
  94. def group_admin(func: callable) -> callable:
  95. return _sec(func, SUDO | GROUP_ADMIN)
  96. def group_member(func: callable) -> callable:
  97. return _sec(func, SUDO | GROUP_MEMBER)
  98. def pm(func: callable) -> callable:
  99. return _sec(func, SUDO | PM)
  100. def unrestricted(func: callable) -> callable:
  101. return _sec(func, ALL)
  102. def inline_everyone(func: callable) -> callable:
  103. return _sec(func, EVERYONE)
  104. def _sec(func: callable, flags: int) -> callable:
  105. prev = getattr(func, "security", 0)
  106. func.security = prev | OWNER | flags
  107. return func
  108. class SecurityManager:
  109. """Manages command execution security policy"""
  110. def __init__(self, client: CustomTelegramClient, db: Database):
  111. self._client = client
  112. self._db = db
  113. self._cache = {}
  114. self._any_admin = db.get(__name__, "any_admin", False)
  115. self._default = db.get(__name__, "default", DEFAULT_PERMISSIONS)
  116. self._tsec_chat = db.pointer(__name__, "tsec_chat", [])
  117. self._tsec_user = db.pointer(__name__, "tsec_user", [])
  118. self._owner = db.pointer(__name__, "owner", [])
  119. self._sudo = db.pointer(__name__, "sudo", [])
  120. self._support = db.pointer(__name__, "support", [])
  121. self._reload_rights()
  122. self.any_admin = self._any_admin
  123. self.default = self._default
  124. self.tsec_chat = self._tsec_chat
  125. self.tsec_user = self._tsec_user
  126. self.owner = self._owner
  127. self.sudo = self._sudo
  128. self.support = self._support
  129. def _reload_rights(self):
  130. """
  131. Internal method to ensure that account owner is always in the owner list
  132. and to clear out outdated tsec rules
  133. """
  134. if self._client.tg_id not in self._owner:
  135. self._owner.append(self._client.tg_id)
  136. for info in self._tsec_user.copy():
  137. if info["expires"] and info["expires"] < time.time():
  138. self._tsec_user.remove(info)
  139. for info in self._tsec_chat.copy():
  140. if info["expires"] and info["expires"] < time.time():
  141. self._tsec_chat.remove(info)
  142. def add_rule(
  143. self,
  144. target_type: str,
  145. target: EntityLike,
  146. rule: str,
  147. duration: int,
  148. ):
  149. """
  150. Adds a targeted security rule
  151. :param target_type: "user" or "chat"
  152. :param target: target entity
  153. :param rule: rule name
  154. :param duration: rule duration in seconds
  155. :return: None
  156. """
  157. if target_type not in {"chat", "user"}:
  158. raise ValueError(f"Invalid target_type: {target_type}")
  159. if not rule.startswith("command") and not rule.startswith("module"):
  160. raise ValueError(f"Invalid rule: {rule}")
  161. if duration < 0:
  162. raise ValueError(f"Invalid duration: {duration}")
  163. (self._tsec_chat if target_type == "chat" else self._tsec_user).append(
  164. {
  165. "target": target.id,
  166. "rule_type": rule.split("/")[0],
  167. "rule": rule.split("/", maxsplit=1)[1],
  168. "expires": int(time.time() + duration) if duration else 0,
  169. "entity_name": get_display_name(target),
  170. "entity_url": utils.get_entity_url(target),
  171. }
  172. )
  173. def remove_rules(self, target_type: str, target_id: int) -> bool:
  174. """
  175. Removes all targeted security rules for the given target
  176. :param target_type: "user" or "chat"
  177. :param target_id: target entity ID
  178. :return: True if any rules were removed
  179. """
  180. any_ = False
  181. if target_type == "user":
  182. for rule in self.tsec_user.copy():
  183. if rule["target"] == target_id:
  184. self.tsec_user.remove(rule)
  185. any_ = True
  186. elif target_type == "chat":
  187. for rule in self.tsec_chat.copy():
  188. if rule["target"] == target_id:
  189. self.tsec_chat.remove(rule)
  190. any_ = True
  191. return any_
  192. def get_flags(self, func: typing.Union[callable, int]) -> int:
  193. """
  194. Gets the security flags for the given function
  195. :param func: function or flags
  196. :return: security flags
  197. """
  198. if isinstance(func, int):
  199. config = func
  200. else:
  201. # Return masks there so user don't need to reboot
  202. # every time he changes permissions. It doesn't
  203. # decrease security at all, bc user anyway can
  204. # access this attribute
  205. config = self._db.get(__name__, "masks", {}).get(
  206. f"{func.__module__}.{func.__name__}",
  207. getattr(func, "security", self._default),
  208. )
  209. if config & ~ALL and not config & EVERYONE:
  210. logger.error("Security config contains unknown bits")
  211. return False
  212. return config & self._db.get(__name__, "bounding_mask", DEFAULT_PERMISSIONS)
  213. async def check(
  214. self,
  215. message: typing.Optional[Message],
  216. func: typing.Union[callable, int],
  217. user_id: typing.Optional[int] = None,
  218. ) -> bool:
  219. """
  220. Checks if message sender is permitted to execute certain function
  221. :param message: Message to check or None if you manually pass user_id
  222. :param func: function or flags
  223. :param user_id: user ID
  224. :return: True if permitted, False otherwise
  225. """
  226. self._reload_rights()
  227. if not (config := self.get_flags(func)):
  228. return False
  229. if not user_id:
  230. user_id = message.sender_id
  231. if user_id == self._client.tg_id or getattr(message, "out", False):
  232. return True
  233. logger.debug("Checking security match for %s", config)
  234. f_owner = config & OWNER
  235. f_sudo = config & SUDO
  236. f_support = config & SUPPORT
  237. f_group_owner = config & GROUP_OWNER
  238. f_group_admin_add_admins = config & GROUP_ADMIN_ADD_ADMINS
  239. f_group_admin_change_info = config & GROUP_ADMIN_CHANGE_INFO
  240. f_group_admin_ban_users = config & GROUP_ADMIN_BAN_USERS
  241. f_group_admin_delete_messages = config & GROUP_ADMIN_DELETE_MESSAGES
  242. f_group_admin_pin_messages = config & GROUP_ADMIN_PIN_MESSAGES
  243. f_group_admin_invite_users = config & GROUP_ADMIN_INVITE_USERS
  244. f_group_admin = config & GROUP_ADMIN
  245. f_group_member = config & GROUP_MEMBER
  246. f_pm = config & PM
  247. f_group_admin_any = (
  248. f_group_admin_add_admins
  249. or f_group_admin_change_info
  250. or f_group_admin_ban_users
  251. or f_group_admin_delete_messages
  252. or f_group_admin_pin_messages
  253. or f_group_admin_invite_users
  254. or f_group_admin
  255. )
  256. if (
  257. f_owner
  258. and user_id in self._owner
  259. or f_sudo
  260. and user_id in self._sudo
  261. or f_support
  262. and user_id in self._support
  263. ):
  264. return True
  265. if user_id in self._db.get(main.__name__, "blacklist_users", []):
  266. return False
  267. if message is None: # In case of checking inline query security map
  268. return bool(config & EVERYONE)
  269. try:
  270. chat = utils.get_chat_id(message)
  271. except Exception:
  272. chat = None
  273. try:
  274. cmd = message.raw_text[1:].split()[0].strip()
  275. except Exception:
  276. cmd = None
  277. if callable(func):
  278. for info in self._tsec_user.copy():
  279. if info["target"] == user_id:
  280. if info["rule_type"] == "command" and info["rule"] == cmd:
  281. logger.debug("tsec match for user %s", cmd)
  282. return True
  283. if (
  284. info["rule_type"] == "module"
  285. and info["rule"] == func.__self__.__class__.__name__
  286. ):
  287. logger.debug(
  288. "tsec match for user %s",
  289. func.__self__.__class__.__name__,
  290. )
  291. return True
  292. if chat:
  293. for info in self._tsec_chat.copy():
  294. if info["target"] == chat:
  295. if info["rule_type"] == "command" and info["rule"] == cmd:
  296. logger.debug("tsec match for %s", cmd)
  297. return True
  298. if (
  299. info["rule_type"] == "module"
  300. and info["rule"] == func.__self__.__class__.__name__
  301. ):
  302. logger.debug(
  303. "tsec match for %s",
  304. func.__self__.__class__.__name__,
  305. )
  306. return True
  307. if f_group_member and message.is_group or f_pm and message.is_private:
  308. return True
  309. if message.is_channel:
  310. if not message.is_group:
  311. if message.edit_date:
  312. return False
  313. chat_id = utils.get_chat_id(message)
  314. if (
  315. chat_id in self._cache
  316. and self._cache[chat_id]["exp"] >= time.time()
  317. ):
  318. chat = self._cache[chat_id]["chat"]
  319. else:
  320. chat = await message.get_chat()
  321. self._cache[chat_id] = {"chat": chat, "exp": time.time() + 5 * 60}
  322. if (
  323. not chat.creator
  324. and not chat.admin_rights
  325. or not chat.creator
  326. and not chat.admin_rights.post_messages
  327. ):
  328. return False
  329. if self._any_admin and f_group_admin_any or f_group_admin:
  330. return True
  331. elif f_group_admin_any or f_group_owner:
  332. chat_id = utils.get_chat_id(message)
  333. cache_obj = f"{chat_id}/{user_id}"
  334. if (
  335. cache_obj in self._cache
  336. and self._cache[cache_obj]["exp"] >= time.time()
  337. ):
  338. participant = self._cache[cache_obj]["user"]
  339. else:
  340. participant = await message.client.get_permissions(
  341. message.peer_id,
  342. user_id,
  343. )
  344. self._cache[cache_obj] = {
  345. "user": participant,
  346. "exp": time.time() + 5 * 60,
  347. }
  348. if (
  349. participant.is_creator
  350. or participant.is_admin
  351. and (
  352. self._any_admin
  353. and f_group_admin_any
  354. or f_group_admin
  355. or f_group_admin_add_admins
  356. and participant.add_admins
  357. or f_group_admin_change_info
  358. and participant.change_info
  359. or f_group_admin_ban_users
  360. and participant.ban_users
  361. or f_group_admin_delete_messages
  362. and participant.delete_messages
  363. or f_group_admin_pin_messages
  364. and participant.pin_messages
  365. or f_group_admin_invite_users
  366. and participant.invite_users
  367. )
  368. ):
  369. return True
  370. return False
  371. if message.is_group and (f_group_admin_any or f_group_owner):
  372. chat_id = utils.get_chat_id(message)
  373. cache_obj = f"{chat_id}/{user_id}"
  374. if (
  375. cache_obj in self._cache
  376. and self._cache[cache_obj]["exp"] >= time.time()
  377. ):
  378. participant = self._cache[cache_obj]["user"]
  379. else:
  380. full_chat = await message.client(GetFullChatRequest(message.chat_id))
  381. participants = full_chat.full_chat.participants.participants
  382. participant = next(
  383. (
  384. possible_participant
  385. for possible_participant in participants
  386. if possible_participant.user_id == message.sender_id
  387. ),
  388. None,
  389. )
  390. self._cache[cache_obj] = {
  391. "user": participant,
  392. "exp": time.time() + 5 * 60,
  393. }
  394. if not participant:
  395. return
  396. if (
  397. isinstance(participant, ChatParticipantCreator)
  398. or isinstance(participant, ChatParticipantAdmin)
  399. and f_group_admin_any
  400. ):
  401. return True
  402. return False
  403. _check = check # Legacy