events.py 19 KB


  1. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  2. # █▀█ █ █ █ █▀█ █▀▄ █
  3. # © Copyright 2022
  4. # https://t.me/hikariatama
  5. #
  6. # 🔒 Licensed under the GNU AGPLv3
  7. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  8. import inspect
  9. import logging
  10. import re
  11. import typing
  12. from asyncio import Event
  13. from aiogram.types import CallbackQuery, ChosenInlineResult
  14. from aiogram.types import InlineQuery as AiogramInlineQuery
  15. from aiogram.types import (
  16. InlineQueryResultArticle,
  17. InlineQueryResultDocument,
  18. InlineQueryResultGif,
  19. InlineQueryResultPhoto,
  20. InlineQueryResultVideo,
  21. InputTextMessageContent,
  22. )
  23. from aiogram.types import Message as AiogramMessage
  24. from .. import utils
  25. from .types import InlineCall, InlineQuery, InlineUnit, BotInlineCall
  26. logger = logging.getLogger(__name__)
  27. class Events(InlineUnit):
  28. async def _message_handler(self, message: AiogramMessage):
  29. """Processes incoming messages"""
  30. if message.chat.type != "private" or message.text == "/start hikka init":
  31. return
  32. for mod in self._allmodules.modules:
  33. if (
  34. not hasattr(mod, "aiogram_watcher")
  35. or message.text == "/start"
  36. and mod.__class__.__name__ != "InlineStuffMod"
  37. ):
  38. continue
  39. try:
  40. await mod.aiogram_watcher(message)
  41. except BaseException:
  42. logger.exception("Error on running aiogram watcher!")
  43. async def _inline_handler(self, inline_query: AiogramInlineQuery):
  44. """Inline query handler (forms' calls)"""
  45. # Retrieve query from passed object
  46. query = inline_query.query
  47. # If we didn't get any query, return help
  48. if not query:
  49. await self._query_help(inline_query)
  50. return
  51. # First, dispatch all registered inline handlers
  52. cmd = inline_query.query.split()[0].lower()
  53. if (
  54. cmd in self._allmodules.inline_handlers
  55. and await self.check_inline_security(
  56. func=self._allmodules.inline_handlers[cmd],
  57. user=inline_query.from_user.id,
  58. )
  59. ):
  60. instance = InlineQuery(inline_query)
  61. try:
  62. result = await self._allmodules.inline_handlers[cmd](instance)
  63. except BaseException:
  64. logger.exception("Error on running inline watcher!")
  65. return
  66. if not result:
  67. return
  68. if isinstance(result, dict):
  69. result = [result]
  70. if not isinstance(result, list):
  71. logger.error(
  72. "Got invalid type from inline handler. It must be `dict`, got `%s`",
  73. type(result),
  74. )
  75. await instance.e500()
  76. return
  77. for res in result:
  78. mandatory = {"message", "photo", "gif", "video", "file"}
  79. if all(item not in res for item in mandatory):
  80. logger.error(
  81. "Got invalid type from inline handler. It must contain one of"
  82. " `%s`",
  83. mandatory,
  84. )
  85. await instance.e500()
  86. return
  87. if "file" in res and "mime_type" not in res:
  88. logger.error(
  89. "Got invalid type from inline handler. It contains field"
  90. " `file`, so it must contain `mime_type` as well"
  91. )
  92. inline_result = []
  93. for res in result:
  94. if "message" in res:
  95. inline_result += [
  96. InlineQueryResultArticle(
  97. id=utils.rand(20),
  98. title=self.sanitise_text(res["title"]),
  99. description=self.sanitise_text(res.get("description")),
  100. input_message_content=InputTextMessageContent(
  101. self.sanitise_text(res["message"]),
  102. "HTML",
  103. disable_web_page_preview=True,
  104. ),
  105. thumb_url=res.get("thumb"),
  106. thumb_width=128,
  107. thumb_height=128,
  108. reply_markup=self.generate_markup(res.get("reply_markup")),
  109. )
  110. ]
  111. elif "photo" in res:
  112. inline_result += [
  113. InlineQueryResultPhoto(
  114. id=utils.rand(20),
  115. title=self.sanitise_text(res.get("title")),
  116. description=self.sanitise_text(res.get("description")),
  117. caption=self.sanitise_text(res.get("caption")),
  118. parse_mode="HTML",
  119. thumb_url=res.get("thumb", res["photo"]),
  120. photo_url=res["photo"],
  121. reply_markup=self.generate_markup(res.get("reply_markup")),
  122. )
  123. ]
  124. elif "gif" in res:
  125. inline_result += [
  126. InlineQueryResultGif(
  127. id=utils.rand(20),
  128. title=self.sanitise_text(res.get("title")),
  129. caption=self.sanitise_text(res.get("caption")),
  130. parse_mode="HTML",
  131. thumb_url=res.get("thumb", res["gif"]),
  132. gif_url=res["gif"],
  133. reply_markup=self.generate_markup(res.get("reply_markup")),
  134. )
  135. ]
  136. elif "video" in res:
  137. inline_result += [
  138. InlineQueryResultVideo(
  139. id=utils.rand(20),
  140. title=self.sanitise_text(res.get("title")),
  141. description=self.sanitise_text(res.get("description")),
  142. caption=self.sanitise_text(res.get("caption")),
  143. parse_mode="HTML",
  144. thumb_url=res.get("thumb", res["video"]),
  145. video_url=res["video"],
  146. mime_type="video/mp4",
  147. reply_markup=self.generate_markup(res.get("reply_markup")),
  148. )
  149. ]
  150. elif "file" in res:
  151. inline_result += [
  152. InlineQueryResultDocument(
  153. id=utils.rand(20),
  154. title=self.sanitise_text(res.get("title")),
  155. description=self.sanitise_text(res.get("description")),
  156. caption=self.sanitise_text(res.get("caption")),
  157. parse_mode="HTML",
  158. thumb_url=res.get("thumb", res["file"]),
  159. document_url=res["file"],
  160. mime_type=res["mime_type"],
  161. reply_markup=self.generate_markup(res.get("reply_markup")),
  162. )
  163. ]
  164. try:
  165. await inline_query.answer(inline_result, cache_time=0)
  166. except Exception:
  167. logger.exception(
  168. "Exception when answering inline query with result from %s",
  169. cmd,
  170. )
  171. return
  172. await self._form_inline_handler(inline_query)
  173. await self._gallery_inline_handler(inline_query)
  174. await self._list_inline_handler(inline_query)
  175. async def _callback_query_handler(
  176. self,
  177. call: CallbackQuery,
  178. reply_markup: typing.Optional[typing.List[typing.List[dict]]] = None,
  179. ):
  180. """Callback query handler (buttons' presses)"""
  181. if reply_markup is None:
  182. reply_markup = []
  183. if re.search(r"authorize_web_(.{8})", call.data):
  184. self._web_auth_tokens += [re.search(r"authorize_web_(.{8})", call.data)[1]]
  185. return
  186. # First, dispatch all registered callback handlers
  187. for func in self._allmodules.callback_handlers.values():
  188. if await self.check_inline_security(func=func, user=call.from_user.id):
  189. try:
  190. await func(
  191. (
  192. BotInlineCall
  193. if getattr(getattr(call, "message", None), "chat", None)
  194. else InlineCall
  195. )(call, self, None)
  196. )
  197. except Exception:
  198. logger.exception("Error on running callback watcher!")
  199. await call.answer(
  200. "Error occured while processing request. More info in logs",
  201. show_alert=True,
  202. )
  203. continue
  204. for unit_id, unit in self._units.copy().items():
  205. for button in utils.array_sum(unit.get("buttons", [])):
  206. if not isinstance(button, dict):
  207. logger.warning(
  208. "Can't process update, because of corrupted button: %s",
  209. button,
  210. )
  211. continue
  212. if button.get("_callback_data") == call.data:
  213. if (
  214. button.get("disable_security", False)
  215. or unit.get("disable_security", False)
  216. or (
  217. unit.get("force_me", False)
  218. and call.from_user.id == self._me
  219. )
  220. or not unit.get("force_me", False)
  221. and (
  222. await self.check_inline_security(
  223. func=unit.get(
  224. "perms_map",
  225. lambda: self._client.dispatcher.security._default,
  226. )(), # we call it so we can get reloaded rights in runtime
  227. user=call.from_user.id,
  228. )
  229. if "message" in unit
  230. else False
  231. )
  232. ):
  233. pass
  234. elif call.from_user.id not in (
  235. self._client.dispatcher.security._owner
  236. + unit.get("always_allow", [])
  237. + button.get("always_allow", [])
  238. ):
  239. await call.answer("You are not allowed to press this button!")
  240. return
  241. try:
  242. result = await button["callback"](
  243. (
  244. BotInlineCall
  245. if getattr(getattr(call, "message", None), "chat", None)
  246. else InlineCall
  247. )(call, self, unit_id),
  248. *button.get("args", []),
  249. **button.get("kwargs", {}),
  250. )
  251. except Exception:
  252. logger.exception("Error on running callback watcher!")
  253. await call.answer(
  254. "Error occurred while processing request. More info in"
  255. " logs",
  256. show_alert=True,
  257. )
  258. return
  259. return result
  260. if call.data in self._custom_map:
  261. if (
  262. self._custom_map[call.data].get("disable_security", False)
  263. or (
  264. self._custom_map[call.data].get("force_me", False)
  265. and call.from_user.id == self._me
  266. )
  267. or not self._custom_map[call.data].get("force_me", False)
  268. and (
  269. await self.check_inline_security(
  270. func=self._custom_map[call.data].get(
  271. "perms_map",
  272. lambda: self._client.dispatcher.security._default,
  273. )(),
  274. user=call.from_user.id,
  275. )
  276. if "message" in self._custom_map[call.data]
  277. else False
  278. )
  279. ):
  280. pass
  281. elif (
  282. call.from_user.id not in self._client.dispatcher.security._owner
  283. and call.from_user.id
  284. not in self._custom_map[call.data].get("always_allow", [])
  285. ):
  286. await call.answer("You are not allowed to press this button!")
  287. return
  288. await self._custom_map[call.data]["handler"](
  289. (
  290. BotInlineCall
  291. if getattr(getattr(call, "message", None), "chat", None)
  292. else InlineCall
  293. )(call, self, None),
  294. *self._custom_map[call.data].get("args", []),
  295. **self._custom_map[call.data].get("kwargs", {}),
  296. )
  297. return
  298. async def _chosen_inline_handler(
  299. self,
  300. chosen_inline_query: ChosenInlineResult,
  301. ):
  302. query = chosen_inline_query.query
  303. if not query:
  304. return
  305. for unit_id, unit in self._units.items():
  306. if (
  307. unit_id == query
  308. and "future" in unit
  309. and isinstance(unit["future"], Event)
  310. ):
  311. unit["inline_message_id"] = chosen_inline_query.inline_message_id
  312. unit["future"].set()
  313. return
  314. for unit_id, unit in self._units.copy().items():
  315. for button in utils.array_sum(unit.get("buttons", [])):
  316. if (
  317. "_switch_query" in button
  318. and "input" in button
  319. and button["_switch_query"] == query.split()[0]
  320. and chosen_inline_query.from_user.id
  321. in [self._me]
  322. + self._client.dispatcher.security._owner
  323. + unit.get("always_allow", [])
  324. ):
  325. query = query.split(maxsplit=1)[1] if len(query.split()) > 1 else ""
  326. try:
  327. return await button["handler"](
  328. InlineCall(chosen_inline_query, self, unit_id),
  329. query,
  330. *button.get("args", []),
  331. **button.get("kwargs", {}),
  332. )
  333. except Exception:
  334. logger.exception(
  335. "Exception while running chosen query watcher!"
  336. )
  337. return
  338. async def _query_help(self, inline_query: InlineQuery):
  339. _help = []
  340. for name, fun in self._allmodules.inline_handlers.items():
  341. # If user doesn't have enough permissions
  342. # to run this inline command, do not show it
  343. # in help
  344. if not await self.check_inline_security(
  345. func=fun,
  346. user=inline_query.from_user.id,
  347. ):
  348. continue
  349. # Retrieve docs from func
  350. try:
  351. doc = inspect.getdoc(fun)
  352. except Exception:
  353. doc = "🦥 No docs"
  354. try:
  355. thumb = getattr(fun, "thumb_url", None) or fun.__self__.hikka_meta_pic
  356. except Exception:
  357. thumb = None
  358. thumb = thumb or "https://img.icons8.com/fluency/50/000000/info-squared.png"
  359. _help += [
  360. (
  361. InlineQueryResultArticle(
  362. id=utils.rand(20),
  363. title=f"🌘 Command «{name}»",
  364. description=doc,
  365. input_message_content=InputTextMessageContent(
  366. "<b>🌘 Command"
  367. f" «{utils.escape_html(name)}»</b>\n\n<i>{utils.escape_html(doc)}</i>",
  368. "HTML",
  369. disable_web_page_preview=True,
  370. ),
  371. thumb_url=thumb,
  372. thumb_width=128,
  373. thumb_height=128,
  374. reply_markup=self.generate_markup(
  375. {
  376. "text": "🏌️ Run command",
  377. "switch_inline_query_current_chat": f"{name} ",
  378. }
  379. ),
  380. ),
  381. (
  382. f"🎹 <code>@{self.bot_username} {utils.escape_html(name)}</code>"
  383. f" - {utils.escape_html(doc)}\n"
  384. ),
  385. )
  386. ]
  387. if not _help:
  388. await inline_query.answer(
  389. [
  390. InlineQueryResultArticle(
  391. id=utils.rand(20),
  392. title="Show available inline commands",
  393. description="You have no available commands",
  394. input_message_content=InputTextMessageContent(
  395. "<b>😔 There are no available inline commands or you lack"
  396. " access to them</b>",
  397. "HTML",
  398. disable_web_page_preview=True,
  399. ),
  400. thumb_url=(
  401. "https://img.icons8.com/fluency/50/000000/info-squared.png"
  402. ),
  403. thumb_width=128,
  404. thumb_height=128,
  405. )
  406. ],
  407. cache_time=0,
  408. )
  409. return
  410. await inline_query.answer(
  411. [
  412. InlineQueryResultArticle(
  413. id=utils.rand(20),
  414. title="📄 Show all available inline commands",
  415. description=f"ℹ️ You have {len(_help)} available command(-s)",
  416. input_message_content=InputTextMessageContent(
  417. "<b>ℹ️ Available inline commands:</b>\n\n{}".format(
  418. "\n".join(i[1] for i in _help)
  419. ),
  420. "HTML",
  421. disable_web_page_preview=True,
  422. ),
  423. thumb_url=(
  424. "https://img.icons8.com/fluency/50/000000/info-squared.png"
  425. ),
  426. thumb_width=128,
  427. thumb_height=128,
  428. )
  429. ]
  430. + [i[0] for i in _help],
  431. cache_time=0,
  432. )