list.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
# ©️ Dan Gazizullin, 2021-2023
# This file is a part of Hikka Userbot
# 🌐 https://github.com/hikariatama/Hikka
# You can redistribute it and/or modify it under the terms of the GNU AGPLv3
# 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  6. import asyncio
  7. import contextlib
  8. import copy
  9. import functools
  10. import logging
  11. import time
  12. import traceback
  13. import typing
  14. from aiogram.types import (
  15. CallbackQuery,
  16. InlineKeyboardMarkup,
  17. InlineQuery,
  18. InlineQueryResultArticle,
  19. InputTextMessageContent,
  20. )
  21. from aiogram.utils.exceptions import RetryAfter
  22. from hikkatl.errors.rpcerrorlist import ChatSendInlineForbiddenError
  23. from hikkatl.extensions.html import CUSTOM_EMOJIS
  24. from hikkatl.tl.types import Message
  25. from .. import main, utils
  26. from ..types import HikkaReplyMarkup
  27. from .types import InlineMessage, InlineUnit
  28. logger = logging.getLogger(__name__)
  class List(InlineUnit):
      """Inline unit that sends a list of strings as a paginated inline message.

      `list` registers the unit and sends it, `_list_page` handles the
      pagination / close buttons, `_list_markup` builds the keyboard and
      `_list_inline_handler` answers the inline query carrying the unit id.
      """

      async def list(
          self,
          message: typing.Union[Message, int],
          strings: typing.List[str],
          *,
          force_me: bool = False,
          always_allow: typing.Optional[typing.List[int]] = None,
          manual_security: bool = False,
          disable_security: bool = False,
          ttl: typing.Union[int, bool] = False,
          on_unload: typing.Optional[typing.Callable[[], typing.Any]] = None,
          silent: bool = False,
          custom_buttons: typing.Optional[HikkaReplyMarkup] = None,
      ) -> typing.Union[bool, InlineMessage]:
          """
          Send inline list to chat

          :param message: Where to send list. Can be either `Message` or `int`
          :param strings: List of strings, which should become inline list.
                          Must be a non-empty `list` with at most 50 items
          :param force_me: Either this list buttons must be pressed only by owner scope or no
          :param always_allow: Users, that are allowed to press buttons in addition to previous rules
          :param ttl: Time, when the list is going to be unloaded. Unload means, that the list
                      will become unusable. Pay attention, that ttl can't
                      be bigger, than default one (1 day) and must be either `int` or `False`
          :param on_unload: Callback, called when list is unloaded and/or closed. You can clean up trash
                            or perform another needed action
          :param manual_security: By default, Hikka will try to inherit inline buttons security from the caller (command)
                                  If you want to avoid this, pass `manual_security=True`
          :param disable_security: By default, Hikka will try to inherit inline buttons security from the caller (command)
                                   If you want to disable all security checks on this list in particular, pass `disable_security=True`
          :param silent: Whether the list must be sent silently (w/o "Opening list..." message)
          :param custom_buttons: Custom buttons to add above native ones
          :return: If list is sent, returns :obj:`InlineMessage`, otherwise returns `False`
                   (invalid arguments are logged; send failures are reported to the chat)
          """
          # Local kept on purpose (hence the noqa); presumably picked up by the
          # logging machinery from this frame - confirm before removing.
          with contextlib.suppress(AttributeError):
              _hikka_client_id_logging_tag = copy.copy(self._client.tg_id)  # noqa: F841

          custom_buttons = self._validate_markup(custom_buttons)

          # --- Argument validation: log the problem and bail out with `False` ---
          for flag_name, flag_value in (
              ("manual_security", manual_security),
              ("silent", silent),
              ("disable_security", disable_security),
          ):
              if not isinstance(flag_value, bool):
                  logger.error(
                      "Invalid type for `%s`. Expected `bool`, got `%s`",
                      flag_name,
                      type(flag_value),
                  )
                  return False

          if not isinstance(message, (Message, int)):
              logger.error(
                  "Invalid type for `message`. Expected `Message` or `int`, got `%s`",
                  type(message),
              )
              return False

          if not isinstance(force_me, bool):
              logger.error(
                  "Invalid type for `force_me`. Expected `bool`, got `%s`",
                  type(force_me),
              )
              return False

          if not isinstance(strings, list) or not strings:
              logger.error(
                  (
                      "Invalid type for `strings`. Expected `list` with at least one"
                      " element, got `%s`"
                  ),
                  type(strings),
              )
              return False

          if len(strings) > 50:
              logger.error("Too much pages for `strings` (%s)", len(strings))
              return False

          if always_allow and not isinstance(always_allow, list):
              logger.error(
                  "Invalid type for `always_allow`. Expected `list`, got `%s`",
                  type(always_allow),
              )
              return False

          if not always_allow:
              always_allow = []

          if ttl and not isinstance(ttl, int):
              logger.error(
                  "Invalid type for `ttl`. Expected `int` or `False`, got `%s`",
                  type(ttl),
              )
              return False

          # --- Unit registration ---
          unit_id = utils.rand(16)
          perms_map = None if manual_security else self._find_caller_sec_map()

          # Optional keys stored identically on the unit and on its button
          # callback entry; each one is only present when it carries a value.
          shared: typing.Dict[str, typing.Any] = {}
          if ttl:
              shared["ttl"] = round(time.time()) + ttl
          if force_me:
              shared["force_me"] = force_me
          if disable_security:
              shared["disable_security"] = disable_security
          if always_allow:
              shared["always_allow"] = always_allow
          if perms_map:
              shared["perms_map"] = perms_map
          if isinstance(message, Message):
              shared["message"] = message

          self._units[unit_id] = {
              "type": "list",
              "caller": message,
              "chat": None,
              "message_id": None,
              "top_msg_id": utils.get_topic(message),
              "uid": unit_id,
              "current_index": 0,
              "strings": strings,
              "future": asyncio.Event(),
              **shared,
              **({"on_unload": on_unload} if callable(on_unload) else {}),
              **({"custom_buttons": custom_buttons} if custom_buttons else {}),
          }

          btn_call_data = utils.rand(10)
          self._custom_map[btn_call_data] = {
              "handler": functools.partial(self._list_page, unit_id=unit_id),
              **shared,
          }

          async def answer(msg: str):
              """Show `msg` to the user: edit our own message, otherwise send a new one."""
              if not isinstance(message, Message):
                  return await self._client.send_message(message, msg)

              if message.out:
                  return await message.edit(msg)

              # `reply_to` only applies to a newly sent message, so it is
              # passed to `respond` and never to `edit`.
              return await message.respond(msg, reply_to=utils.get_topic(message))

          # --- Optional "Opening list..." status message (best effort) ---
          status_message = None
          if isinstance(message, Message) and not silent:
              try:
                  status_message = await answer(
                      (
                          utils.get_platform_emoji()
                          if self._client.hikka_me.premium and CUSTOM_EMOJIS
                          else "🌘"
                      )
                      + self.translator.getkey("inline.opening_list")
                  )
              except Exception:
                  # The status line is cosmetic; failing to show it must not
                  # prevent the list itself from being sent.
                  status_message = None

          # --- Send the unit ---
          try:
              m = await self._invoke_unit(unit_id, message)
          except ChatSendInlineForbiddenError:
              # Nothing was sent, so `m` is unbound and the unit's `future`
              # would be awaited below with no message behind it: clean up and
              # report failure instead of falling through.
              self._units.pop(unit_id, None)
              await answer(self.translator.getkey("inline.inline403"))
              return False
          except Exception:
              logger.exception("Can't send list")
              del self._units[unit_id]

              if self._db.get(main.__name__, "inlinelogs", True):
                  # Drop the first traceback line before showing it to the user.
                  trace = "\n".join(traceback.format_exc().splitlines()[1:])
                  failure_text = self.translator.getkey(
                      "inline.invoke_failed_logs"
                  ).format(utils.escape_html(trace))
              else:
                  failure_text = self.translator.getkey("inline.invoke_failed")

              await answer(failure_text)
              return False

          # Wait until the unit's event is set elsewhere, then record where the
          # list message ended up.
          await self._units[unit_id]["future"].wait()
          del self._units[unit_id]["future"]

          self._units[unit_id]["chat"] = utils.get_chat_id(m)
          self._units[unit_id]["message_id"] = m.id

          if isinstance(message, Message) and message.out:
              await message.delete()

          # `status_message` is only ever set when `message` is a `Message`.
          if status_message and not message.out:
              await status_message.delete()

          return InlineMessage(self, unit_id, self._units[unit_id]["inline_message_id"])

      async def _list_page(
          self,
          call: CallbackQuery,
          page: typing.Union[int, str],
          unit_id: typing.Optional[str] = None,
      ):
          """
          Callback for list buttons: switch to `page` or close the list

          :param call: Callback query of the pressed button
          :param page: Zero-based index of the page to show, or `"close"`
          :param unit_id: Id of the list unit the button belongs to
          """
          if page == "close":
              await self._delete_unit_message(call, unit_id=unit_id)
              return

          unit = self._units[unit_id]

          # Validate the *requested* page: negative values would otherwise
          # silently index from the end of the list.
          if not isinstance(page, int) or not 0 <= page < len(unit["strings"]):
              await call.answer("Can't go to this page", show_alert=True)
              return

          unit["current_index"] = page

          try:
              await self.bot.edit_message_text(
                  inline_message_id=call.inline_message_id,
                  text=self.sanitise_text(unit["strings"][page]),
                  reply_markup=self._list_markup(unit_id),
              )
              await call.answer()
          except RetryAfter as e:
              await call.answer(
                  f"Got FloodWait. Wait for {e.timeout} seconds",
                  show_alert=True,
              )
          except Exception:
              logger.exception("Exception while trying to edit list")
              await call.answer("Error occurred", show_alert=True)

      def _list_markup(self, unit_id: str) -> InlineKeyboardMarkup:
          """
          Generates aiogram markup for `list`

          Rows are, in order: the unit's custom buttons (if any), the pagination
          row(s) from `build_pagination`, and a final "Close" button.
          """
          callback = functools.partial(self._list_page, unit_id=unit_id)
          unit = self._units[unit_id]

          pagination_rows = self.build_pagination(
              callback=callback,
              total_pages=len(unit["strings"]),
              unit_id=unit_id,
          )
          close_row = [{"text": "🔻 Close", "callback": callback, "args": ("close",)}]

          return self.generate_markup(
              unit.get("custom_buttons", []) + pagination_rows + [close_row]
          )

      async def _list_inline_handler(self, inline_query: InlineQuery):
          """
          Answer an inline query whose text is the id of a registered list unit

          Only queries coming from `self._me` are served. The answer is a single
          article containing the first string of the list and the list markup.
          """
          # Iterate over a snapshot: the mapping may change while we await.
          for unit in self._units.copy().values():
              if not (
                  inline_query.from_user.id == self._me
                  and inline_query.query == unit["uid"]
                  and unit["type"] == "list"
              ):
                  continue

              try:
                  await inline_query.answer(
                      [
                          InlineQueryResultArticle(
                              id=utils.rand(20),
                              title="Hikka",
                              input_message_content=InputTextMessageContent(
                                  self.sanitise_text(unit["strings"][0]),
                                  "HTML",
                                  disable_web_page_preview=True,
                              ),
                              reply_markup=self._list_markup(inline_query.query),
                          )
                      ],
                      cache_time=60,
                  )
              except Exception as e:
                  # Set the pending event for this unit, then replace the
                  # entry with the exception itself (presumably read back by
                  # whoever waits on that event - confirm against the waiter).
                  if unit["uid"] in self._error_events:
                      self._error_events[unit["uid"]].set()
                      self._error_events[unit["uid"]] = e
  283. if unit["uid"] in self._error_events:
  284. self._error_events[unit["uid"]].set()
  285. self._error_events[unit["uid"]] = e