security.py 21 KB

"""Checks the commands' security"""

# Friendly Telegram (telegram userbot)
# Copyright (C) 2018-2021 The Authors
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# ©️ Dan Gazizullin, 2021-2023
# This file is a part of Hikka Userbot
# 🌐 https://github.com/hikariatama/Hikka
# You can redistribute it and/or modify it under the terms of the GNU AGPLv3
# 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  19. import logging
  20. import time
  21. import typing
  22. from hikkatl.hints import EntityLike
  23. from hikkatl.tl.functions.messages import GetFullChatRequest
  24. from hikkatl.tl.types import ChatParticipantAdmin, ChatParticipantCreator, Message
  25. from hikkatl.utils import get_display_name
  26. from . import main, utils
  27. from .database import Database
  28. from .tl_cache import CustomTelegramClient
  29. from .types import Command
logger = logging.getLogger(__name__)

# Permission flags. Every flag occupies its own bit, so any combination of
# permissions fits into a single integer mask.
OWNER = 1 << 0
# SUDO and SUPPORT are deprecated: the `sudo` / `support` decorators of this
# module are no-ops and `SecurityManager.check` only warns when it sees them.
SUDO = 1 << 1
SUPPORT = 1 << 2
GROUP_OWNER = 1 << 3
GROUP_ADMIN_ADD_ADMINS = 1 << 4
GROUP_ADMIN_CHANGE_INFO = 1 << 5
GROUP_ADMIN_BAN_USERS = 1 << 6
GROUP_ADMIN_DELETE_MESSAGES = 1 << 7
GROUP_ADMIN_PIN_MESSAGES = 1 << 8
GROUP_ADMIN_INVITE_USERS = 1 << 9
GROUP_ADMIN = 1 << 10
GROUP_MEMBER = 1 << 11
PM = 1 << 12
EVERYONE = 1 << 13

# Flag name -> flag value. SUDO and SUPPORT are intentionally not listed.
BITMAP = {
    "OWNER": OWNER,
    "GROUP_OWNER": GROUP_OWNER,
    "GROUP_ADMIN_ADD_ADMINS": GROUP_ADMIN_ADD_ADMINS,
    "GROUP_ADMIN_CHANGE_INFO": GROUP_ADMIN_CHANGE_INFO,
    "GROUP_ADMIN_BAN_USERS": GROUP_ADMIN_BAN_USERS,
    "GROUP_ADMIN_DELETE_MESSAGES": GROUP_ADMIN_DELETE_MESSAGES,
    "GROUP_ADMIN_PIN_MESSAGES": GROUP_ADMIN_PIN_MESSAGES,
    "GROUP_ADMIN_INVITE_USERS": GROUP_ADMIN_INVITE_USERS,
    "GROUP_ADMIN": GROUP_ADMIN,
    "GROUP_MEMBER": GROUP_MEMBER,
    "PM": PM,
    "EVERYONE": EVERYONE,
}

# Union of every admin-related flag (the generic GROUP_ADMIN included).
GROUP_ADMIN_ANY = (
    GROUP_ADMIN_ADD_ADMINS
    | GROUP_ADMIN_CHANGE_INFO
    | GROUP_ADMIN_BAN_USERS
    | GROUP_ADMIN_DELETE_MESSAGES
    | GROUP_ADMIN_PIN_MESSAGES
    | GROUP_ADMIN_INVITE_USERS
    | GROUP_ADMIN
)

# Mask used when a command declares no security of its own, and the fallback
# value of the "default" and "bounding_mask" database keys.
DEFAULT_PERMISSIONS = OWNER

PUBLIC_PERMISSIONS = GROUP_OWNER | GROUP_ADMIN_ANY | GROUP_MEMBER | PM

# Bits 0..12 set, i.e. every flag above except EVERYONE (bit 13).
ALL = (1 << 13) - 1
class SecurityGroup(typing.NamedTuple):
    """
    Represents a security group

    A named set of users sharing the same list of granted rules.
    """

    # Group name
    name: str
    # IDs of the users that belong to the group
    users: typing.List[int]
    # Granted rules; each dict is read through its "rule_type" and "rule" keys
    permissions: typing.List[dict]
def owner(func: Command) -> Command:
    """Decorator: add the `OWNER` flag to the command's security mask"""
    return _sec(func, OWNER)
