core.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. """Inline buttons, galleries and other Telegram-Bot-API stuff"""
  2. # ©️ Dan Gazizullin, 2021-2022
  3. # This file is a part of Hikka Userbot
  4. # 🌐 https://github.com/hikariatama/Hikka
  5. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  6. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  7. import asyncio
  8. import logging
  9. import time
  10. from aiogram import Bot, Dispatcher
  11. from aiogram.types import ParseMode
  12. from aiogram.utils.exceptions import TerminatedByOtherGetUpdates, Unauthorized
  13. from telethon.errors.rpcerrorlist import InputUserDeactivatedError, YouBlockedUserError
  14. from telethon.tl.functions.contacts import UnblockRequest
  15. from telethon.utils import get_display_name
  16. from ..database import Database
  17. from ..tl_cache import CustomTelegramClient
  18. from .bot_pm import BotPM
  19. from .events import Events
  20. from .form import Form
  21. from .gallery import Gallery
  22. from .list import List
  23. from .query_gallery import QueryGallery
  24. from .token_obtainment import TokenObtainment
  25. from .utils import Utils
  26. logger = logging.getLogger(__name__)
  27. class InlineManager(
  28. Utils,
  29. Events,
  30. TokenObtainment,
  31. Form,
  32. Gallery,
  33. QueryGallery,
  34. List,
  35. BotPM,
  36. ):
  37. def __init__(
  38. self,
  39. client: CustomTelegramClient,
  40. db: Database,
  41. allmodules: "Modules", # type: ignore
  42. ):
  43. """Initialize InlineManager to create forms"""
  44. self._client = client
  45. self._db = db
  46. self._allmodules = allmodules
  47. self._units = {}
  48. self._custom_map = {}
  49. self.fsm = {}
  50. self._web_auth_tokens = []
  51. self._markup_ttl = 60 * 60 * 24
  52. self.init_complete = False
  53. self._token = db.get("hikka.inline", "bot_token", False)
  54. async def _cleaner(self):
  55. """Cleans outdated inline units"""
  56. while True:
  57. for unit_id, unit in self._units.copy().items():
  58. if (unit.get("ttl") or (time.time() + self._markup_ttl)) < time.time():
  59. del self._units[unit_id]
  60. await asyncio.sleep(5)
  61. async def register_manager(
  62. self,
  63. after_break: bool = False,
  64. ignore_token_checks: bool = False,
  65. ):
  66. # Get info about user to use it in this class
  67. self._me = self._client.tg_id
  68. self._name = get_display_name(self._client.hikka_me)
  69. if not ignore_token_checks:
  70. # Assert that token is set to valid, and if not,
  71. # set `init_complete` to `False` and return
  72. is_token_asserted = await self._assert_token()
  73. if not is_token_asserted:
  74. self.init_complete = False
  75. return
  76. # We successfully asserted token, so set `init_complete` to `True`
  77. self.init_complete = True
  78. # Create bot instance and dispatcher
  79. self.bot = Bot(token=self._token, parse_mode=ParseMode.HTML)
  80. Bot.set_current(self.bot)
  81. self._bot = self.bot # This is a temporary alias so the
  82. # developers can adapt their code
  83. self._dp = Dispatcher(self.bot)
  84. # Get bot username to call inline queries
  85. try:
  86. bot_me = await self.bot.get_me()
  87. self.bot_username = bot_me.username
  88. self.bot_id = bot_me.id
  89. except Unauthorized:
  90. logger.critical("Token expired, revoking...")
  91. return await self._dp_revoke_token(False)
  92. # Start the bot in case it can send you messages
  93. try:
  94. m = await self._client.send_message(self.bot_username, "/start hikka init")
  95. except (InputUserDeactivatedError, ValueError):
  96. self._db.set("hikka.inline", "bot_token", None)
  97. self._token = False
  98. if not after_break:
  99. return await self.register_manager(True)
  100. self.init_complete = False
  101. return False
  102. except YouBlockedUserError:
  103. await self._client(UnblockRequest(id=self.bot_username))
  104. try:
  105. m = await self._client.send_message(
  106. self.bot_username, "/start hikka init"
  107. )
  108. except Exception:
  109. logger.critical("Can't unblock users bot", exc_info=True)
  110. return False
  111. except Exception:
  112. self.init_complete = False
  113. logger.critical("Initialization of inline manager failed!", exc_info=True)
  114. return False
  115. await self._client.delete_messages(self.bot_username, m)
  116. # Register required event handlers inside aiogram
  117. self._dp.register_inline_handler(
  118. self._inline_handler,
  119. lambda _: True,
  120. )
  121. self._dp.register_callback_query_handler(
  122. self._callback_query_handler,
  123. lambda _: True,
  124. )
  125. self._dp.register_chosen_inline_handler(
  126. self._chosen_inline_handler,
  127. lambda _: True,
  128. )
  129. self._dp.register_message_handler(
  130. self._message_handler,
  131. lambda *_: True,
  132. content_types=["any"],
  133. )
  134. old = self.bot.get_updates
  135. revoke = self._dp_revoke_token
  136. async def new(*args, **kwargs):
  137. nonlocal revoke, old
  138. try:
  139. return await old(*args, **kwargs)
  140. except TerminatedByOtherGetUpdates:
  141. await revoke()
  142. except Unauthorized:
  143. logger.critical("Got Unauthorized")
  144. await self._stop()
  145. self.bot.get_updates = new
  146. # Start polling as the separate task, just in case we will need
  147. # to force stop this coro. It should be cancelled only by `stop`
  148. # because it stops the bot from getting updates
  149. self._task = asyncio.ensure_future(self._dp.start_polling())
  150. self._cleaner_task = asyncio.ensure_future(self._cleaner())
  151. async def _stop(self):
  152. self._task.cancel()
  153. self._dp.stop_polling()
  154. self._cleaner_task.cancel()
  155. def pop_web_auth_token(self, token) -> bool:
  156. """Check if web confirmation button was pressed"""
  157. if token not in self._web_auth_tokens:
  158. return False
  159. self._web_auth_tokens.remove(token)
  160. return True
  161. if __name__ == "__main__":
  162. raise Exception("This file must be called as a module")