security.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. """Checks the commands' security"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  15. # █▀█ █ █ █ █▀█ █▀▄ █
  16. # © Copyright 2022
  17. # https://t.me/hikariatama
  18. #
  19. # 🔒 Licensed under the GNU AGPLv3
  20. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  21. import logging
  22. import time
  23. from typing import Optional
  24. from telethon.hints import EntityLike
  25. from telethon.utils import get_display_name
  26. from telethon.tl.functions.messages import GetFullChatRequest
  27. from telethon.tl.types import ChatParticipantAdmin, ChatParticipantCreator, Message
  28. from . import main, utils
  29. from .database import Database
  30. from .tl_cache import CustomTelegramClient
  31. logger = logging.getLogger(__name__)
  32. OWNER = 1 << 0
  33. SUDO = 1 << 1
  34. SUPPORT = 1 << 2
  35. GROUP_OWNER = 1 << 3
  36. GROUP_ADMIN_ADD_ADMINS = 1 << 4
  37. GROUP_ADMIN_CHANGE_INFO = 1 << 5
  38. GROUP_ADMIN_BAN_USERS = 1 << 6
  39. GROUP_ADMIN_DELETE_MESSAGES = 1 << 7
  40. GROUP_ADMIN_PIN_MESSAGES = 1 << 8
  41. GROUP_ADMIN_INVITE_USERS = 1 << 9
  42. GROUP_ADMIN = 1 << 10
  43. GROUP_MEMBER = 1 << 11
  44. PM = 1 << 12
  45. EVERYONE = 1 << 13
  46. BITMAP = {
  47. "OWNER": OWNER,
  48. "SUDO": SUDO,
  49. "SUPPORT": SUPPORT,
  50. "GROUP_OWNER": GROUP_OWNER,
  51. "GROUP_ADMIN_ADD_ADMINS": GROUP_ADMIN_ADD_ADMINS,
  52. "GROUP_ADMIN_CHANGE_INFO": GROUP_ADMIN_CHANGE_INFO,
  53. "GROUP_ADMIN_BAN_USERS": GROUP_ADMIN_BAN_USERS,
  54. "GROUP_ADMIN_DELETE_MESSAGES": GROUP_ADMIN_DELETE_MESSAGES,
  55. "GROUP_ADMIN_PIN_MESSAGES": GROUP_ADMIN_PIN_MESSAGES,
  56. "GROUP_ADMIN_INVITE_USERS": GROUP_ADMIN_INVITE_USERS,
  57. "GROUP_ADMIN": GROUP_ADMIN,
  58. "GROUP_MEMBER": GROUP_MEMBER,
  59. "PM": PM,
  60. "EVERYONE": EVERYONE,
  61. }
  62. GROUP_ADMIN_ANY = (
  63. GROUP_ADMIN_ADD_ADMINS
  64. | GROUP_ADMIN_CHANGE_INFO
  65. | GROUP_ADMIN_BAN_USERS
  66. | GROUP_ADMIN_DELETE_MESSAGES
  67. | GROUP_ADMIN_PIN_MESSAGES
  68. | GROUP_ADMIN_INVITE_USERS
  69. | GROUP_ADMIN
  70. )
  71. DEFAULT_PERMISSIONS = OWNER | SUDO
  72. PUBLIC_PERMISSIONS = GROUP_OWNER | GROUP_ADMIN_ANY | GROUP_MEMBER | PM
  73. ALL = (1 << 13) - 1
  74. def owner(func: callable) -> callable:
  75. return _sec(func, OWNER)
  76. def sudo(func: callable) -> callable:
  77. return _sec(func, SUDO)
  78. def support(func: callable) -> callable:
  79. return _sec(func, SUDO | SUPPORT)
  80. def group_owner(func: callable) -> callable:
  81. return _sec(func, SUDO | GROUP_OWNER)
  82. def group_admin_add_admins(func: callable) -> callable:
  83. return _sec(func, SUDO | GROUP_ADMIN_ADD_ADMINS)
  84. def group_admin_change_info(func: callable) -> callable:
  85. return _sec(func, SUDO | GROUP_ADMIN_CHANGE_INFO)
  86. def group_admin_ban_users(func: callable) -> callable:
  87. return _sec(func, SUDO | GROUP_ADMIN_BAN_USERS)
  88. def group_admin_delete_messages(func: callable) -> callable:
  89. return _sec(func, SUDO | GROUP_ADMIN_DELETE_MESSAGES)
  90. def group_admin_pin_messages(func: callable) -> callable:
  91. return _sec(func, SUDO | GROUP_ADMIN_PIN_MESSAGES)
  92. def group_admin_invite_users(func: callable) -> callable:
  93. return _sec(func, SUDO | GROUP_ADMIN_INVITE_USERS)
  94. def group_admin(func: callable) -> callable:
  95. return _sec(func, SUDO | GROUP_ADMIN)
  96. def group_member(func: callable) -> callable:
  97. return _sec(func, SUDO | GROUP_MEMBER)
  98. def pm(func: callable) -> callable:
  99. return _sec(func, SUDO | PM)
  100. def unrestricted(func: callable) -> callable:
  101. return _sec(func, ALL)
  102. def inline_everyone(func: callable) -> callable:
  103. return _sec(func, EVERYONE)
  104. def _sec(func: callable, flags: int) -> callable:
  105. prev = getattr(func, "security", 0)
  106. func.security = prev | OWNER | flags
  107. return func
  108. class SecurityManager:
  109. def __init__(self, client: CustomTelegramClient, db: Database):
  110. self._client = client
  111. self._db = db
  112. self._cache = {}
  113. self._any_admin = db.get(__name__, "any_admin", False)
  114. self._default = db.get(__name__, "default", DEFAULT_PERMISSIONS)
  115. self._tsec_chat = db.pointer(__name__, "tsec_chat", [])
  116. self._tsec_user = db.pointer(__name__, "tsec_user", [])
  117. self._owner = db.pointer(__name__, "owner", [])
  118. self._sudo = db.pointer(__name__, "sudo", [])
  119. self._support = db.pointer(__name__, "support", [])
  120. self._reload_rights()
  121. self.any_admin = self._any_admin
  122. self.default = self._default
  123. self.tsec_chat = self._tsec_chat
  124. self.tsec_user = self._tsec_user
  125. self.owner = self._owner
  126. self.sudo = self._sudo
  127. self.support = self._support
  128. def _reload_rights(self):
  129. if self._client.tg_id not in self._owner:
  130. self._owner.append(self._client.tg_id)
  131. for info in self._tsec_user.copy():
  132. if info["expires"] and info["expires"] < time.time():
  133. self._tsec_user.remove(info)
  134. for info in self._tsec_chat.copy():
  135. if info["expires"] and info["expires"] < time.time():
  136. self._tsec_chat.remove(info)
  137. def add_rule(
  138. self,
  139. target_type: str,
  140. target: EntityLike,
  141. rule: str,
  142. duration: int,
  143. ):
  144. if target_type not in {"chat", "user"}:
  145. raise ValueError(f"Invalid target_type: {target_type}")
  146. if not rule.startswith("command") and not rule.startswith("module"):
  147. raise ValueError(f"Invalid rule: {rule}")
  148. if duration < 0:
  149. raise ValueError(f"Invalid duration: {duration}")
  150. (self._tsec_chat if target_type == "chat" else self._tsec_user).append(
  151. {
  152. "target": target.id,
  153. "rule_type": rule.split("/")[0],
  154. "rule": rule.split("/", maxsplit=1)[1],
  155. "expires": int(time.time() + duration) if duration else 0,
  156. "entity_name": get_display_name(target),
  157. "entity_url": utils.get_entity_url(target),
  158. }
  159. )
  160. def remove_rules(self, target_type: str, target_id: int) -> bool:
  161. any_ = False
  162. if target_type == "user":
  163. for rule in self.tsec_user.copy():
  164. if rule["target"] == target_id:
  165. self.tsec_user.remove(rule)
  166. any_ = True
  167. elif target_type == "chat":
  168. for rule in self.tsec_chat.copy():
  169. if rule["target"] == target_id:
  170. self.tsec_chat.remove(rule)
  171. any_ = True
  172. return any_
  173. def get_flags(self, func: callable) -> int:
  174. if isinstance(func, int):
  175. config = func
  176. else:
  177. # Return masks there so user don't need to reboot
  178. # every time he changes permissions. It doesn't
  179. # decrease security at all, bc user anyway can
  180. # access this attribute
  181. config = self._db.get(__name__, "masks", {}).get(
  182. f"{func.__module__}.{func.__name__}",
  183. getattr(func, "security", self._default),
  184. )
  185. if config & ~ALL and not config & EVERYONE:
  186. logger.error("Security config contains unknown bits")
  187. return False
  188. return config & self._db.get(__name__, "bounding_mask", DEFAULT_PERMISSIONS)
  189. async def _check(
  190. self,
  191. message: Message,
  192. func: callable,
  193. user: Optional[int] = None,
  194. ) -> bool:
  195. """Checks if message sender is permitted to execute certain function"""
  196. self._reload_rights()
  197. if not (config := self.get_flags(func)):
  198. return False
  199. if not user:
  200. user = message.sender_id
  201. if user == self._client.tg_id or getattr(message, "out", False):
  202. return True
  203. logger.debug(f"Checking security match for {config}")
  204. f_owner = config & OWNER
  205. f_sudo = config & SUDO
  206. f_support = config & SUPPORT
  207. f_group_owner = config & GROUP_OWNER
  208. f_group_admin_add_admins = config & GROUP_ADMIN_ADD_ADMINS
  209. f_group_admin_change_info = config & GROUP_ADMIN_CHANGE_INFO
  210. f_group_admin_ban_users = config & GROUP_ADMIN_BAN_USERS
  211. f_group_admin_delete_messages = config & GROUP_ADMIN_DELETE_MESSAGES
  212. f_group_admin_pin_messages = config & GROUP_ADMIN_PIN_MESSAGES
  213. f_group_admin_invite_users = config & GROUP_ADMIN_INVITE_USERS
  214. f_group_admin = config & GROUP_ADMIN
  215. f_group_member = config & GROUP_MEMBER
  216. f_pm = config & PM
  217. f_group_admin_any = (
  218. f_group_admin_add_admins
  219. or f_group_admin_change_info
  220. or f_group_admin_ban_users
  221. or f_group_admin_delete_messages
  222. or f_group_admin_pin_messages
  223. or f_group_admin_invite_users
  224. or f_group_admin
  225. )
  226. if (
  227. f_owner
  228. and user in self._owner
  229. or f_sudo
  230. and user in self._sudo
  231. or f_support
  232. and user in self._support
  233. ):
  234. return True
  235. if user in self._db.get(main.__name__, "blacklist_users", []):
  236. return False
  237. if message is None: # In case of checking inline query security map
  238. return bool(config & EVERYONE)
  239. try:
  240. chat = utils.get_chat_id(message)
  241. except Exception:
  242. chat = None
  243. try:
  244. cmd = message.raw_text[1:].split()[0].strip()
  245. except Exception:
  246. cmd = None
  247. if callable(func):
  248. for info in self._tsec_user.copy():
  249. if info["target"] == user:
  250. if info["rule_type"] == "command" and info["rule"] == cmd:
  251. logger.debug(f"tsec match for user {cmd}")
  252. return True
  253. if (
  254. info["rule_type"] == "module"
  255. and info["rule"] == func.__self__.__class__.__name__
  256. ):
  257. logger.debug(
  258. f"tsec match for user {func.__self__.__class__.__name__}"
  259. )
  260. return True
  261. if chat:
  262. for info in self._tsec_chat.copy():
  263. if info["target"] == chat:
  264. if info["rule_type"] == "command" and info["rule"] == cmd:
  265. logger.debug(f"tsec match for {cmd}")
  266. return True
  267. if (
  268. info["rule_type"] == "module"
  269. and info["rule"] == func.__self__.__class__.__name__
  270. ):
  271. logger.debug(
  272. f"tsec match for {func.__self__.__class__.__name__}"
  273. )
  274. return True
  275. if f_group_member and message.is_group or f_pm and message.is_private:
  276. return True
  277. if message.is_channel:
  278. if not message.is_group:
  279. if message.edit_date:
  280. return False
  281. chat_id = utils.get_chat_id(message)
  282. if (
  283. chat_id in self._cache
  284. and self._cache[chat_id]["exp"] >= time.time()
  285. ):
  286. chat = self._cache[chat_id]["chat"]
  287. else:
  288. chat = await message.get_chat()
  289. self._cache[chat_id] = {"chat": chat, "exp": time.time() + 5 * 60}
  290. if (
  291. not chat.creator
  292. and not chat.admin_rights
  293. or not chat.creator
  294. and not chat.admin_rights.post_messages
  295. ):
  296. return False
  297. if self._any_admin and f_group_admin_any or f_group_admin:
  298. return True
  299. elif f_group_admin_any or f_group_owner:
  300. chat_id = utils.get_chat_id(message)
  301. cache_obj = f"{chat_id}/{user}"
  302. if (
  303. cache_obj in self._cache
  304. and self._cache[cache_obj]["exp"] >= time.time()
  305. ):
  306. participant = self._cache[cache_obj]["user"]
  307. else:
  308. participant = await message.client.get_permissions(
  309. message.peer_id,
  310. user,
  311. )
  312. self._cache[cache_obj] = {
  313. "user": participant,
  314. "exp": time.time() + 5 * 60,
  315. }
  316. if (
  317. participant.is_creator
  318. or participant.is_admin
  319. and (
  320. self._any_admin
  321. and f_group_admin_any
  322. or f_group_admin
  323. or f_group_admin_add_admins
  324. and participant.add_admins
  325. or f_group_admin_change_info
  326. and participant.change_info
  327. or f_group_admin_ban_users
  328. and participant.ban_users
  329. or f_group_admin_delete_messages
  330. and participant.delete_messages
  331. or f_group_admin_pin_messages
  332. and participant.pin_messages
  333. or f_group_admin_invite_users
  334. and participant.invite_users
  335. )
  336. ):
  337. return True
  338. return False
  339. if message.is_group and (f_group_admin_any or f_group_owner):
  340. chat_id = utils.get_chat_id(message)
  341. cache_obj = f"{chat_id}/{user}"
  342. if (
  343. cache_obj in self._cache
  344. and self._cache[cache_obj]["exp"] >= time.time()
  345. ):
  346. participant = self._cache[cache_obj]["user"]
  347. else:
  348. full_chat = await message.client(GetFullChatRequest(message.chat_id))
  349. participants = full_chat.full_chat.participants.participants
  350. participant = next(
  351. (
  352. possible_participant
  353. for possible_participant in participants
  354. if possible_participant.user_id == message.sender_id
  355. ),
  356. None,
  357. )
  358. self._cache[cache_obj] = {
  359. "user": participant,
  360. "exp": time.time() + 5 * 60,
  361. }
  362. if not participant:
  363. return
  364. if (
  365. isinstance(participant, ChatParticipantCreator)
  366. or isinstance(participant, ChatParticipantAdmin)
  367. and f_group_admin_any
  368. ):
  369. return True
  370. return False
  371. check = _check