def _deprecated(name: str) -> typing.Callable[[Command], Command]:
    """
    Builds a no-op replacement for a removed security decorator

    :param name: name of the deprecated decorator, used in the log message
    :return: decorator that returns the command unchanged
    """

    def decorator(func: Command) -> Command:
        # Logged each time the decorator is applied to a command
        logger.debug("Using deprecated decorator `%s`, which will have no effect", name)
        return func

    return decorator
# Kept so that modules which still apply these decorators keep importing;
# both leave the command's security mask untouched.
sudo = _deprecated("sudo")
support = _deprecated("support")
def group_owner(func: Command) -> Command:
    """Decorator: add the `GROUP_OWNER` flag to the command's security mask"""
    return _sec(func, GROUP_OWNER)
def group_admin_add_admins(func: Command) -> Command:
    """Decorator: add the `GROUP_ADMIN_ADD_ADMINS` flag to the command's security mask"""
    return _sec(func, GROUP_ADMIN_ADD_ADMINS)
def group_admin_change_info(func: Command) -> Command:
    """Decorator: add the `GROUP_ADMIN_CHANGE_INFO` flag to the command's security mask"""
    return _sec(func, GROUP_ADMIN_CHANGE_INFO)
def group_admin_ban_users(func: Command) -> Command:
    """Decorator: add the `GROUP_ADMIN_BAN_USERS` flag to the command's security mask"""
    return _sec(func, GROUP_ADMIN_BAN_USERS)
def group_admin_delete_messages(func: Command) -> Command:
    """Decorator: add the `GROUP_ADMIN_DELETE_MESSAGES` flag to the command's security mask"""
    return _sec(func, GROUP_ADMIN_DELETE_MESSAGES)
def group_admin_pin_messages(func: Command) -> Command:
    """Decorator: add the `GROUP_ADMIN_PIN_MESSAGES` flag to the command's security mask"""
    return _sec(func, GROUP_ADMIN_PIN_MESSAGES)
def group_admin_invite_users(func: Command) -> Command:
    """Decorator: add the `GROUP_ADMIN_INVITE_USERS` flag to the command's security mask"""
    return _sec(func, GROUP_ADMIN_INVITE_USERS)
def group_admin(func: Command) -> Command:
    """Decorator: add the `GROUP_ADMIN` flag to the command's security mask"""
    return _sec(func, GROUP_ADMIN)
def group_member(func: Command) -> Command:
    """Decorator: add the `GROUP_MEMBER` flag to the command's security mask"""
    return _sec(func, GROUP_MEMBER)
def pm(func: Command) -> Command:
    """Decorator: add the `PM` flag to the command's security mask"""
    return _sec(func, PM)
def unrestricted(func: Command) -> Command:
    """Decorator: add every flag of `ALL` (all flags except `EVERYONE`) to the command's security mask"""
    return _sec(func, ALL)
