core.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. """Inline buttons, galleries and other Telegram-Bot-API stuff"""
  2. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  3. # █▀█ █ █ █ █▀█ █▀▄ █
  4. # © Copyright 2022
  5. # https://t.me/hikariatama
  6. #
  7. # 🔒 Licensed under the GNU AGPLv3
  8. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  9. import asyncio
  10. import logging
  11. import time
  12. from aiogram import Bot, Dispatcher
  13. from aiogram.types import ParseMode
  14. from aiogram.utils.exceptions import TerminatedByOtherGetUpdates, Unauthorized
  15. from telethon.errors.rpcerrorlist import InputUserDeactivatedError, YouBlockedUserError
  16. from telethon.tl.functions.contacts import UnblockRequest
  17. from telethon.utils import get_display_name
  18. from ..database import Database
  19. from .bot_pm import BotPM
  20. from .events import Events
  21. from .form import Form
  22. from .gallery import Gallery
  23. from .list import List
  24. from .query_gallery import QueryGallery
  25. from .token_obtainment import TokenObtainment
  26. from .utils import Utils
  27. logger = logging.getLogger(__name__)
  28. class InlineManager(
  29. Utils,
  30. Events,
  31. TokenObtainment,
  32. Form,
  33. Gallery,
  34. QueryGallery,
  35. List,
  36. BotPM,
  37. ):
  38. _units = {}
  39. _custom_map = {}
  40. fsm = {}
  41. _web_auth_tokens = []
  42. _markup_ttl = 60 * 60 * 24
  43. init_complete = False
  44. def __init__(
  45. self,
  46. client: "TelegramClient", # type: ignore
  47. db: Database,
  48. allmodules: "Modules", # type: ignore
  49. ):
  50. """Initialize InlineManager to create forms"""
  51. self._client = client
  52. self._db = db
  53. self._allmodules = allmodules
  54. self._token = db.get("hikka.inline", "bot_token", False)
  55. async def _cleaner(self):
  56. """Cleans outdated inline units"""
  57. while True:
  58. for unit_id, unit in self._units.copy().items():
  59. if (unit.get("ttl") or (time.time() + self._markup_ttl)) < time.time():
  60. del self._units[unit_id]
  61. await asyncio.sleep(5)
  62. async def _register_manager(
  63. self,
  64. after_break: bool = False,
  65. ignore_token_checks: bool = False,
  66. ):
  67. # Get info about user to use it in this class
  68. me = await self._client.get_me()
  69. self._me = me.id
  70. self._name = get_display_name(me)
  71. if not ignore_token_checks:
  72. # Assert that token is set to valid, and if not,
  73. # set `init_complete` to `False` and return
  74. is_token_asserted = await self._assert_token()
  75. if not is_token_asserted:
  76. self.init_complete = False
  77. return
  78. # We successfully asserted token, so set `init_complete` to `True`
  79. self.init_complete = True
  80. # Create bot instance and dispatcher
  81. self.bot = Bot(token=self._token, parse_mode=ParseMode.HTML)
  82. self._bot = self.bot # This is a temporary alias so the
  83. # developers can adapt their code
  84. self._dp = Dispatcher(self.bot)
  85. # Get bot username to call inline queries
  86. try:
  87. bot_me = await self.bot.get_me()
  88. self.bot_username = bot_me.username
  89. self.bot_id = bot_me.id
  90. except Unauthorized:
  91. logger.critical("Token expired, revoking...")
  92. return await self._dp_revoke_token(False)
  93. # Start the bot in case it can send you messages
  94. try:
  95. m = await self._client.send_message(self.bot_username, "/start")
  96. except (InputUserDeactivatedError, ValueError):
  97. self._db.set("hikka.inline", "bot_token", None)
  98. self._token = False
  99. if not after_break:
  100. return await self._register_manager(True)
  101. self.init_complete = False
  102. return False
  103. except YouBlockedUserError:
  104. await self._client(UnblockRequest(id=self.bot_username))
  105. try:
  106. m = await self._client.send_message(self.bot_username, "/start")
  107. except Exception:
  108. logger.critical("Can't unblock users bot", exc_info=True)
  109. return False
  110. except Exception:
  111. self.init_complete = False
  112. logger.critical("Initialization of inline manager failed!", exc_info=True)
  113. return False
  114. await self._client.delete_messages(self.bot_username, m)
  115. # Register required event handlers inside aiogram
  116. self._dp.register_inline_handler(
  117. self._inline_handler,
  118. lambda _: True,
  119. )
  120. self._dp.register_callback_query_handler(
  121. self._callback_query_handler,
  122. lambda _: True,
  123. )
  124. self._dp.register_chosen_inline_handler(
  125. self._chosen_inline_handler,
  126. lambda _: True,
  127. )
  128. self._dp.register_message_handler(
  129. self._message_handler,
  130. lambda *args: True,
  131. content_types=["any"],
  132. )
  133. old = self.bot.get_updates
  134. revoke = self._dp_revoke_token
  135. async def new(*args, **kwargs):
  136. nonlocal revoke, old
  137. try:
  138. return await old(*args, **kwargs)
  139. except TerminatedByOtherGetUpdates:
  140. await revoke()
  141. except Unauthorized:
  142. logger.critical("Got Unauthorized")
  143. await self._stop()
  144. self.bot.get_updates = new
  145. # Start polling as the separate task, just in case we will need
  146. # to force stop this coro. It should be cancelled only by `stop`
  147. # because it stops the bot from getting updates
  148. self._task = asyncio.ensure_future(self._dp.start_polling())
  149. self._cleaner_task = asyncio.ensure_future(self._cleaner())
  150. async def _stop(self):
  151. self._task.cancel()
  152. self._dp.stop_polling()
  153. self._cleaner_task.cancel()
  154. def pop_web_auth_token(self, token) -> bool:
  155. """Check if web confirmation button was pressed"""
  156. if token not in self._web_auth_tokens:
  157. return False
  158. self._web_auth_tokens.remove(token)
  159. return True
  160. if __name__ == "__main__":
  161. raise Exception("This file must be called as a module")