core.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. """Inline buttons, galleries and other Telegram-Bot-API stuff"""
  2. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  3. # █▀█ █ █ █ █▀█ █▀▄ █
  4. # © Copyright 2022
  5. # https://t.me/hikariatama
  6. #
  7. # 🔒 Licensed under the GNU AGPLv3
  8. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  9. import asyncio
  10. import logging
  11. import time
  12. from aiogram import Bot, Dispatcher
  13. from aiogram.types import ParseMode
  14. from aiogram.utils.exceptions import TerminatedByOtherGetUpdates, Unauthorized
  15. from telethon.errors.rpcerrorlist import InputUserDeactivatedError, YouBlockedUserError
  16. from telethon.tl.functions.contacts import UnblockRequest
  17. from telethon.utils import get_display_name
  18. from .bot_pm import BotPM
  19. from .events import Events
  20. from .form import Form
  21. from .gallery import Gallery
  22. from .list import List
  23. from .query_gallery import QueryGallery
  24. from .token_obtainment import TokenObtainment
  25. from .utils import Utils
  26. from ..database import Database
  27. from ..tl_cache import CustomTelegramClient
  28. logger = logging.getLogger(__name__)
  29. class InlineManager(
  30. Utils,
  31. Events,
  32. TokenObtainment,
  33. Form,
  34. Gallery,
  35. QueryGallery,
  36. List,
  37. BotPM,
  38. ):
  39. def __init__(
  40. self,
  41. client: CustomTelegramClient,
  42. db: Database,
  43. allmodules: "Modules", # type: ignore
  44. ):
  45. """Initialize InlineManager to create forms"""
  46. self._client = client
  47. self._db = db
  48. self._allmodules = allmodules
  49. self._units = {}
  50. self._custom_map = {}
  51. self.fsm = {}
  52. self._web_auth_tokens = []
  53. self._markup_ttl = 60 * 60 * 24
  54. self.init_complete = False
  55. self._token = db.get("hikka.inline", "bot_token", False)
  56. async def _cleaner(self):
  57. """Cleans outdated inline units"""
  58. while True:
  59. for unit_id, unit in self._units.copy().items():
  60. if (unit.get("ttl") or (time.time() + self._markup_ttl)) < time.time():
  61. del self._units[unit_id]
  62. await asyncio.sleep(5)
  63. async def _register_manager(
  64. self,
  65. after_break: bool = False,
  66. ignore_token_checks: bool = False,
  67. ):
  68. # Get info about user to use it in this class
  69. self._me = self._client.tg_id
  70. self._name = get_display_name(self._client.hikka_me)
  71. if not ignore_token_checks:
  72. # Assert that token is set to valid, and if not,
  73. # set `init_complete` to `False` and return
  74. is_token_asserted = await self._assert_token()
  75. if not is_token_asserted:
  76. self.init_complete = False
  77. return
  78. # We successfully asserted token, so set `init_complete` to `True`
  79. self.init_complete = True
  80. # Create bot instance and dispatcher
  81. self.bot = Bot(token=self._token, parse_mode=ParseMode.HTML)
  82. Bot.set_current(self.bot)
  83. self._bot = self.bot # This is a temporary alias so the
  84. # developers can adapt their code
  85. self._dp = Dispatcher(self.bot)
  86. # Get bot username to call inline queries
  87. try:
  88. bot_me = await self.bot.get_me()
  89. self.bot_username = bot_me.username
  90. self.bot_id = bot_me.id
  91. except Unauthorized:
  92. logger.critical("Token expired, revoking...")
  93. return await self._dp_revoke_token(False)
  94. # Start the bot in case it can send you messages
  95. try:
  96. m = await self._client.send_message(self.bot_username, "/start hikka init")
  97. except (InputUserDeactivatedError, ValueError):
  98. self._db.set("hikka.inline", "bot_token", None)
  99. self._token = False
  100. if not after_break:
  101. return await self._register_manager(True)
  102. self.init_complete = False
  103. return False
  104. except YouBlockedUserError:
  105. await self._client(UnblockRequest(id=self.bot_username))
  106. try:
  107. m = await self._client.send_message(
  108. self.bot_username, "/start hikka init"
  109. )
  110. except Exception:
  111. logger.critical("Can't unblock users bot", exc_info=True)
  112. return False
  113. except Exception:
  114. self.init_complete = False
  115. logger.critical("Initialization of inline manager failed!", exc_info=True)
  116. return False
  117. await self._client.delete_messages(self.bot_username, m)
  118. # Register required event handlers inside aiogram
  119. self._dp.register_inline_handler(
  120. self._inline_handler,
  121. lambda _: True,
  122. )
  123. self._dp.register_callback_query_handler(
  124. self._callback_query_handler,
  125. lambda _: True,
  126. )
  127. self._dp.register_chosen_inline_handler(
  128. self._chosen_inline_handler,
  129. lambda _: True,
  130. )
  131. self._dp.register_message_handler(
  132. self._message_handler,
  133. lambda *_: True,
  134. content_types=["any"],
  135. )
  136. old = self.bot.get_updates
  137. revoke = self._dp_revoke_token
  138. async def new(*args, **kwargs):
  139. nonlocal revoke, old
  140. try:
  141. return await old(*args, **kwargs)
  142. except TerminatedByOtherGetUpdates:
  143. await revoke()
  144. except Unauthorized:
  145. logger.critical("Got Unauthorized")
  146. await self._stop()
  147. self.bot.get_updates = new
  148. # Start polling as the separate task, just in case we will need
  149. # to force stop this coro. It should be cancelled only by `stop`
  150. # because it stops the bot from getting updates
  151. self._task = asyncio.ensure_future(self._dp.start_polling())
  152. self._cleaner_task = asyncio.ensure_future(self._cleaner())
  153. async def _stop(self):
  154. self._task.cancel()
  155. self._dp.stop_polling()
  156. self._cleaner_task.cancel()
  157. def pop_web_auth_token(self, token) -> bool:
  158. """Check if web confirmation button was pressed"""
  159. if token not in self._web_auth_tokens:
  160. return False
  161. self._web_auth_tokens.remove(token)
  162. return True
  163. if __name__ == "__main__":
  164. raise Exception("This file must be called as a module")