utils.py 26 KB


  1. # ©️ Dan Gazizullin, 2021-2023
  2. # This file is a part of Hikka Userbot
  3. # 🌐 https://github.com/hikariatama/Hikka
  4. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  5. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  6. import asyncio
  7. import contextlib
  8. import functools
  9. import io
  10. import itertools
  11. import logging
  12. import os
  13. import re
  14. import typing
  15. from copy import deepcopy
  16. from urllib.parse import urlparse
  17. from aiogram.types import (
  18. CallbackQuery,
  19. InlineKeyboardButton,
  20. InlineKeyboardMarkup,
  21. InputFile,
  22. InputMediaAnimation,
  23. InputMediaAudio,
  24. InputMediaDocument,
  25. InputMediaPhoto,
  26. InputMediaVideo,
  27. )
  28. from aiogram.utils.exceptions import (
  29. BadRequest,
  30. MessageIdInvalid,
  31. MessageNotModified,
  32. RetryAfter,
  33. )
  34. from hikkatl.utils import resolve_inline_message_id
  35. from .. import utils
  36. from ..types import HikkaReplyMarkup
  37. from .types import InlineCall, InlineUnit
  38. logger = logging.getLogger(__name__)
  39. class Utils(InlineUnit):
  40. def _generate_markup(
  41. self,
  42. markup_obj: typing.Optional[typing.Union[HikkaReplyMarkup, str]],
  43. ) -> typing.Optional[InlineKeyboardMarkup]:
  44. """Generate markup for form or list of `dict`s"""
  45. if not markup_obj:
  46. return None
  47. if isinstance(markup_obj, InlineKeyboardMarkup):
  48. return markup_obj
  49. markup = InlineKeyboardMarkup()
  50. map_ = (
  51. self._units[markup_obj]["buttons"]
  52. if isinstance(markup_obj, str)
  53. else markup_obj
  54. )
  55. map_ = self._normalize_markup(map_)
  56. setup_callbacks = False
  57. for row in map_:
  58. for button in row:
  59. if not isinstance(button, dict):
  60. logger.error(
  61. "Button %s is not a `dict`, but `%s` in %s",
  62. button,
  63. type(button),
  64. map_,
  65. )
  66. return None
  67. if "callback" not in button:
  68. if button.get("action") == "close":
  69. button["callback"] = self._close_unit_handler
  70. if button.get("action") == "unload":
  71. button["callback"] = self._unload_unit_handler
  72. if button.get("action") == "answer":
  73. if not button.get("message"):
  74. logger.error(
  75. "Button %s has no `message` to answer with", button
  76. )
  77. return None
  78. button["callback"] = functools.partial(
  79. self._answer_unit_handler,
  80. show_alert=button.get("show_alert", False),
  81. text=button["message"],
  82. )
  83. if "callback" in button and "_callback_data" not in button:
  84. button["_callback_data"] = utils.rand(30)
  85. setup_callbacks = True
  86. if "input" in button and "_switch_query" not in button:
  87. button["_switch_query"] = utils.rand(10)
  88. for row in map_:
  89. line = []
  90. for button in row:
  91. try:
  92. if "url" in button:
  93. if not utils.check_url(button["url"]):
  94. logger.warning(
  95. "Button have not been added to form, "
  96. "because its url is invalid"
  97. )
  98. continue
  99. line += [
  100. InlineKeyboardButton(
  101. button["text"],
  102. url=button["url"],
  103. )
  104. ]
  105. elif "callback" in button:
  106. line += [
  107. InlineKeyboardButton(
  108. button["text"],
  109. callback_data=button["_callback_data"],
  110. )
  111. ]
  112. if setup_callbacks:
  113. self._custom_map[button["_callback_data"]] = {
  114. "handler": button["callback"],
  115. **(
  116. {"always_allow": button["always_allow"]}
  117. if button.get("always_allow", False)
  118. else {}
  119. ),
  120. **(
  121. {"args": button["args"]}
  122. if button.get("args", False)
  123. else {}
  124. ),
  125. **(
  126. {"kwargs": button["kwargs"]}
  127. if button.get("kwargs", False)
  128. else {}
  129. ),
  130. **(
  131. {"force_me": True}
  132. if button.get("force_me", False)
  133. else {}
  134. ),
  135. **(
  136. {"disable_security": True}
  137. if button.get("disable_security", False)
  138. else {}
  139. ),
  140. }
  141. elif "input" in button:
  142. line += [
  143. InlineKeyboardButton(
  144. button["text"],
  145. switch_inline_query_current_chat=button["_switch_query"]
  146. + " ",
  147. )
  148. ]
  149. elif "data" in button:
  150. line += [
  151. InlineKeyboardButton(
  152. button["text"],
  153. callback_data=button["data"],
  154. )
  155. ]
  156. elif "switch_inline_query_current_chat" in button:
  157. line += [
  158. InlineKeyboardButton(
  159. button["text"],
  160. switch_inline_query_current_chat=button[
  161. "switch_inline_query_current_chat"
  162. ],
  163. )
  164. ]
  165. elif "switch_inline_query" in button:
  166. line += [
  167. InlineKeyboardButton(
  168. button["text"],
  169. switch_inline_query_current_chat=button[
  170. "switch_inline_query"
  171. ],
  172. )
  173. ]
  174. else:
  175. logger.warning(
  176. (
  177. "Button have not been added to "
  178. "form, because it is not structured "
  179. "properly. %s"
  180. ),
  181. button,
  182. )
  183. except KeyError:
  184. logger.exception(
  185. "Error while forming markup! Probably, you "
  186. "passed wrong type combination for button. "
  187. "Contact developer of module."
  188. )
  189. return False
  190. markup.row(*line)
  191. return markup
  192. generate_markup = _generate_markup
  193. async def _close_unit_handler(self, call: InlineCall):
  194. await call.delete()
  195. async def _unload_unit_handler(self, call: InlineCall):
  196. await call.unload()
  197. async def _answer_unit_handler(self, call: InlineCall, text: str, show_alert: bool):
  198. await call.answer(text, show_alert=show_alert)
  199. def _reverse_method_lookup(self, needle: callable, /) -> typing.Optional[str]:
  200. return next(
  201. (
  202. name
  203. for name, method in itertools.chain(
  204. self._allmodules.inline_handlers.items(),
  205. self._allmodules.callback_handlers.items(),
  206. )
  207. if method == needle
  208. ),
  209. None,
  210. )
  211. async def check_inline_security(self, *, func: typing.Callable, user: int) -> bool:
  212. """Checks if user with id `user` is allowed to run function `func`"""
  213. return await self._client.dispatcher.security.check(
  214. message=None,
  215. func=func,
  216. user_id=user,
  217. inline_cmd=self._reverse_method_lookup(func),
  218. )
  219. def _find_caller_sec_map(self) -> typing.Optional[typing.Callable[[], int]]:
  220. try:
  221. caller = utils.find_caller()
  222. if not caller:
  223. return None
  224. logger.debug("Found caller: %s", caller)
  225. return lambda: self._client.dispatcher.security.get_flags(
  226. getattr(caller, "__self__", caller),
  227. )
  228. except Exception:
  229. logger.debug("Can't parse security mask in form", exc_info=True)
  230. return None
  231. def _normalize_markup(
  232. self, reply_markup: HikkaReplyMarkup
  233. ) -> typing.List[typing.List[typing.Dict[str, typing.Any]]]:
  234. if isinstance(reply_markup, dict):
  235. return [[reply_markup]]
  236. if isinstance(reply_markup, list) and any(
  237. isinstance(i, dict) for i in reply_markup
  238. ):
  239. return [reply_markup]
  240. return reply_markup
  241. def sanitise_text(self, text: str) -> str:
  242. """
  243. Replaces all animated emojis in text with normal ones,
  244. bc aiogram doesn't support them
  245. :param text: text to sanitise
  246. :return: sanitised text
  247. """
  248. return re.sub(r"</?(?:emoji|blockquote).*?>", "", text)
  249. async def _edit_unit(
  250. self,
  251. text: typing.Optional[str] = None,
  252. reply_markup: typing.Optional[HikkaReplyMarkup] = None,
  253. *,
  254. photo: typing.Optional[str] = None,
  255. file: typing.Optional[str] = None,
  256. video: typing.Optional[str] = None,
  257. audio: typing.Optional[typing.Union[dict, str]] = None,
  258. gif: typing.Optional[str] = None,
  259. mime_type: typing.Optional[str] = None,
  260. force_me: typing.Optional[bool] = None,
  261. disable_security: typing.Optional[bool] = None,
  262. always_allow: typing.Optional[typing.List[int]] = None,
  263. disable_web_page_preview: bool = True,
  264. query: typing.Optional[CallbackQuery] = None,
  265. unit_id: typing.Optional[str] = None,
  266. inline_message_id: typing.Optional[str] = None,
  267. chat_id: typing.Optional[int] = None,
  268. message_id: typing.Optional[int] = None,
  269. ) -> bool:
  270. """
  271. Edits unit message
  272. :param text: Text of message
  273. :param reply_markup: Inline keyboard
  274. :param photo: Url to a valid photo to attach to message
  275. :param file: Url to a valid file to attach to message
  276. :param video: Url to a valid video to attach to message
  277. :param audio: Url to a valid audio to attach to message
  278. :param gif: Url to a valid gif to attach to message
  279. :param mime_type: Mime type of file
  280. :param force_me: Allow only userbot owner to interact with buttons
  281. :param disable_security: Disable security check for buttons
  282. :param always_allow: List of user ids, which will always be allowed
  283. :param disable_web_page_preview: Disable web page preview
  284. :param query: Callback query
  285. :return: Status of edit
  286. """
  287. reply_markup = self._validate_markup(reply_markup) or []
  288. if text is not None and not isinstance(text, str):
  289. logger.error(
  290. "Invalid type for `text`. Expected `str`, got `%s`", type(text)
  291. )
  292. return False
  293. if file and not mime_type:
  294. logger.error(
  295. "You must pass `mime_type` along with `file` field\n"
  296. "It may be either 'application/zip' or 'application/pdf'"
  297. )
  298. return False
  299. if isinstance(audio, str):
  300. audio = {"url": audio}
  301. if isinstance(text, str):
  302. text = self.sanitise_text(text)
  303. media_params = [
  304. photo is None,
  305. gif is None,
  306. file is None,
  307. video is None,
  308. audio is None,
  309. ]
  310. if media_params.count(False) > 1:
  311. logger.error("You passed two or more exclusive parameters simultaneously")
  312. return False
  313. if unit_id is not None and unit_id in self._units:
  314. unit = self._units[unit_id]
  315. unit["buttons"] = reply_markup
  316. if isinstance(force_me, bool):
  317. unit["force_me"] = force_me
  318. if isinstance(disable_security, bool):
  319. unit["disable_security"] = disable_security
  320. if isinstance(always_allow, list):
  321. unit["always_allow"] = always_allow
  322. else:
  323. unit = {}
  324. if not chat_id or not message_id:
  325. inline_message_id = (
  326. inline_message_id
  327. or unit.get("inline_message_id", False)
  328. or getattr(query, "inline_message_id", None)
  329. )
  330. if not chat_id and not message_id and not inline_message_id:
  331. logger.warning(
  332. "Attempted to edit message with no `inline_message_id`. "
  333. "Possible reasons:\n"
  334. "- Form was sent without buttons and due to "
  335. "the limits of Telegram API can't be edited\n"
  336. "- There is an in-userbot error, which you should report"
  337. )
  338. return False
  339. try:
  340. path = urlparse(photo).path
  341. ext = os.path.splitext(path)[1]
  342. except Exception:
  343. ext = None
  344. if photo is not None and ext in {".gif", ".mp4"}:
  345. gif = deepcopy(photo)
  346. photo = None
  347. media = next(
  348. (media for media in [photo, file, video, audio, gif] if media), None
  349. )
  350. if isinstance(media, bytes):
  351. media = io.BytesIO(media)
  352. media.name = "upload.mp4"
  353. if isinstance(media, io.BytesIO):
  354. media = InputFile(media)
  355. if file:
  356. media = InputMediaDocument(media, caption=text, parse_mode="HTML")
  357. elif photo:
  358. media = InputMediaPhoto(media, caption=text, parse_mode="HTML")
  359. elif audio:
  360. if isinstance(audio, dict):
  361. media = InputMediaAudio(
  362. audio["url"],
  363. title=audio.get("title"),
  364. performer=audio.get("performer"),
  365. duration=audio.get("duration"),
  366. caption=text,
  367. parse_mode="HTML",
  368. )
  369. else:
  370. media = InputMediaAudio(
  371. audio,
  372. caption=text,
  373. parse_mode="HTML",
  374. )
  375. elif video:
  376. media = InputMediaVideo(media, caption=text, parse_mode="HTML")
  377. elif gif:
  378. media = InputMediaAnimation(media, caption=text, parse_mode="HTML")
  379. if media is None and text is None and reply_markup:
  380. try:
  381. await self.bot.edit_message_reply_markup(
  382. **(
  383. {"inline_message_id": inline_message_id}
  384. if inline_message_id
  385. else {"chat_id": chat_id, "message_id": message_id}
  386. ),
  387. reply_markup=self.generate_markup(reply_markup),
  388. )
  389. except Exception:
  390. return False
  391. return True
  392. if media is None and text is None:
  393. logger.error("You must pass either `text` or `media` or `reply_markup`")
  394. return False
  395. if media is None:
  396. try:
  397. await self.bot.edit_message_text(
  398. text,
  399. **(
  400. {"inline_message_id": inline_message_id}
  401. if inline_message_id
  402. else {"chat_id": chat_id, "message_id": message_id}
  403. ),
  404. disable_web_page_preview=disable_web_page_preview,
  405. reply_markup=self.generate_markup(
  406. reply_markup
  407. if isinstance(reply_markup, list)
  408. else unit.get("buttons", [])
  409. ),
  410. )
  411. except MessageNotModified:
  412. if query:
  413. with contextlib.suppress(Exception):
  414. await query.answer()
  415. return False
  416. except RetryAfter as e:
  417. logger.info("Sleeping %ss on aiogram FloodWait...", e.timeout)
  418. await asyncio.sleep(e.timeout)
  419. return await self._edit_unit(**utils.get_kwargs())
  420. except MessageIdInvalid:
  421. with contextlib.suppress(Exception):
  422. await query.answer(
  423. "I should have edited some message, but it is deleted :("
  424. )
  425. return False
  426. except BadRequest as e:
  427. if "There is no text in the message to edit" not in str(e):
  428. raise
  429. try:
  430. await self.bot.edit_message_caption(
  431. caption=text,
  432. **(
  433. {"inline_message_id": inline_message_id}
  434. if inline_message_id
  435. else {"chat_id": chat_id, "message_id": message_id}
  436. ),
  437. reply_markup=self.generate_markup(
  438. reply_markup
  439. if isinstance(reply_markup, list)
  440. else unit.get("buttons", [])
  441. ),
  442. )
  443. except Exception:
  444. return False
  445. else:
  446. return True
  447. else:
  448. return True
  449. try:
  450. await self.bot.edit_message_media(
  451. **(
  452. {"inline_message_id": inline_message_id}
  453. if inline_message_id
  454. else {"chat_id": chat_id, "message_id": message_id}
  455. ),
  456. media=media,
  457. reply_markup=self.generate_markup(
  458. reply_markup
  459. if isinstance(reply_markup, list)
  460. else unit.get("buttons", [])
  461. ),
  462. )
  463. except RetryAfter as e:
  464. logger.info("Sleeping %ss on aiogram FloodWait...", e.timeout)
  465. await asyncio.sleep(e.timeout)
  466. return await self._edit_unit(**utils.get_kwargs())
  467. except MessageIdInvalid:
  468. with contextlib.suppress(Exception):
  469. await query.answer(
  470. "I should have edited some message, but it is deleted :("
  471. )
  472. return False
  473. else:
  474. return True
  475. async def _delete_unit_message(
  476. self,
  477. call: typing.Optional[CallbackQuery] = None,
  478. unit_id: typing.Optional[str] = None,
  479. chat_id: typing.Optional[int] = None,
  480. message_id: typing.Optional[int] = None,
  481. ) -> bool:
  482. """Params `self`, `unit_id` are for internal use only, do not try to pass them"""
  483. if getattr(getattr(call, "message", None), "chat", None):
  484. try:
  485. await self.bot.delete_message(
  486. chat_id=call.message.chat.id,
  487. message_id=call.message.message_id,
  488. )
  489. except Exception:
  490. return False
  491. return True
  492. if chat_id and message_id:
  493. try:
  494. await self.bot.delete_message(chat_id=chat_id, message_id=message_id)
  495. except Exception:
  496. return False
  497. return True
  498. if not unit_id and hasattr(call, "unit_id") and call.unit_id:
  499. unit_id = call.unit_id
  500. try:
  501. message_id, peer, _, _ = resolve_inline_message_id(
  502. self._units[unit_id]["inline_message_id"]
  503. )
  504. await self._client.delete_messages(peer, [message_id])
  505. await self._unload_unit(unit_id)
  506. except Exception:
  507. return False
  508. return True
  509. async def _unload_unit(self, unit_id: str) -> bool:
  510. """Params `self`, `unit_id` are for internal use only, do not try to pass them"""
  511. try:
  512. if "on_unload" in self._units[unit_id] and callable(
  513. self._units[unit_id]["on_unload"]
  514. ):
  515. self._units[unit_id]["on_unload"]()
  516. if unit_id in self._units:
  517. del self._units[unit_id]
  518. else:
  519. return False
  520. except Exception:
  521. return False
  522. return True
  523. def build_pagination(
  524. self,
  525. callback: typing.Callable[[int], typing.Awaitable[typing.Any]],
  526. total_pages: int,
  527. unit_id: typing.Optional[str] = None,
  528. current_page: typing.Optional[int] = None,
  529. ) -> typing.List[typing.List[typing.Dict[str, typing.Any]]]:
  530. # Based on https://github.com/pystorage/pykeyboard/blob/master/pykeyboard/inline_pagination_keyboard.py#L4
  531. if current_page is None:
  532. current_page = self._units[unit_id]["current_index"] + 1
  533. if total_pages <= 5:
  534. return [[
  535. (
  536. {"text": number, "args": (number - 1,), "callback": callback}
  537. if number != current_page
  538. else {
  539. "text": f"· {number} ·",
  540. "args": (number - 1,),
  541. "callback": callback,
  542. }
  543. )
  544. for number in range(1, total_pages + 1)
  545. ]]
  546. if current_page <= 3:
  547. return [[
  548. (
  549. {
  550. "text": f"· {number} ·",
  551. "args": (number - 1,),
  552. "callback": callback,
  553. }
  554. if number == current_page
  555. else (
  556. {
  557. "text": f"{number} ›",
  558. "args": (number - 1,),
  559. "callback": callback,
  560. }
  561. if number == 4
  562. else (
  563. {
  564. "text": f"{total_pages} »",
  565. "args": (total_pages - 1,),
  566. "callback": callback,
  567. }
  568. if number == 5
  569. else {
  570. "text": number,
  571. "args": (number - 1,),
  572. "callback": callback,
  573. }
  574. )
  575. )
  576. )
  577. for number in range(1, 6)
  578. ]]
  579. if current_page > total_pages - 3:
  580. return [
  581. [
  582. {"text": "« 1", "args": (0,), "callback": callback},
  583. {
  584. "text": f"‹ {total_pages - 3}",
  585. "args": (total_pages - 4,),
  586. "callback": callback,
  587. },
  588. ]
  589. + [
  590. (
  591. {
  592. "text": f"· {number} ·",
  593. "args": (number - 1,),
  594. "callback": callback,
  595. }
  596. if number == current_page
  597. else {
  598. "text": number,
  599. "args": (number - 1,),
  600. "callback": callback,
  601. }
  602. )
  603. for number in range(total_pages - 2, total_pages + 1)
  604. ]
  605. ]
  606. return [[
  607. {"text": "« 1", "args": (0,), "callback": callback},
  608. {
  609. "text": f"‹ {current_page - 1}",
  610. "args": (current_page - 2,),
  611. "callback": callback,
  612. },
  613. {
  614. "text": f"· {current_page} ·",
  615. "args": (current_page - 1,),
  616. "callback": callback,
  617. },
  618. {
  619. "text": f"{current_page + 1} ›",
  620. "args": (current_page,),
  621. "callback": callback,
  622. },
  623. {
  624. "text": f"{total_pages} »",
  625. "args": (total_pages - 1,),
  626. "callback": callback,
  627. },
  628. ]]
  629. def _validate_markup(
  630. self,
  631. buttons: typing.Optional[HikkaReplyMarkup],
  632. ) -> typing.List[typing.List[typing.Dict[str, typing.Any]]]:
  633. if buttons is None:
  634. buttons = []
  635. if not isinstance(buttons, (list, dict)):
  636. logger.error(
  637. "Reply markup ommited because passed type is not valid (%s)",
  638. type(buttons),
  639. )
  640. return None
  641. buttons = self._normalize_markup(buttons)
  642. if not all(all(isinstance(button, dict) for button in row) for row in buttons):
  643. logger.error(
  644. "Reply markup ommited because passed invalid type for one of the"
  645. " buttons"
  646. )
  647. return None
  648. if not all(
  649. all(
  650. "url" in button
  651. or "callback" in button
  652. or "input" in button
  653. or "data" in button
  654. or "action" in button
  655. for button in row
  656. )
  657. for row in buttons
  658. ):
  659. logger.error(
  660. "Invalid button specified. "
  661. "Button must contain one of the following fields:\n"
  662. " - `url`\n"
  663. " - `callback`\n"
  664. " - `input`\n"
  665. " - `data`\n"
  666. " - `action`"
  667. )
  668. return None
  669. return buttons