security.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633
  1. """Checks the commands' security"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # ©️ Dan Gazizullin, 2021-2023
  15. # This file is a part of Hikka Userbot
  16. # 🌐 https://github.com/hikariatama/Hikka
  17. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  18. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  19. import logging
  20. import time
  21. import typing
  22. from hikkatl.hints import EntityLike
  23. from hikkatl.tl.functions.messages import GetFullChatRequest
  24. from hikkatl.tl.types import ChatParticipantAdmin, ChatParticipantCreator, Message
  25. from hikkatl.utils import get_display_name
  26. from . import main, utils
  27. from .database import Database
  28. from .tl_cache import CustomTelegramClient
  29. from .types import Command
  30. logger = logging.getLogger(__name__)
  31. OWNER = 1 << 0
  32. SUDO = 1 << 1
  33. SUPPORT = 1 << 2
  34. GROUP_OWNER = 1 << 3
  35. GROUP_ADMIN_ADD_ADMINS = 1 << 4
  36. GROUP_ADMIN_CHANGE_INFO = 1 << 5
  37. GROUP_ADMIN_BAN_USERS = 1 << 6
  38. GROUP_ADMIN_DELETE_MESSAGES = 1 << 7
  39. GROUP_ADMIN_PIN_MESSAGES = 1 << 8
  40. GROUP_ADMIN_INVITE_USERS = 1 << 9
  41. GROUP_ADMIN = 1 << 10
  42. GROUP_MEMBER = 1 << 11
  43. PM = 1 << 12
  44. EVERYONE = 1 << 13
  45. BITMAP = {
  46. "OWNER": OWNER,
  47. "GROUP_OWNER": GROUP_OWNER,
  48. "GROUP_ADMIN_ADD_ADMINS": GROUP_ADMIN_ADD_ADMINS,
  49. "GROUP_ADMIN_CHANGE_INFO": GROUP_ADMIN_CHANGE_INFO,
  50. "GROUP_ADMIN_BAN_USERS": GROUP_ADMIN_BAN_USERS,
  51. "GROUP_ADMIN_DELETE_MESSAGES": GROUP_ADMIN_DELETE_MESSAGES,
  52. "GROUP_ADMIN_PIN_MESSAGES": GROUP_ADMIN_PIN_MESSAGES,
  53. "GROUP_ADMIN_INVITE_USERS": GROUP_ADMIN_INVITE_USERS,
  54. "GROUP_ADMIN": GROUP_ADMIN,
  55. "GROUP_MEMBER": GROUP_MEMBER,
  56. "PM": PM,
  57. "EVERYONE": EVERYONE,
  58. }
  59. GROUP_ADMIN_ANY = (
  60. GROUP_ADMIN_ADD_ADMINS
  61. | GROUP_ADMIN_CHANGE_INFO
  62. | GROUP_ADMIN_BAN_USERS
  63. | GROUP_ADMIN_DELETE_MESSAGES
  64. | GROUP_ADMIN_PIN_MESSAGES
  65. | GROUP_ADMIN_INVITE_USERS
  66. | GROUP_ADMIN
  67. )
  68. DEFAULT_PERMISSIONS = OWNER
  69. PUBLIC_PERMISSIONS = GROUP_OWNER | GROUP_ADMIN_ANY | GROUP_MEMBER | PM
  70. ALL = (1 << 13) - 1
  71. class SecurityGroup(typing.NamedTuple):
  72. """Represents a security group"""
  73. name: str
  74. users: typing.List[int]
  75. permissions: typing.List[dict]
  76. def owner(func: Command) -> Command:
  77. return _sec(func, OWNER)
  78. def _deprecated(name: str) -> callable:
  79. def decorator(func: Command) -> Command:
  80. logger.debug("Using deprecated decorator `%s`, which will have no effect", name)
  81. return func
  82. return decorator
  83. sudo = _deprecated("sudo")
  84. support = _deprecated("support")
  85. def group_owner(func: Command) -> Command:
  86. return _sec(func, GROUP_OWNER)
  87. def group_admin_add_admins(func: Command) -> Command:
  88. return _sec(func, GROUP_ADMIN_ADD_ADMINS)
  89. def group_admin_change_info(func: Command) -> Command:
  90. return _sec(func, GROUP_ADMIN_CHANGE_INFO)
  91. def group_admin_ban_users(func: Command) -> Command:
  92. return _sec(func, GROUP_ADMIN_BAN_USERS)
  93. def group_admin_delete_messages(func: Command) -> Command:
  94. return _sec(func, GROUP_ADMIN_DELETE_MESSAGES)
  95. def group_admin_pin_messages(func: Command) -> Command:
  96. return _sec(func, GROUP_ADMIN_PIN_MESSAGES)
  97. def group_admin_invite_users(func: Command) -> Command:
  98. return _sec(func, GROUP_ADMIN_INVITE_USERS)
  99. def group_admin(func: Command) -> Command:
  100. return _sec(func, GROUP_ADMIN)
  101. def group_member(func: Command) -> Command:
  102. return _sec(func, GROUP_MEMBER)
  103. def pm(func: Command) -> Command:
  104. return _sec(func, PM)
  105. def unrestricted(func: Command) -> Command:
  106. return _sec(func, ALL)
  107. def inline_everyone(func: Command) -> Command:
  108. return _sec(func, EVERYONE)
  109. def _sec(func: Command, flags: int) -> Command:
  110. prev = getattr(func, "security", 0)
  111. func.security = prev | OWNER | flags
  112. return func
  113. class SecurityManager:
  114. """Manages command execution security policy"""
  115. def __init__(self, client: CustomTelegramClient, db: Database):
  116. self._client = client
  117. self._db = db
  118. self._cache: typing.Dict[int, dict] = {}
  119. self._last_warning: int = 0
  120. self._sgroups: typing.Dict[str, SecurityGroup] = {}
  121. self._any_admin = self.any_admin = db.get(__name__, "any_admin", False)
  122. self._default = self.default = db.get(__name__, "default", DEFAULT_PERMISSIONS)
  123. self._tsec_chat = self.tsec_chat = db.pointer(__name__, "tsec_chat", [])
  124. self._tsec_user = self.tsec_user = db.pointer(__name__, "tsec_user", [])
  125. self._owner = self.owner = db.pointer(__name__, "owner", [])
  126. self._reload_rights()
  127. def apply_sgroups(self, sgroups: typing.Dict[str, SecurityGroup]):
  128. """Apply security groups"""
  129. self._sgroups = sgroups
  130. def _reload_rights(self):
  131. """
  132. Internal method to ensure that account owner is always in the owner list
  133. and to clear out outdated tsec rules
  134. """
  135. if self._client.tg_id not in self._owner:
  136. self._owner.append(self._client.tg_id)
  137. for info in self._tsec_user.copy():
  138. if info["expires"] and info["expires"] < time.time():
  139. self._tsec_user.remove(info)
  140. for info in self._tsec_chat.copy():
  141. if info["expires"] and info["expires"] < time.time():
  142. self._tsec_chat.remove(info)
  143. def add_rule(
  144. self,
  145. target_type: str,
  146. target: EntityLike,
  147. rule: str,
  148. duration: int,
  149. ):
  150. """
  151. Adds a targeted security rule
  152. :param target_type: "user" or "chat"
  153. :param target: target entity
  154. :param rule: rule name
  155. :param duration: rule duration in seconds
  156. :return: None
  157. """
  158. if target_type not in {"chat", "user"}:
  159. raise ValueError(f"Invalid target_type: {target_type}")
  160. if all(
  161. not rule.startswith(rule_type)
  162. for rule_type in {"command", "module", "inline"}
  163. ):
  164. raise ValueError(f"Invalid rule: {rule}")
  165. if duration < 0:
  166. raise ValueError(f"Invalid duration: {duration}")
  167. (self._tsec_chat if target_type == "chat" else self._tsec_user).append(
  168. {
  169. "target": target.id,
  170. "rule_type": rule.split("/")[0],
  171. "rule": rule.split("/", maxsplit=1)[1],
  172. "expires": int(time.time() + duration) if duration else 0,
  173. "entity_name": get_display_name(target),
  174. "entity_url": utils.get_entity_url(target),
  175. }
  176. )
  177. def remove_rules(self, target_type: str, target_id: int) -> bool:
  178. """
  179. Removes all targeted security rules for the given target
  180. :param target_type: "user" or "chat"
  181. :param target_id: target entity ID
  182. :return: True if any rules were removed
  183. """
  184. any_ = False
  185. if target_type == "user":
  186. for rule in self.tsec_user.copy():
  187. if rule["target"] == target_id:
  188. self.tsec_user.remove(rule)
  189. any_ = True
  190. elif target_type == "chat":
  191. for rule in self.tsec_chat.copy():
  192. if rule["target"] == target_id:
  193. self.tsec_chat.remove(rule)
  194. any_ = True
  195. return any_
  196. def remove_rule(self, target_type: str, target_id: int, rule_cont: str) -> bool:
  197. """
  198. Removes targeted security rules for the given target
  199. :param target_type: "user" or "chat"
  200. :param target_id: target entity ID
  201. :param rule_cont: rule name (module or command)
  202. :return: True if any rules were removed
  203. """
  204. any_ = False
  205. if target_type == "user":
  206. for rule in self.tsec_user.copy():
  207. if rule["target"] == target_id and rule["rule"] == rule_cont:
  208. self.tsec_user.remove(rule)
  209. any_ = True
  210. elif target_type == "chat":
  211. for rule in self.tsec_chat.copy():
  212. if rule["target"] == target_id and rule["rule"] == rule_cont:
  213. self.tsec_chat.remove(rule)
  214. any_ = True
  215. return any_
  216. def get_flags(self, func: typing.Union[Command, int]) -> int:
  217. """
  218. Gets the security flags for the given function
  219. :param func: function or flags
  220. :return: security flags
  221. """
  222. if isinstance(func, int):
  223. config = func
  224. else:
  225. # Return masks there so user don't need to reboot
  226. # every time he changes permissions. It doesn't
  227. # decrease security at all, bc user anyway can
  228. # access this attribute
  229. config = self._db.get(__name__, "masks", {}).get(
  230. f"{func.__module__}.{func.__name__}",
  231. getattr(func, "security", self._default),
  232. )
  233. if config & ~ALL and not config & EVERYONE:
  234. logger.error("Security config contains unknown bits")
  235. return False
  236. return config & self._db.get(__name__, "bounding_mask", DEFAULT_PERMISSIONS)
  237. def _check_tsec_inline(self, user_id: int, command: str) -> bool:
  238. """
  239. Checks if user is permitted to execute certain inline command
  240. :param user_id: user ID
  241. :param command: command name
  242. :return: True if permitted, False otherwise
  243. """
  244. return command and any(
  245. (
  246. rule["target"] == user_id
  247. and rule["rule_type"] == "inline"
  248. and rule["rule"] == command
  249. )
  250. for rule in self._tsec_user
  251. )
  252. def check_tsec(self, user_id: int, command: str) -> bool:
  253. for info in self._sgroups.copy().values():
  254. if user_id in info.users:
  255. for permission in info.permissions:
  256. if (
  257. permission["rule_type"] == "command"
  258. and permission["rule"] == command
  259. or permission["rule_type"] == "module"
  260. and permission["rule"] == command
  261. ):
  262. return True
  263. for info in self._tsec_user.copy():
  264. if info["target"] == user_id and (
  265. info["rule_type"] == "command"
  266. and info["rule"] == command
  267. or info["rule_type"] == "module"
  268. and command in self._client.loader.commands
  269. and info["rule"]
  270. == self._client.loader.commands[command].__qualname__.split(".")[0]
  271. ):
  272. return True
  273. return False
  274. async def check(
  275. self,
  276. message: typing.Optional[Message],
  277. func: typing.Union[Command, int],
  278. user_id: typing.Optional[int] = None,
  279. inline_cmd: typing.Optional[str] = None,
  280. *,
  281. usernames: typing.Optional[typing.List[str]] = None,
  282. ) -> bool:
  283. """
  284. Checks if message sender is permitted to execute certain function
  285. :param message: Message to check or None if you manually pass user_id
  286. :param func: function or flags
  287. :param user_id: user ID
  288. :param inline_cmd: Inline command name if it's inline query
  289. :return: True if permitted, False otherwise
  290. """
  291. self._reload_rights()
  292. if not (config := self.get_flags(func)):
  293. return False
  294. if not user_id:
  295. user_id = message.sender_id
  296. is_channel = False
  297. if (
  298. message
  299. and message.is_channel
  300. and not message.is_group
  301. and message.edit_date
  302. ):
  303. async for event in self._client.iter_admin_log(
  304. utils.get_chat_id(message),
  305. limit=10,
  306. edit=True,
  307. ):
  308. if event.action.prev_message.id == message.id:
  309. user_id = event.user_id
  310. is_channel = True
  311. if (
  312. user_id == self._client.tg_id
  313. or getattr(message, "out", False)
  314. and not is_channel
  315. ):
  316. return True
  317. logger.debug("Checking security match for %s", config)
  318. if config & SUDO or config & SUPPORT:
  319. if not self._last_warning or time.time() - self._last_warning > 60 * 60:
  320. import warnings
  321. warnings.warn(
  322. (
  323. "You are using module containing SUDO or SUPPORT security"
  324. " groups, which are deprecated. It might behave strangely"
  325. ),
  326. DeprecationWarning,
  327. )
  328. self._last_warning = time.time()
  329. f_group_owner = config & GROUP_OWNER
  330. f_group_admin_add_admins = config & GROUP_ADMIN_ADD_ADMINS
  331. f_group_admin_change_info = config & GROUP_ADMIN_CHANGE_INFO
  332. f_group_admin_ban_users = config & GROUP_ADMIN_BAN_USERS
  333. f_group_admin_delete_messages = config & GROUP_ADMIN_DELETE_MESSAGES
  334. f_group_admin_pin_messages = config & GROUP_ADMIN_PIN_MESSAGES
  335. f_group_admin_invite_users = config & GROUP_ADMIN_INVITE_USERS
  336. f_group_admin = config & GROUP_ADMIN
  337. f_group_member = config & GROUP_MEMBER
  338. f_pm = config & PM
  339. f_group_admin_any = (
  340. f_group_admin_add_admins
  341. or f_group_admin_change_info
  342. or f_group_admin_ban_users
  343. or f_group_admin_delete_messages
  344. or f_group_admin_pin_messages
  345. or f_group_admin_invite_users
  346. or f_group_admin
  347. )
  348. if user_id in self._owner:
  349. return True
  350. if user_id in self._db.get(main.__name__, "blacklist_users", []):
  351. return False
  352. if message is None: # In case of checking inline query security map
  353. return self._check_tsec_inline(user_id, inline_cmd) or bool(
  354. config & EVERYONE
  355. )
  356. try:
  357. chat = utils.get_chat_id(message)
  358. except Exception:
  359. chat = None
  360. try:
  361. cmd = message.raw_text[1:].split()[0].strip()
  362. if usernames:
  363. for username in usernames:
  364. cmd = cmd.replace(f"@{username}", "")
  365. except Exception:
  366. cmd = None
  367. if callable(func):
  368. command = self._client.loader.find_alias(cmd, include_legacy=True) or cmd
  369. for info in self._sgroups.copy().values():
  370. if user_id in info.users:
  371. for permission in info.permissions:
  372. if (
  373. permission["rule_type"] == "command"
  374. and permission["rule"] == command
  375. ):
  376. logger.debug("sgroup match for %s", command)
  377. return True
  378. if (
  379. permission["rule_type"] == "module"
  380. and permission["rule"] == func.__self__.__class__.__name__
  381. ):
  382. logger.debug(
  383. "sgroup match for %s", func.__self__.__class__.__name__
  384. )
  385. return True
  386. for info in self._tsec_user.copy():
  387. if info["target"] == user_id:
  388. if info["rule_type"] == "command" and info["rule"] == command:
  389. logger.debug("tsec match for user %s", command)
  390. return True
  391. if (
  392. info["rule_type"] == "module"
  393. and info["rule"] == func.__self__.__class__.__name__
  394. ):
  395. logger.debug(
  396. "tsec match for user %s",
  397. func.__self__.__class__.__name__,
  398. )
  399. return True
  400. if chat:
  401. for info in self._tsec_chat.copy():
  402. if info["target"] == chat:
  403. if info["rule_type"] == "command" and info["rule"] == command:
  404. logger.debug("tsec match for %s", command)
  405. return True
  406. if (
  407. info["rule_type"] == "module"
  408. and info["rule"] == func.__self__.__class__.__name__
  409. ):
  410. logger.debug(
  411. "tsec match for %s",
  412. func.__self__.__class__.__name__,
  413. )
  414. return True
  415. if f_group_member and message.is_group or f_pm and message.is_private:
  416. return True
  417. if message.is_channel:
  418. if not message.is_group:
  419. chat_id = utils.get_chat_id(message)
  420. if (
  421. chat_id in self._cache
  422. and self._cache[chat_id]["exp"] >= time.time()
  423. ):
  424. chat = self._cache[chat_id]["chat"]
  425. else:
  426. chat = await message.get_chat()
  427. self._cache[chat_id] = {"chat": chat, "exp": time.time() + 5 * 60}
  428. if (
  429. not chat.creator
  430. and not chat.admin_rights
  431. or not chat.creator
  432. and not chat.admin_rights.post_messages
  433. ):
  434. return False
  435. if self._any_admin and f_group_admin_any or f_group_admin:
  436. return True
  437. elif f_group_admin_any or f_group_owner:
  438. chat_id = utils.get_chat_id(message)
  439. cache_obj = f"{chat_id}/{user_id}"
  440. if (
  441. cache_obj in self._cache
  442. and self._cache[cache_obj]["exp"] >= time.time()
  443. ):
  444. participant = self._cache[cache_obj]["user"]
  445. else:
  446. participant = await message.client.get_permissions(
  447. message.peer_id,
  448. user_id,
  449. )
  450. self._cache[cache_obj] = {
  451. "user": participant,
  452. "exp": time.time() + 5 * 60,
  453. }
  454. if (
  455. participant.is_creator
  456. or participant.is_admin
  457. and (
  458. self._any_admin
  459. and f_group_admin_any
  460. or f_group_admin
  461. or f_group_admin_add_admins
  462. and participant.add_admins
  463. or f_group_admin_change_info
  464. and participant.change_info
  465. or f_group_admin_ban_users
  466. and participant.ban_users
  467. or f_group_admin_delete_messages
  468. and participant.delete_messages
  469. or f_group_admin_pin_messages
  470. and participant.pin_messages
  471. or f_group_admin_invite_users
  472. and participant.invite_users
  473. )
  474. ):
  475. return True
  476. return False
  477. if message.is_group and (f_group_admin_any or f_group_owner):
  478. chat_id = utils.get_chat_id(message)
  479. cache_obj = f"{chat_id}/{user_id}"
  480. if (
  481. cache_obj in self._cache
  482. and self._cache[cache_obj]["exp"] >= time.time()
  483. ):
  484. participant = self._cache[cache_obj]["user"]
  485. else:
  486. full_chat = await message.client(GetFullChatRequest(message.chat_id))
  487. participants = full_chat.full_chat.participants.participants
  488. participant = next(
  489. (
  490. possible_participant
  491. for possible_participant in participants
  492. if possible_participant.user_id == message.sender_id
  493. ),
  494. None,
  495. )
  496. self._cache[cache_obj] = {
  497. "user": participant,
  498. "exp": time.time() + 5 * 60,
  499. }
  500. if not participant:
  501. return
  502. if (
  503. isinstance(participant, ChatParticipantCreator)
  504. or isinstance(participant, ChatParticipantAdmin)
  505. and f_group_admin_any
  506. ):
  507. return True
  508. return False
  509. _check = check # Legacy