utils.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738
  1. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  2. # █▀█ █ █ █ █▀█ █▀▄ █
  3. # © Copyright 2022
  4. # https://t.me/hikariatama
  5. #
  6. # 🔒 Licensed under the GNU AGPLv3
  7. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  8. import asyncio
  9. import logging
  10. import contextlib
  11. import io
  12. import os
  13. from copy import deepcopy
  14. import re
  15. import typing
  16. from urllib.parse import urlparse
  17. import functools
  18. from aiogram.types import (
  19. CallbackQuery,
  20. InlineKeyboardButton,
  21. InlineKeyboardMarkup,
  22. InputMediaAnimation,
  23. InputMediaDocument,
  24. InputMediaAudio,
  25. InputMediaPhoto,
  26. InputMediaVideo,
  27. InputFile,
  28. )
  29. from aiogram.utils.exceptions import (
  30. MessageIdInvalid,
  31. MessageNotModified,
  32. RetryAfter,
  33. BadRequest,
  34. )
  35. from .. import utils
  36. from ..types import HikkaReplyMarkup
  37. from .types import InlineUnit, InlineCall
  38. from telethon.utils import resolve_inline_message_id
  39. logger = logging.getLogger(__name__)
  40. class Utils(InlineUnit):
  def _generate_markup(
      self,
      markup_obj: typing.Optional[typing.Union[HikkaReplyMarkup, str]],
  ) -> typing.Optional[InlineKeyboardMarkup]:
      """
      Generate markup for form or list of `dict`s
      :param markup_obj: Ready `InlineKeyboardMarkup` (returned untouched),
                         id of a unit whose stored `buttons` should be used,
                         or a button / row of buttons / list of rows
      :return: Generated keyboard, or `None` if nothing was passed or one of
               the buttons is malformed
      """
      if not markup_obj:
          return None

      if isinstance(markup_obj, InlineKeyboardMarkup):
          return markup_obj

      markup = InlineKeyboardMarkup()

      map_ = self._normalize_markup(
          self._units[markup_obj]["buttons"]
          if isinstance(markup_obj, str)
          else markup_obj
      )

      setup_callbacks = False

      # First pass: turn `action` shortcuts into real callbacks and assign the
      # random identifiers that are later sent to Telegram in place of them
      for row in map_:
          for button in row:
              if not isinstance(button, dict):
                  logger.error(
                      f"Button {button} is not a `dict`, but `{type(button)}` in"
                      f" {map_}"
                  )
                  return None

              if "callback" not in button:
                  action = button.get("action")
                  if action == "close":
                      button["callback"] = self._close_unit_handler
                  elif action == "unload":
                      button["callback"] = self._unload_unit_handler
                  elif action == "answer":
                      if not button.get("message"):
                          logger.error(
                              f"Button {button} has no `message` to answer with"
                          )
                          return None

                      button["callback"] = functools.partial(
                          self._answer_unit_handler,
                          show_alert=button.get("show_alert", False),
                          text=button["message"],
                      )

              if "callback" in button and "_callback_data" not in button:
                  button["_callback_data"] = utils.rand(30)
                  setup_callbacks = True

              if "input" in button and "_switch_query" not in button:
                  button["_switch_query"] = utils.rand(10)

      # Second pass: build the actual aiogram buttons row by row
      for row in map_:
          line = []
          for button in row:
              try:
                  if "url" in button:
                      if not utils.check_url(button["url"]):
                          logger.warning(
                              "Button have not been added to form, "
                              "because its url is invalid"
                          )
                          continue

                      line.append(
                          InlineKeyboardButton(
                              button["text"],
                              url=button["url"],
                          )
                      )
                  elif "callback" in button:
                      line.append(
                          InlineKeyboardButton(
                              button["text"],
                              callback_data=button["_callback_data"],
                          )
                      )
                      if setup_callbacks:
                          handler_info = {"handler": button["callback"]}

                          # Values which are stored as they were passed
                          for key in ("always_allow", "args", "kwargs"):
                              if button.get(key, False):
                                  handler_info[key] = button[key]

                          # Flags which are stored only when they are enabled
                          for flag in ("force_me", "disable_security"):
                              if button.get(flag, False):
                                  handler_info[flag] = True

                          self._custom_map[button["_callback_data"]] = handler_info
                  elif "input" in button:
                      line.append(
                          InlineKeyboardButton(
                              button["text"],
                              switch_inline_query_current_chat=(
                                  button["_switch_query"] + " "
                              ),
                          )
                      )
                  elif "data" in button:
                      line.append(
                          InlineKeyboardButton(
                              button["text"],
                              callback_data=button["data"],
                          )
                      )
                  elif "switch_inline_query_current_chat" in button:
                      line.append(
                          InlineKeyboardButton(
                              button["text"],
                              switch_inline_query_current_chat=button[
                                  "switch_inline_query_current_chat"
                              ],
                          )
                      )
                  elif "switch_inline_query" in button:
                      # This key must map to the button field of the same
                      # name, not to the `..._current_chat` variant
                      line.append(
                          InlineKeyboardButton(
                              button["text"],
                              switch_inline_query=button["switch_inline_query"],
                          )
                      )
                  else:
                      logger.warning(
                          "Button have not been added to "
                          "form, because it is not structured "
                          f"properly. {button}"
                      )
              except KeyError:
                  logger.exception(
                      "Error while forming markup! Probably, you "
                      "passed wrong type combination for button. "
                      "Contact developer of module."
                  )
                  # Same "no markup" result as every other failure above
                  return None

          markup.row(*line)

      return markup

  generate_markup = _generate_markup
  async def _close_unit_handler(self, call: InlineCall):
      """
      Callback assigned to buttons with `"action": "close"`
      :param call: Inline call, which is asked to delete itself
      """
      await call.delete()
  async def _unload_unit_handler(self, call: InlineCall):
      """
      Callback assigned to buttons with `"action": "unload"`
      :param call: Inline call, which is asked to unload itself
      """
      await call.unload()
  async def _answer_unit_handler(
      self,
      call: InlineCall,
      text: str,
      show_alert: bool,
  ):
      """
      Callback assigned to buttons with `"action": "answer"`
      :param call: Inline call to answer
      :param text: Text to answer with
      :param show_alert: Passed through to `call.answer` as `show_alert`
      """
      await call.answer(text, show_alert=show_alert)
  async def check_inline_security(self, *, func: callable, user: int) -> bool:
      """
      Ask the dispatcher's security manager about a user
      :param func: Function, which the user wants to run
      :param user: Id of the user
      :return: Whatever `security.check` returns for this user and function
      """
      security = self._client.dispatcher.security
      verdict = await security.check(message=None, func=func, user_id=user)
      return verdict
  def _find_caller_sec_map(self) -> typing.Optional[callable]:
      """
      Build a lazy getter of security flags for whatever `utils.find_caller()`
      reports as the caller
      :return: Zero-argument callable, which requests the flags from the
               dispatcher's security manager, or `None` if no caller was
               found or the lookup failed
      """
      try:
          caller = utils.find_caller()
          if caller:
              logger.debug(f"Found caller: {caller}")
              # For bound methods flags are requested for the owning object
              return lambda: self._client.dispatcher.security.get_flags(
                  getattr(caller, "__self__", caller),
              )
      except Exception:
          logger.debug("Can't parse security mask in form", exc_info=True)

      return None
  214. def _normalize_markup(self, reply_markup: typing.Union[dict, list]) -> list:
  215. if isinstance(reply_markup, dict):
  216. return [[reply_markup]]
  217. if isinstance(reply_markup, list) and any(
  218. isinstance(i, dict) for i in reply_markup
  219. ):
  220. return [reply_markup]
  221. return reply_markup
  def sanitise_text(self, text: str) -> str:
      """
      Strips opening and closing `<emoji ...>` tags from text, keeping
      whatever is written between them
      :param text: text to sanitise
      :return: sanitised text
      """
      emoji_tag = re.compile(r"</?emoji.*?>")
      return emoji_tag.sub("", text)
  async def _edit_unit(
      self,
      text: typing.Optional[str] = None,
      reply_markup: typing.Optional[HikkaReplyMarkup] = None,
      *,
      photo: typing.Optional[str] = None,
      file: typing.Optional[str] = None,
      video: typing.Optional[str] = None,
      audio: typing.Optional[typing.Union[dict, str]] = None,
      gif: typing.Optional[str] = None,
      mime_type: typing.Optional[str] = None,
      force_me: typing.Optional[bool] = None,
      disable_security: typing.Optional[bool] = None,
      always_allow: typing.Optional[typing.List[int]] = None,
      disable_web_page_preview: bool = True,
      query: typing.Optional[CallbackQuery] = None,
      unit_id: typing.Optional[str] = None,
      inline_message_id: typing.Optional[str] = None,
      chat_id: typing.Optional[int] = None,
      message_id: typing.Optional[int] = None,
  ) -> bool:
      """
      Edits unit message
      :param text: Text of message
      :param reply_markup: Inline keyboard
      :param photo: Url to a valid photo to attach to message
      :param file: Url to a valid file to attach to message
      :param video: Url to a valid video to attach to message
      :param audio: Url to a valid audio to attach to message, or `dict`
                    with `url` and optional `title`, `performer`, `duration`
      :param gif: Url to a valid gif to attach to message
      :param mime_type: Mime type of file. Required along with `file`
      :param force_me: Allow only userbot owner to interact with buttons
      :param disable_security: Disable security check for buttons
      :param always_allow: List of user ids, which will always be allowed
      :param disable_web_page_preview: Disable web page preview
      :param query: Callback query
      :param unit_id: Id of the unit, whose stored settings are updated
      :param inline_message_id: Id of the inline message to edit
      :param chat_id: Chat of the message to edit (with `message_id`)
      :param message_id: Id of the message to edit (with `chat_id`)
      :return: Status of edit
      """
      # After this line `reply_markup` is always a list (possibly empty)
      reply_markup = self._validate_markup(reply_markup) or []

      if text is not None and not isinstance(text, str):
          logger.error(
              "Invalid type for `text`. Expected `str`, got `%s`", type(text)
          )
          return False

      if file and not mime_type:
          logger.error(
              "You must pass `mime_type` along with `file` field\n"
              "It may be either 'application/zip' or 'application/pdf'"
          )
          return False

      if isinstance(audio, str):
          audio = {"url": audio}

      if isinstance(text, str):
          text = self.sanitise_text(text)

      passed_media = sum(
          item is not None for item in (photo, gif, file, video, audio)
      )
      if passed_media > 1:
          logger.error("You passed two or more exclusive parameters simultaneously")
          return False

      if unit_id is not None and unit_id in self._units:
          unit = self._units[unit_id]

          unit["buttons"] = reply_markup

          if isinstance(force_me, bool):
              unit["force_me"] = force_me

          if isinstance(disable_security, bool):
              unit["disable_security"] = disable_security

          if isinstance(always_allow, list):
              unit["always_allow"] = always_allow
      else:
          unit = {}

      has_direct_target = bool(chat_id and message_id)

      if not has_direct_target:
          inline_message_id = (
              inline_message_id
              or unit.get("inline_message_id", False)
              or getattr(query, "inline_message_id", None)
          )

      # `chat_id` without `message_id` (or vice versa) is not a usable target
      # either, so it is rejected here instead of failing inside the request
      if not has_direct_target and not inline_message_id:
          logger.warning(
              "Attempted to edit message with no `inline_message_id`. "
              "Possible reasons:\n"
              "- Form was sent without buttons and due to "
              "the limits of Telegram API can't be edited\n"
              "- There is an in-userbot error, which you should report"
          )
          return False

      # If passed `photo` is gif
      ext = None
      if isinstance(photo, str):
          try:
              ext = os.path.splitext(urlparse(photo).path)[1]
          except ValueError:
              ext = None

      if ext in {".gif", ".mp4"}:
          gif = photo
          photo = None

      media = next(
          (item for item in (photo, file, video, audio, gif) if item), None
      )

      if isinstance(media, bytes):
          media = io.BytesIO(media)
          media.name = "upload.mp4"

      if isinstance(media, io.BytesIO):
          media = InputFile(media)

      if file:
          media = InputMediaDocument(media, caption=text, parse_mode="HTML")
      elif photo:
          media = InputMediaPhoto(media, caption=text, parse_mode="HTML")
      elif audio:
          if isinstance(audio, dict):
              media = InputMediaAudio(
                  audio["url"],
                  title=audio.get("title"),
                  performer=audio.get("performer"),
                  duration=audio.get("duration"),
                  caption=text,
                  parse_mode="HTML",
              )
          else:
              # `media` is `audio` after the bytes -> `InputFile` wrapping
              # above, so raw bytes are uploaded like every other media type
              media = InputMediaAudio(
                  media,
                  caption=text,
                  parse_mode="HTML",
              )
      elif video:
          media = InputMediaVideo(media, caption=text, parse_mode="HTML")
      elif gif:
          media = InputMediaAnimation(media, caption=text, parse_mode="HTML")

      # Message identification, shared by all the requests below
      target = (
          {"inline_message_id": inline_message_id}
          if inline_message_id
          else {"chat_id": chat_id, "message_id": message_id}
      )

      if media is None and text is None and reply_markup:
          try:
              await self.bot.edit_message_reply_markup(
                  **target,
                  reply_markup=self.generate_markup(reply_markup),
              )
          except Exception:
              logger.debug("Can't edit reply markup", exc_info=True)
              return False

          return True

      if media is None and text is None:
          logger.error("You must pass either `text` or `media` or `reply_markup`")
          return False

      if media is None:
          try:
              await self.bot.edit_message_text(
                  text,
                  **target,
                  disable_web_page_preview=disable_web_page_preview,
                  reply_markup=self.generate_markup(reply_markup),
              )
          except MessageNotModified:
              if query:
                  with contextlib.suppress(Exception):
                      await query.answer()

              return False
          except RetryAfter as e:
              logger.info(f"Sleeping {e.timeout}s on aiogram FloodWait...")
              await asyncio.sleep(e.timeout)
              # NOTE(review): `utils.get_kwargs()` presumably collects the
              # arguments of the calling frame, so it must stay right here
              return await self._edit_unit(**utils.get_kwargs())
          except MessageIdInvalid:
              if query:
                  with contextlib.suppress(Exception):
                      await query.answer(
                          "I should have edited some message, but it is deleted :("
                      )

              return False
          except BadRequest as e:
              if "There is no text in the message to edit" not in str(e):
                  raise

              # Message carries media, so its text lives in the caption
              try:
                  await self.bot.edit_message_caption(
                      caption=text,
                      **target,
                      reply_markup=self.generate_markup(reply_markup),
                  )
              except Exception:
                  logger.debug("Can't edit message caption", exc_info=True)
                  return False
              else:
                  return True
          else:
              return True

      try:
          await self.bot.edit_message_media(
              **target,
              media=media,
              reply_markup=self.generate_markup(reply_markup),
          )
      except RetryAfter as e:
          logger.info(f"Sleeping {e.timeout}s on aiogram FloodWait...")
          await asyncio.sleep(e.timeout)
          # NOTE(review): see the note on `utils.get_kwargs()` above
          return await self._edit_unit(**utils.get_kwargs())
      except MessageIdInvalid:
          if query:
              with contextlib.suppress(Exception):
                  await query.answer(
                      "I should have edited some message, but it is deleted :("
                  )

          return False
      else:
          return True
  async def _delete_unit_message(
      self,
      call: typing.Optional[CallbackQuery] = None,
      unit_id: typing.Optional[str] = None,
      chat_id: typing.Optional[int] = None,
      message_id: typing.Optional[int] = None,
  ) -> bool:
      """
      Deletes unit message
      :param call: Call whose message (if it has one with a chat) is deleted
                   through the bot, and whose `unit_id` is the fallback unit
      :param unit_id: Unit whose inline message is deleted through the client.
                      The unit is unloaded afterwards
      :param chat_id: Chat of the message to delete through the bot
      :param message_id: Id of the message to delete through the bot
      :return: `True` if deletion succeeded, `False` on any error
      """
      call_has_chat = bool(getattr(getattr(call, "message", None), "chat", None))

      # Both of these cases can be handled by the bot itself
      if call_has_chat or (chat_id and message_id):
          try:
              if call_has_chat:
                  await self.bot.delete_message(
                      chat_id=call.message.chat.id,
                      message_id=call.message.message_id,
                  )
              else:
                  await self.bot.delete_message(
                      chat_id=chat_id,
                      message_id=message_id,
                  )
          except Exception:
              return False

          return True

      if not unit_id:
          unit_id = getattr(call, "unit_id", None) or unit_id

      # Otherwise the inline message is resolved and deleted by the client
      try:
          inline_id = self._units[unit_id]["inline_message_id"]
          message_id, peer, _, _ = resolve_inline_message_id(inline_id)
          await self._client.delete_messages(peer, [message_id])
          await self._unload_unit(unit_id)
      except Exception:
          return False

      return True
  492. async def _unload_unit(self, unit_id: str) -> bool:
  493. """Params `self`, `unit_id` are for internal use only, do not try to pass them
  494. """
  495. try:
  496. if "on_unload" in self._units[unit_id] and callable(
  497. self._units[unit_id]["on_unload"]
  498. ):
  499. self._units[unit_id]["on_unload"]()
  500. if unit_id in self._units:
  501. del self._units[unit_id]
  502. else:
  503. return False
  504. except Exception:
  505. return False
  506. return True
  def build_pagination(
      self,
      callback: callable,
      total_pages: int,
      unit_id: typing.Optional[str] = None,
      current_page: typing.Optional[int] = None,
  ) -> typing.List[dict]:
      """
      Builds a single row of pagination buttons
      :param callback: Callback of every button. Zero-based index of the
                       target page is passed to it through `args`
      :param total_pages: Total amount of pages
      :param unit_id: Unit, whose `current_index` is used, if `current_page`
                      is not passed
      :param current_page: One-based number of the current page
      :return: List with one row of at most five buttons
      """
      # Based on https://github.com/pystorage/pykeyboard/blob/master/pykeyboard/inline_pagination_keyboard.py#L4
      if current_page is None:
          current_page = self._units[unit_id]["current_index"] + 1

      def make_button(label, page):
          # `page` is one-based, while callback receives zero-based index
          return {"text": label, "args": (page - 1,), "callback": callback}

      def numbered(page):
          # Current page is highlighted, the others are bare numbers
          label = f"· {page} ·" if page == current_page else page
          return make_button(label, page)

      # Few pages: show all of them
      if total_pages <= 5:
          return [[numbered(page) for page in range(1, total_pages + 1)]]

      # Near the beginning: 1 2 3 4› last»
      if current_page <= 3:
          row = []
          for page in range(1, 6):
              if page == current_page:
                  row.append(numbered(page))
              elif page == 4:
                  row.append(make_button(f"{page} ›", page))
              elif page == 5:
                  row.append(make_button(f"{total_pages} »", total_pages))
              else:
                  row.append(make_button(page, page))

          return [row]

      # Near the end: «1 ‹n-3 n-2 n-1 n
      if current_page > total_pages - 3:
          row = [
              make_button("« 1", 1),
              make_button(f"‹ {total_pages - 3}", total_pages - 3),
          ]
          row.extend(
              numbered(page) for page in range(total_pages - 2, total_pages + 1)
          )
          return [row]

      # Somewhere in the middle: «1 ‹prev ·current· next› last»
      return [
          [
              make_button("« 1", 1),
              make_button(f"‹ {current_page - 1}", current_page - 1),
              make_button(f"· {current_page} ·", current_page),
              make_button(f"{current_page + 1} ›", current_page + 1),
              make_button(f"{total_pages} »", total_pages),
          ]
      ]
  def _validate_markup(
      self,
      buttons: typing.Optional[HikkaReplyMarkup],
  ) -> typing.Optional[typing.List[typing.List[dict]]]:
      """
      Checks the markup and brings it to the "list of rows" shape
      :param buttons: Single button, row of buttons, list of rows or `None`
      :return: Normalized list of rows (empty if `None` was passed),
               or `None` if the markup is invalid. The reason is logged
      """
      if buttons is None:
          buttons = []

      if not isinstance(buttons, (list, dict)):
          logger.error(
              "Reply markup ommited because passed type is not valid"
              f" ({type(buttons)})"
          )
          return None

      buttons = self._normalize_markup(buttons)

      # Rows are checked too, so that a non-iterable row is reported the same
      # way as a wrong button instead of raising `TypeError`
      if not all(
          isinstance(row, (list, tuple))
          and all(isinstance(button, dict) for button in row)
          for row in buttons
      ):
          logger.error(
              "Reply markup ommited because passed invalid type for one of the"
              " buttons"
          )
          return None

      # Every kind of button `_generate_markup` is able to build
      accepted_fields = (
          "url",
          "callback",
          "input",
          "data",
          "action",
          "switch_inline_query",
          "switch_inline_query_current_chat",
      )

      if not all(
          any(field in button for field in accepted_fields)
          for row in buttons
          for button in row
      ):
          logger.error(
              "Invalid button specified. "
              "Button must contain one of the following fields:\n"
              "  - `url`\n"
              "  - `callback`\n"
              "  - `input`\n"
              "  - `data`\n"
              "  - `action`\n"
              "  - `switch_inline_query`\n"
              "  - `switch_inline_query_current_chat`"
          )
          return None

      return buttons