list.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  2. # █▀█ █ █ █ █▀█ █▀▄ █
  3. # © Copyright 2022
  4. # https://t.me/hikariatama
  5. #
  6. # 🔒 Licensed under the GNU AGPLv3
  7. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  8. import asyncio
  9. import contextlib
  10. import copy
  11. import functools
  12. import logging
  13. import time
  14. import traceback
  15. from typing import List as _List
  16. from typing import Optional, Union
  17. from aiogram.types import (
  18. CallbackQuery,
  19. InlineKeyboardMarkup,
  20. InlineQuery,
  21. InlineQueryResultArticle,
  22. InputTextMessageContent,
  23. )
  24. from aiogram.utils.exceptions import RetryAfter
  25. from telethon.tl.types import Message
  26. from telethon.errors.rpcerrorlist import ChatSendInlineForbiddenError
  27. from .. import utils, main
  28. from .types import InlineMessage, InlineUnit
  29. logger = logging.getLogger(__name__)
  30. class List(InlineUnit):
  31. async def list(
  32. self,
  33. message: Union[Message, int],
  34. strings: _List[str],
  35. *,
  36. force_me: Optional[bool] = False,
  37. always_allow: Optional[list] = None,
  38. manual_security: Optional[bool] = False,
  39. disable_security: Optional[bool] = False,
  40. ttl: Optional[Union[int, bool]] = False,
  41. on_unload: Optional[callable] = None,
  42. silent: Optional[bool] = False,
  43. custom_buttons: Optional[Union[_List[_List[dict]], _List[dict], dict]] = None,
  44. ) -> Union[bool, InlineMessage]:
  45. """
  46. Send inline list to chat
  47. :param message: Where to send list. Can be either `Message` or `int`
  48. :param strings: List of strings, which should become inline list
  49. :param force_me: Either this list buttons must be pressed only by owner scope or no
  50. :param always_allow: Users, that are allowed to press buttons in addition to previous rules
  51. :param ttl: Time, when the list is going to be unloaded. Unload means, that the list
  52. will become unusable. Pay attention, that ttl can't
  53. be bigger, than default one (1 day) and must be either `int` or `False`
  54. :param on_unload: Callback, called when list is unloaded and/or closed. You can clean up trash
  55. or perform another needed action
  56. :param manual_security: By default, Hikka will try to inherit inline buttons security from the caller (command)
  57. If you want to avoid this, pass `manual_security=True`
  58. :param disable_security: By default, Hikka will try to inherit inline buttons security from the caller (command)
  59. If you want to disable all security checks on this list in particular, pass `disable_security=True`
  60. :param silent: Whether the list must be sent silently (w/o "Loading inline list..." message)
  61. :param custom_buttons: Custom buttons to add above native ones
  62. :return: If list is sent, returns :obj:`InlineMessage`, otherwise returns `False`
  63. """
  64. with contextlib.suppress(AttributeError):
  65. _hikka_client_id_logging_tag = copy.copy(self._client.tg_id)
  66. custom_buttons = self._validate_markup(custom_buttons)
  67. if not isinstance(manual_security, bool):
  68. logger.error("Invalid type for `manual_security`")
  69. return False
  70. if not isinstance(silent, bool):
  71. logger.error("Invalid type for `silent`")
  72. return False
  73. if not isinstance(disable_security, bool):
  74. logger.error("Invalid type for `disable_security`")
  75. return False
  76. if not isinstance(message, (Message, int)):
  77. logger.error("Invalid type for `message`")
  78. return False
  79. if not isinstance(force_me, bool):
  80. logger.error("Invalid type for `force_me`")
  81. return False
  82. if not isinstance(strings, list) or not strings:
  83. logger.error("Invalid type for `strings`")
  84. return False
  85. if len(strings) > 50:
  86. logger.error(f"Too much pages for `strings` ({len(strings)})")
  87. return False
  88. if always_allow and not isinstance(always_allow, list):
  89. logger.error("Invalid type for `always_allow`")
  90. return False
  91. if not always_allow:
  92. always_allow = []
  93. if not isinstance(ttl, int) and ttl:
  94. logger.error("Invalid type for `ttl`")
  95. return False
  96. unit_id = utils.rand(16)
  97. perms_map = None if manual_security else self._find_caller_sec_map()
  98. self._units[unit_id] = {
  99. "type": "list",
  100. "chat": None,
  101. "message_id": None,
  102. "uid": unit_id,
  103. "current_index": 0,
  104. "strings": strings,
  105. "future": asyncio.Event(),
  106. **({"ttl": round(time.time()) + ttl} if ttl else {}),
  107. **({"force_me": force_me} if force_me else {}),
  108. **({"disable_security": disable_security} if disable_security else {}),
  109. **({"on_unload": on_unload} if callable(on_unload) else {}),
  110. **({"always_allow": always_allow} if always_allow else {}),
  111. **({"perms_map": perms_map} if perms_map else {}),
  112. **({"message": message} if isinstance(message, Message) else {}),
  113. **({"custom_buttons": custom_buttons} if custom_buttons else {}),
  114. }
  115. btn_call_data = utils.rand(10)
  116. self._custom_map[btn_call_data] = {
  117. "handler": asyncio.coroutine(
  118. functools.partial(
  119. self._list_page,
  120. unit_id=unit_id,
  121. )
  122. ),
  123. **(
  124. {"ttl": self._units[unit_id]["ttl"]}
  125. if "ttl" in self._units[unit_id]
  126. else {}
  127. ),
  128. **({"always_allow": always_allow} if always_allow else {}),
  129. **({"force_me": force_me} if force_me else {}),
  130. **({"disable_security": disable_security} if disable_security else {}),
  131. **({"perms_map": perms_map} if perms_map else {}),
  132. **({"message": message} if isinstance(message, Message) else {}),
  133. }
  134. if isinstance(message, Message) and not silent:
  135. try:
  136. status_message = await (
  137. message.edit if message.out else message.respond
  138. )("🌘 <b>Loading inline list...</b>")
  139. except Exception:
  140. status_message = None
  141. else:
  142. status_message = None
  143. async def answer(msg: str):
  144. nonlocal message
  145. if isinstance(message, Message):
  146. await (message.edit if message.out else message.respond)(msg)
  147. else:
  148. await self._client.send_message(message, msg)
  149. try:
  150. q = await self._client.inline_query(self.bot_username, unit_id)
  151. m = await q[0].click(
  152. utils.get_chat_id(message) if isinstance(message, Message) else message,
  153. reply_to=message.reply_to_msg_id
  154. if isinstance(message, Message)
  155. else None,
  156. )
  157. except ChatSendInlineForbiddenError:
  158. await answer("🚫 <b>You can't send inline units in this chat</b>")
  159. except Exception:
  160. logger.exception("Can't send list")
  161. if not self._db.get(main.__name__, "inlinelogs", True):
  162. msg = "<b>🚫 List invoke failed! More info in logs</b>"
  163. else:
  164. exc = traceback.format_exc()
  165. # Remove `Traceback (most recent call last):`
  166. exc = "\n".join(exc.splitlines()[1:])
  167. msg = (
  168. "<b>🚫 List invoke failed!</b>\n\n"
  169. f"<b>🧾 Logs:</b>\n<code>{utils.escape_html(exc)}</code>"
  170. )
  171. del self._units[unit_id]
  172. await answer(msg)
  173. return False
  174. await self._units[unit_id]["future"].wait()
  175. del self._units[unit_id]["future"]
  176. self._units[unit_id]["chat"] = utils.get_chat_id(m)
  177. self._units[unit_id]["message_id"] = m.id
  178. if isinstance(message, Message) and message.out:
  179. await message.delete()
  180. if status_message and not message.out:
  181. await status_message.delete()
  182. return InlineMessage(self, unit_id, self._units[unit_id]["inline_message_id"])
  183. async def _list_page(
  184. self,
  185. call: CallbackQuery,
  186. page: Union[int, str],
  187. unit_id: str = None,
  188. ):
  189. if page == "close":
  190. await self._delete_unit_message(call, unit_id=unit_id)
  191. return
  192. if self._units[unit_id]["current_index"] < 0 or page >= len(
  193. self._units[unit_id]["strings"]
  194. ):
  195. await call.answer("Can't go to this page", show_alert=True)
  196. return
  197. self._units[unit_id]["current_index"] = page
  198. try:
  199. await self.bot.edit_message_text(
  200. inline_message_id=call.inline_message_id,
  201. text=self.sanitise_text(
  202. self._units[unit_id]["strings"][
  203. self._units[unit_id]["current_index"]
  204. ]
  205. ),
  206. reply_markup=self._list_markup(unit_id),
  207. )
  208. await call.answer()
  209. except RetryAfter as e:
  210. await call.answer(
  211. f"Got FloodWait. Wait for {e.timeout} seconds",
  212. show_alert=True,
  213. )
  214. except Exception:
  215. logger.exception("Exception while trying to edit list")
  216. await call.answer("Error occurred", show_alert=True)
  217. return
  218. def _list_markup(self, unit_id: str) -> InlineKeyboardMarkup:
  219. """Generates aiogram markup for `list`"""
  220. callback = functools.partial(self._list_page, unit_id=unit_id)
  221. return self.generate_markup(
  222. self._units[unit_id].get("custom_buttons", [])
  223. + self.build_pagination(
  224. callback=callback,
  225. total_pages=len(self._units[unit_id]["strings"]),
  226. unit_id=unit_id,
  227. )
  228. + [[{"text": "🔻 Close", "callback": callback, "args": ("close",)}]],
  229. )
  230. async def _list_inline_handler(self, inline_query: InlineQuery):
  231. for unit in self._units.copy().values():
  232. if (
  233. inline_query.from_user.id == self._me
  234. and inline_query.query == unit["uid"]
  235. and unit["type"] == "list"
  236. ):
  237. await inline_query.answer(
  238. [
  239. InlineQueryResultArticle(
  240. id=utils.rand(20),
  241. title="Hikka",
  242. input_message_content=InputTextMessageContent(
  243. self.sanitise_text(unit["strings"][0]),
  244. "HTML",
  245. disable_web_page_preview=True,
  246. ),
  247. reply_markup=self._list_markup(inline_query.query),
  248. )
  249. ],
  250. cache_time=60,
  251. )