security.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. """Checks the commands' security"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # ©️ Dan Gazizullin, 2021-2022
  15. # This file is a part of Hikka Userbot
  16. # 🌐 https://github.com/hikariatama/Hikka
  17. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  18. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  19. import logging
  20. import time
  21. import typing
  22. from telethon.hints import EntityLike
  23. from telethon.tl.functions.messages import GetFullChatRequest
  24. from telethon.tl.types import ChatParticipantAdmin, ChatParticipantCreator, Message
  25. from telethon.utils import get_display_name
  26. from . import main, utils
  27. from .database import Database
  28. from .tl_cache import CustomTelegramClient
  29. from .types import Command
  30. logger = logging.getLogger(__name__)
  31. OWNER = 1 << 0
  32. SUDO = 1 << 1
  33. SUPPORT = 1 << 2
  34. GROUP_OWNER = 1 << 3
  35. GROUP_ADMIN_ADD_ADMINS = 1 << 4
  36. GROUP_ADMIN_CHANGE_INFO = 1 << 5
  37. GROUP_ADMIN_BAN_USERS = 1 << 6
  38. GROUP_ADMIN_DELETE_MESSAGES = 1 << 7
  39. GROUP_ADMIN_PIN_MESSAGES = 1 << 8
  40. GROUP_ADMIN_INVITE_USERS = 1 << 9
  41. GROUP_ADMIN = 1 << 10
  42. GROUP_MEMBER = 1 << 11
  43. PM = 1 << 12
  44. EVERYONE = 1 << 13
  45. BITMAP = {
  46. "OWNER": OWNER,
  47. "SUDO": SUDO,
  48. "SUPPORT": SUPPORT,
  49. "GROUP_OWNER": GROUP_OWNER,
  50. "GROUP_ADMIN_ADD_ADMINS": GROUP_ADMIN_ADD_ADMINS,
  51. "GROUP_ADMIN_CHANGE_INFO": GROUP_ADMIN_CHANGE_INFO,
  52. "GROUP_ADMIN_BAN_USERS": GROUP_ADMIN_BAN_USERS,
  53. "GROUP_ADMIN_DELETE_MESSAGES": GROUP_ADMIN_DELETE_MESSAGES,
  54. "GROUP_ADMIN_PIN_MESSAGES": GROUP_ADMIN_PIN_MESSAGES,
  55. "GROUP_ADMIN_INVITE_USERS": GROUP_ADMIN_INVITE_USERS,
  56. "GROUP_ADMIN": GROUP_ADMIN,
  57. "GROUP_MEMBER": GROUP_MEMBER,
  58. "PM": PM,
  59. "EVERYONE": EVERYONE,
  60. }
  61. GROUP_ADMIN_ANY = (
  62. GROUP_ADMIN_ADD_ADMINS
  63. | GROUP_ADMIN_CHANGE_INFO
  64. | GROUP_ADMIN_BAN_USERS
  65. | GROUP_ADMIN_DELETE_MESSAGES
  66. | GROUP_ADMIN_PIN_MESSAGES
  67. | GROUP_ADMIN_INVITE_USERS
  68. | GROUP_ADMIN
  69. )
  70. DEFAULT_PERMISSIONS = OWNER | SUDO
  71. PUBLIC_PERMISSIONS = GROUP_OWNER | GROUP_ADMIN_ANY | GROUP_MEMBER | PM
  72. ALL = (1 << 13) - 1
  73. def owner(func: Command) -> Command:
  74. return _sec(func, OWNER)
  75. def sudo(func: Command) -> Command:
  76. return _sec(func, SUDO)
  77. def support(func: Command) -> Command:
  78. return _sec(func, SUDO | SUPPORT)
  79. def group_owner(func: Command) -> Command:
  80. return _sec(func, SUDO | GROUP_OWNER)
  81. def group_admin_add_admins(func: Command) -> Command:
  82. return _sec(func, SUDO | GROUP_ADMIN_ADD_ADMINS)
  83. def group_admin_change_info(func: Command) -> Command:
  84. return _sec(func, SUDO | GROUP_ADMIN_CHANGE_INFO)
  85. def group_admin_ban_users(func: Command) -> Command:
  86. return _sec(func, SUDO | GROUP_ADMIN_BAN_USERS)
  87. def group_admin_delete_messages(func: Command) -> Command:
  88. return _sec(func, SUDO | GROUP_ADMIN_DELETE_MESSAGES)
  89. def group_admin_pin_messages(func: Command) -> Command:
  90. return _sec(func, SUDO | GROUP_ADMIN_PIN_MESSAGES)
  91. def group_admin_invite_users(func: Command) -> Command:
  92. return _sec(func, SUDO | GROUP_ADMIN_INVITE_USERS)
  93. def group_admin(func: Command) -> Command:
  94. return _sec(func, SUDO | GROUP_ADMIN)
  95. def group_member(func: Command) -> Command:
  96. return _sec(func, SUDO | GROUP_MEMBER)
  97. def pm(func: Command) -> Command:
  98. return _sec(func, SUDO | PM)
  99. def unrestricted(func: Command) -> Command:
  100. return _sec(func, ALL)
  101. def inline_everyone(func: Command) -> Command:
  102. return _sec(func, EVERYONE)
  103. def _sec(func: Command, flags: int) -> Command:
  104. prev = getattr(func, "security", 0)
  105. func.security = prev | OWNER | flags
  106. return func
  107. class SecurityManager:
  108. """Manages command execution security policy"""
  109. def __init__(self, client: CustomTelegramClient, db: Database):
  110. self._client = client
  111. self._db = db
  112. self._cache = {}
  113. self._any_admin = db.get(__name__, "any_admin", False)
  114. self._default = db.get(__name__, "default", DEFAULT_PERMISSIONS)
  115. self._tsec_chat = db.pointer(__name__, "tsec_chat", [])
  116. self._tsec_user = db.pointer(__name__, "tsec_user", [])
  117. self._owner = db.pointer(__name__, "owner", [])
  118. self._sudo = db.pointer(__name__, "sudo", [])
  119. self._support = db.pointer(__name__, "support", [])
  120. self._reload_rights()
  121. self.any_admin = self._any_admin
  122. self.default = self._default
  123. self.tsec_chat = self._tsec_chat
  124. self.tsec_user = self._tsec_user
  125. self.owner = self._owner
  126. self.sudo = self._sudo
  127. self.support = self._support
  128. def _reload_rights(self):
  129. """
  130. Internal method to ensure that account owner is always in the owner list
  131. and to clear out outdated tsec rules
  132. """
  133. if self._client.tg_id not in self._owner:
  134. self._owner.append(self._client.tg_id)
  135. for info in self._tsec_user.copy():
  136. if info["expires"] and info["expires"] < time.time():
  137. self._tsec_user.remove(info)
  138. for info in self._tsec_chat.copy():
  139. if info["expires"] and info["expires"] < time.time():
  140. self._tsec_chat.remove(info)
  141. def add_rule(
  142. self,
  143. target_type: str,
  144. target: EntityLike,
  145. rule: str,
  146. duration: int,
  147. ):
  148. """
  149. Adds a targeted security rule
  150. :param target_type: "user" or "chat"
  151. :param target: target entity
  152. :param rule: rule name
  153. :param duration: rule duration in seconds
  154. :return: None
  155. """
  156. if target_type not in {"chat", "user"}:
  157. raise ValueError(f"Invalid target_type: {target_type}")
  158. if not rule.startswith("command") and not rule.startswith("module"):
  159. raise ValueError(f"Invalid rule: {rule}")
  160. if duration < 0:
  161. raise ValueError(f"Invalid duration: {duration}")
  162. (self._tsec_chat if target_type == "chat" else self._tsec_user).append(
  163. {
  164. "target": target.id,
  165. "rule_type": rule.split("/")[0],
  166. "rule": rule.split("/", maxsplit=1)[1],
  167. "expires": int(time.time() + duration) if duration else 0,
  168. "entity_name": get_display_name(target),
  169. "entity_url": utils.get_entity_url(target),
  170. }
  171. )
  172. def remove_rules(self, target_type: str, target_id: int) -> bool:
  173. """
  174. Removes all targeted security rules for the given target
  175. :param target_type: "user" or "chat"
  176. :param target_id: target entity ID
  177. :return: True if any rules were removed
  178. """
  179. any_ = False
  180. if target_type == "user":
  181. for rule in self.tsec_user.copy():
  182. if rule["target"] == target_id:
  183. self.tsec_user.remove(rule)
  184. any_ = True
  185. elif target_type == "chat":
  186. for rule in self.tsec_chat.copy():
  187. if rule["target"] == target_id:
  188. self.tsec_chat.remove(rule)
  189. any_ = True
  190. return any_
  191. def get_flags(self, func: typing.Union[Command, int]) -> int:
  192. """
  193. Gets the security flags for the given function
  194. :param func: function or flags
  195. :return: security flags
  196. """
  197. if isinstance(func, int):
  198. config = func
  199. else:
  200. # Return masks there so user don't need to reboot
  201. # every time he changes permissions. It doesn't
  202. # decrease security at all, bc user anyway can
  203. # access this attribute
  204. config = self._db.get(__name__, "masks", {}).get(
  205. f"{func.__module__}.{func.__name__}",
  206. getattr(func, "security", self._default),
  207. )
  208. if config & ~ALL and not config & EVERYONE:
  209. logger.error("Security config contains unknown bits")
  210. return False
  211. return config & self._db.get(__name__, "bounding_mask", DEFAULT_PERMISSIONS)
  212. async def check(
  213. self,
  214. message: typing.Optional[Message],
  215. func: typing.Union[Command, int],
  216. user_id: typing.Optional[int] = None,
  217. ) -> bool:
  218. """
  219. Checks if message sender is permitted to execute certain function
  220. :param message: Message to check or None if you manually pass user_id
  221. :param func: function or flags
  222. :param user_id: user ID
  223. :return: True if permitted, False otherwise
  224. """
  225. self._reload_rights()
  226. if not (config := self.get_flags(func)):
  227. return False
  228. if not user_id:
  229. user_id = message.sender_id
  230. if user_id == self._client.tg_id or getattr(message, "out", False):
  231. return True
  232. logger.debug("Checking security match for %s", config)
  233. f_owner = config & OWNER
  234. f_sudo = config & SUDO
  235. f_support = config & SUPPORT
  236. f_group_owner = config & GROUP_OWNER
  237. f_group_admin_add_admins = config & GROUP_ADMIN_ADD_ADMINS
  238. f_group_admin_change_info = config & GROUP_ADMIN_CHANGE_INFO
  239. f_group_admin_ban_users = config & GROUP_ADMIN_BAN_USERS
  240. f_group_admin_delete_messages = config & GROUP_ADMIN_DELETE_MESSAGES
  241. f_group_admin_pin_messages = config & GROUP_ADMIN_PIN_MESSAGES
  242. f_group_admin_invite_users = config & GROUP_ADMIN_INVITE_USERS
  243. f_group_admin = config & GROUP_ADMIN
  244. f_group_member = config & GROUP_MEMBER
  245. f_pm = config & PM
  246. f_group_admin_any = (
  247. f_group_admin_add_admins
  248. or f_group_admin_change_info
  249. or f_group_admin_ban_users
  250. or f_group_admin_delete_messages
  251. or f_group_admin_pin_messages
  252. or f_group_admin_invite_users
  253. or f_group_admin
  254. )
  255. if (
  256. f_owner
  257. and user_id in self._owner
  258. or f_sudo
  259. and user_id in self._sudo
  260. or f_support
  261. and user_id in self._support
  262. ):
  263. return True
  264. if user_id in self._db.get(main.__name__, "blacklist_users", []):
  265. return False
  266. if message is None: # In case of checking inline query security map
  267. return bool(config & EVERYONE)
  268. try:
  269. chat = utils.get_chat_id(message)
  270. except Exception:
  271. chat = None
  272. try:
  273. cmd = message.raw_text[1:].split()[0].strip()
  274. except Exception:
  275. cmd = None
  276. if callable(func):
  277. command = self._client.loader.find_alias(cmd, include_legacy=True) or cmd
  278. for info in self._tsec_user.copy():
  279. if info["target"] == user_id:
  280. if info["rule_type"] == "command" and info["rule"] == command:
  281. logger.debug("tsec match for user %s", command)
  282. return True
  283. if (
  284. info["rule_type"] == "module"
  285. and info["rule"] == func.__self__.__class__.__name__
  286. ):
  287. logger.debug(
  288. "tsec match for user %s",
  289. func.__self__.__class__.__name__,
  290. )
  291. return True
  292. if chat:
  293. for info in self._tsec_chat.copy():
  294. if info["target"] == chat:
  295. if info["rule_type"] == "command" and info["rule"] == command:
  296. logger.debug("tsec match for %s", command)
  297. return True
  298. if (
  299. info["rule_type"] == "module"
  300. and info["rule"] == func.__self__.__class__.__name__
  301. ):
  302. logger.debug(
  303. "tsec match for %s",
  304. func.__self__.__class__.__name__,
  305. )
  306. return True
  307. if f_group_member and message.is_group or f_pm and message.is_private:
  308. return True
  309. if message.is_channel:
  310. if not message.is_group:
  311. if message.edit_date:
  312. return False
  313. chat_id = utils.get_chat_id(message)
  314. if (
  315. chat_id in self._cache
  316. and self._cache[chat_id]["exp"] >= time.time()
  317. ):
  318. chat = self._cache[chat_id]["chat"]
  319. else:
  320. chat = await message.get_chat()
  321. self._cache[chat_id] = {"chat": chat, "exp": time.time() + 5 * 60}
  322. if (
  323. not chat.creator
  324. and not chat.admin_rights
  325. or not chat.creator
  326. and not chat.admin_rights.post_messages
  327. ):
  328. return False
  329. if self._any_admin and f_group_admin_any or f_group_admin:
  330. return True
  331. elif f_group_admin_any or f_group_owner:
  332. chat_id = utils.get_chat_id(message)
  333. cache_obj = f"{chat_id}/{user_id}"
  334. if (
  335. cache_obj in self._cache
  336. and self._cache[cache_obj]["exp"] >= time.time()
  337. ):
  338. participant = self._cache[cache_obj]["user"]
  339. else:
  340. participant = await message.client.get_permissions(
  341. message.peer_id,
  342. user_id,
  343. )
  344. self._cache[cache_obj] = {
  345. "user": participant,
  346. "exp": time.time() + 5 * 60,
  347. }
  348. if (
  349. participant.is_creator
  350. or participant.is_admin
  351. and (
  352. self._any_admin
  353. and f_group_admin_any
  354. or f_group_admin
  355. or f_group_admin_add_admins
  356. and participant.add_admins
  357. or f_group_admin_change_info
  358. and participant.change_info
  359. or f_group_admin_ban_users
  360. and participant.ban_users
  361. or f_group_admin_delete_messages
  362. and participant.delete_messages
  363. or f_group_admin_pin_messages
  364. and participant.pin_messages
  365. or f_group_admin_invite_users
  366. and participant.invite_users
  367. )
  368. ):
  369. return True
  370. return False
  371. if message.is_group and (f_group_admin_any or f_group_owner):
  372. chat_id = utils.get_chat_id(message)
  373. cache_obj = f"{chat_id}/{user_id}"
  374. if (
  375. cache_obj in self._cache
  376. and self._cache[cache_obj]["exp"] >= time.time()
  377. ):
  378. participant = self._cache[cache_obj]["user"]
  379. else:
  380. full_chat = await message.client(GetFullChatRequest(message.chat_id))
  381. participants = full_chat.full_chat.participants.participants
  382. participant = next(
  383. (
  384. possible_participant
  385. for possible_participant in participants
  386. if possible_participant.user_id == message.sender_id
  387. ),
  388. None,
  389. )
  390. self._cache[cache_obj] = {
  391. "user": participant,
  392. "exp": time.time() + 5 * 60,
  393. }
  394. if not participant:
  395. return
  396. if (
  397. isinstance(participant, ChatParticipantCreator)
  398. or isinstance(participant, ChatParticipantAdmin)
  399. and f_group_admin_any
  400. ):
  401. return True
  402. return False
  403. _check = check # Legacy