def inline_everyone(func: Command) -> Command:
    """Decorator: add the `EVERYONE` flag to the command's security mask"""
    return _sec(func, EVERYONE)
  109. def _sec(func: Command, flags: int) -> Command:
  110. prev = getattr(func, "security", 0)
  111. func.security = prev | OWNER | flags
  112. return func
  113. class SecurityManager:
  114. """Manages command execution security policy"""
  115. def __init__(self, client: CustomTelegramClient, db: Database):
  116. self._client = client
  117. self._db = db
  118. self._cache: typing.Dict[int, dict] = {}
  119. self._last_warning: int = 0
  120. self._sgroups: typing.Dict[str, SecurityGroup] = {}
  121. self._any_admin = self.any_admin = db.get(__name__, "any_admin", False)
  122. self._default = self.default = db.get(__name__, "default", DEFAULT_PERMISSIONS)
  123. self._tsec_chat = self.tsec_chat = db.pointer(__name__, "tsec_chat", [])
  124. self._tsec_user = self.tsec_user = db.pointer(__name__, "tsec_user", [])
  125. self._owner = self.owner = db.pointer(__name__, "owner", [])
  126. self._reload_rights()
  127. def apply_sgroups(self, sgroups: typing.Dict[str, SecurityGroup]):
  128. """Apply security groups"""
  129. self._sgroups = sgroups
  130. def _reload_rights(self):
  131. """
  132. Internal method to ensure that account owner is always in the owner list
  133. and to clear out outdated tsec rules
  134. """
  135. if self._client.tg_id not in self._owner:
  136. self._owner.append(self._client.tg_id)
  137. for info in self._tsec_user.copy():
  138. if info["expires"] and info["expires"] < time.time():
  139. self._tsec_user.remove(info)
  140. for info in self._tsec_chat.copy():
  141. if info["expires"] and info["expires"] < time.time():
  142. self._tsec_chat.remove(info)
  143. def add_rule(
  144. self,
  145. target_type: str,
  146. target: EntityLike,
  147. rule: str,
  148. duration: int,
  149. ):
  150. """
  151. Adds a targeted security rule
  152. :param target_type: "user" or "chat"
  153. :param target: target entity
  154. :param rule: rule name
  155. :param duration: rule duration in seconds
  156. :return: None
  157. """
  158. if target_type not in {"chat", "user"}:
  159. raise ValueError(f"Invalid target_type: {target_type}")
  160. if all(
  161. not rule.startswith(rule_type)
  162. for rule_type in {"command", "module", "inline"}
  163. ):
  164. raise ValueError(f"Invalid rule: {rule}")
  165. if duration < 0:
  166. raise ValueError(f"Invalid duration: {duration}")
  167. (self._tsec_chat if target_type == "chat" else self._tsec_user).append({
  168. "target": target.id,
  169. "rule_type": rule.split("/")[0],
  170. "rule": rule.split("/", maxsplit=1)[1],
  171. "expires": int(time.time() + duration) if duration else 0,
  172. "entity_name": get_display_name(target),
  173. "entity_url": utils.get_entity_url(target),
  174. })
  175. def remove_rules(self, target_type: str, target_id: int) -> bool:
  176. """
  177. Removes all targeted security rules for the given target
  178. :param target_type: "user" or "chat"
  179. :param target_id: target entity ID
  180. :return: True if any rules were removed
  181. """
  182. any_ = False
  183. if target_type == "user":
  184. for rule in self.tsec_user.copy():
  185. if rule["target"] == target_id:
  186. self.tsec_user.remove(rule)
  187. any_ = True
  188. elif target_type == "chat":
  189. for rule in self.tsec_chat.copy():
  190. if rule["target"] == target_id:
  191. self.tsec_chat.remove(rule)
  192. any_ = True
  193. return any_
  194. def remove_rule(self, target_type: str, target_id: int, rule_cont: str) -> bool:
  195. """
  196. Removes targeted security rules for the given target
  197. :param target_type: "user" or "chat"
  198. :param target_id: target entity ID
  199. :param rule_cont: rule name (module or command)
  200. :return: True if any rules were removed
  201. """
  202. any_ = False
  203. if target_type == "user":
  204. for rule in self.tsec_user.copy():
  205. if rule["target"] == target_id and rule["rule"] == rule_cont:
  206. self.tsec_user.remove(rule)
  207. any_ = True
  208. elif target_type == "chat":
  209. for rule in self.tsec_chat.copy():
  210. if rule["target"] == target_id and rule["rule"] == rule_cont:
  211. self.tsec_chat.remove(rule)
  212. any_ = True
  213. return any_
  214. def get_flags(self, func: typing.Union[Command, int]) -> int:
  215. """
  216. Gets the security flags for the given function
  217. :param func: function or flags
  218. :return: security flags
  219. """
  220. if isinstance(func, int):
  221. config = func
  222. else:
  223. # Return masks there so user don't need to reboot
  224. # every time he changes permissions. It doesn't
  225. # decrease security at all, bc user anyway can
  226. # access this attribute
  227. config = self._db.get(__name__, "masks", {}).get(
  228. f"{func.__module__}.{func.__name__}",
  229. getattr(func, "security", self._default),
  230. )
  231. if config & ~ALL and not config & EVERYONE:
  232. logger.error("Security config contains unknown bits")
  233. return False
  234. return config & self._db.get(__name__, "bounding_mask", DEFAULT_PERMISSIONS)
  235. def _check_tsec_inline(self, user_id: int, command: str) -> bool:
  236. """
  237. Checks if user is permitted to execute certain inline command
  238. :param user_id: user ID
  239. :param command: command name
  240. :return: True if permitted, False otherwise
  241. """
  242. return command and any(
  243. (
  244. rule["target"] == user_id
  245. and rule["rule_type"] == "inline"
  246. and rule["rule"] == command
  247. )
  248. for rule in self._tsec_user
  249. )
  250. def check_tsec(self, user_id: int, command: str) -> bool:
  251. for info in self._sgroups.copy().values():
  252. if user_id in info.users:
  253. for permission in info.permissions:
  254. if (
  255. permission["rule_type"] == "command"
  256. and permission["rule"] == command
  257. or permission["rule_type"] == "module"
  258. and permission["rule"] == command
  259. ):
  260. return True
  261. for info in self._tsec_user.copy():
  262. if info["target"] == user_id and (
  263. info["rule_type"] == "command"
  264. and info["rule"] == command
  265. or info["rule_type"] == "module"
  266. and command in self._client.loader.commands
  267. and info["rule"]
  268. == self._client.loader.commands[command].__qualname__.split(".")[0]
  269. ):
  270. return True
  271. return False
  272. async def check(
  273. self,
  274. message: typing.Optional[Message],
  275. func: typing.Union[Command, int],
  276. user_id: typing.Optional[int] = None,
  277. inline_cmd: typing.Optional[str] = None,
  278. *,
  279. usernames: typing.Optional[typing.List[str]] = None,
  280. ) -> bool:
  281. """
  282. Checks if message sender is permitted to execute certain function
  283. :param message: Message to check or None if you manually pass user_id
  284. :param func: function or flags
  285. :param user_id: user ID
  286. :param inline_cmd: Inline command name if it's inline query
  287. :return: True if permitted, False otherwise
  288. """
  289. self._reload_rights()
  290. if not (config := self.get_flags(func)):
  291. return False
  292. if not user_id:
  293. user_id = message.sender_id
  294. is_channel = False
  295. if (
  296. message
  297. and message.is_channel
  298. and not message.is_group
  299. and message.edit_date
  300. ):
  301. async for event in self._client.iter_admin_log(
  302. utils.get_chat_id(message),
  303. limit=10,
  304. edit=True,
  305. ):
  306. if event.action.prev_message.id == message.id:
  307. user_id = event.user_id
  308. is_channel = True
  309. if (
  310. user_id == self._client.tg_id
  311. or getattr(message, "out", False)
  312. and not is_channel
  313. ):
  314. return True
  315. logger.debug("Checking security match for %s", config)
  316. if config & SUDO or config & SUPPORT:
  317. if not self._last_warning or time.time() - self._last_warning > 60 * 60:
  318. import warnings
  319. warnings.warn(
  320. (
  321. "You are using module containing SUDO or SUPPORT security"
  322. " groups, which are deprecated. It might behave strangely"
  323. ),
  324. DeprecationWarning,
  325. )
  326. self._last_warning = time.time()
  327. f_group_owner = config & GROUP_OWNER
  328. f_group_admin_add_admins = config & GROUP_ADMIN_ADD_ADMINS
  329. f_group_admin_change_info = config & GROUP_ADMIN_CHANGE_INFO
  330. f_group_admin_ban_users = config & GROUP_ADMIN_BAN_USERS
  331. f_group_admin_delete_messages = config & GROUP_ADMIN_DELETE_MESSAGES
  332. f_group_admin_pin_messages = config & GROUP_ADMIN_PIN_MESSAGES
  333. f_group_admin_invite_users = config & GROUP_ADMIN_INVITE_USERS
  334. f_group_admin = config & GROUP_ADMIN
  335. f_group_member = config & GROUP_MEMBER
  336. f_pm = config & PM
  337. f_group_admin_any = (
  338. f_group_admin_add_admins
  339. or f_group_admin_change_info
  340. or f_group_admin_ban_users
  341. or f_group_admin_delete_messages
  342. or f_group_admin_pin_messages
  343. or f_group_admin_invite_users
  344. or f_group_admin
  345. )
  346. if user_id in self._owner:
  347. return True
  348. if user_id in self._db.get(main.__name__, "blacklist_users", []):
  349. return False
  350. if message is None: # In case of checking inline query security map
  351. return self._check_tsec_inline(user_id, inline_cmd) or bool(
  352. config & EVERYONE
  353. )
  354. try:
  355. chat = utils.get_chat_id(message)
  356. except Exception:
  357. chat = None
  358. try:
  359. cmd = message.raw_text[1:].split()[0].strip()
  360. if usernames:
  361. for username in usernames:
  362. cmd = cmd.replace(f"@{username}", "")
  363. except Exception:
  364. cmd = None
  365. if callable(func):
  366. command = self._client.loader.find_alias(cmd, include_legacy=True) or cmd
  367. for info in self._sgroups.copy().values():
  368. if user_id in info.users:
  369. for permission in info.permissions:
  370. if (
  371. permission["rule_type"] == "command"
  372. and permission["rule"] == command
  373. ):
  374. logger.debug("sgroup match for %s", command)
  375. return True
  376. if (
  377. permission["rule_type"] == "module"
  378. and permission["rule"] == func.__self__.__class__.__name__
  379. ):
  380. logger.debug(
  381. "sgroup match for %s", func.__self__.__class__.__name__
  382. )
  383. return True
  384. for info in self._tsec_user.copy():
  385. if info["target"] == user_id:
  386. if info["rule_type"] == "command" and info["rule"] == command:
  387. logger.debug("tsec match for user %s", command)
  388. return True
  389. if (
  390. info["rule_type"] == "module"
  391. and info["rule"] == func.__self__.__class__.__name__
  392. ):
  393. logger.debug(
  394. "tsec match for user %s",
  395. func.__self__.__class__.__name__,
  396. )
  397. return True
  398. if chat:
  399. for info in self._tsec_chat.copy():
  400. if info["target"] == chat:
  401. if info["rule_type"] == "command" and info["rule"] == command:
  402. logger.debug("tsec match for %s", command)
  403. return True
  404. if (
  405. info["rule_type"] == "module"
  406. and info["rule"] == func.__self__.__class__.__name__
  407. ):
  408. logger.debug(
  409. "tsec match for %s",
  410. func.__self__.__class__.__name__,
  411. )
  412. return True
  413. if f_group_member and message.is_group or f_pm and message.is_private:
  414. return True
  415. if message.is_channel:
  416. if not message.is_group:
  417. chat_id = utils.get_chat_id(message)
  418. if (
  419. chat_id in self._cache
  420. and self._cache[chat_id]["exp"] >= time.time()
  421. ):
  422. chat = self._cache[chat_id]["chat"]
  423. else:
  424. chat = await message.get_chat()
  425. self._cache[chat_id] = {"chat": chat, "exp": time.time() + 5 * 60}
  426. if (
  427. not chat.creator
  428. and not chat.admin_rights
  429. or not chat.creator
  430. and not chat.admin_rights.post_messages
  431. ):
  432. return False
  433. if self._any_admin and f_group_admin_any or f_group_admin:
  434. return True
  435. elif f_group_admin_any or f_group_owner:
  436. chat_id = utils.get_chat_id(message)
  437. cache_obj = f"{chat_id}/{user_id}"
  438. if (
  439. cache_obj in self._cache
  440. and self._cache[cache_obj]["exp"] >= time.time()
  441. ):
  442. participant = self._cache[cache_obj]["user"]
  443. else:
  444. participant = await message.client.get_permissions(
  445. message.peer_id,
  446. user_id,
  447. )
  448. self._cache[cache_obj] = {
  449. "user": participant,
  450. "exp": time.time() + 5 * 60,
  451. }
  452. if (
  453. participant.is_creator
  454. or participant.is_admin
  455. and (
  456. self._any_admin
  457. and f_group_admin_any
  458. or f_group_admin
  459. or f_group_admin_add_admins
  460. and participant.add_admins
  461. or f_group_admin_change_info
  462. and participant.change_info
  463. or f_group_admin_ban_users
  464. and participant.ban_users
  465. or f_group_admin_delete_messages
  466. and participant.delete_messages
  467. or f_group_admin_pin_messages
  468. and participant.pin_messages
  469. or f_group_admin_invite_users
  470. and participant.invite_users
  471. )
  472. ):
  473. return True
  474. return False
  475. if message.is_group and (f_group_admin_any or f_group_owner):
  476. chat_id = utils.get_chat_id(message)
  477. cache_obj = f"{chat_id}/{user_id}"
  478. if (
  479. cache_obj in self._cache
  480. and self._cache[cache_obj]["exp"] >= time.time()
  481. ):
  482. participant = self._cache[cache_obj]["user"]
  483. else:
  484. full_chat = await message.client(GetFullChatRequest(message.chat_id))
  485. participants = full_chat.full_chat.participants.participants
  486. participant = next(
  487. (
  488. possible_participant
  489. for possible_participant in participants
  490. if possible_participant.user_id == message.sender_id
  491. ),
  492. None,
  493. )
  494. self._cache[cache_obj] = {
  495. "user": participant,
  496. "exp": time.time() + 5 * 60,
  497. }
  498. if not participant:
  499. return
  500. if (
  501. isinstance(participant, ChatParticipantCreator)
  502. or isinstance(participant, ChatParticipantAdmin)
  503. and f_group_admin_any
  504. ):
  505. return True
  506. return False
  507. _check = check # Legacy