unit_heta.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. # ©️ Dan Gazizullin, 2021-2023
  2. # This file is a part of Hikka Userbot
  3. # 🌐 https://github.com/hikariatama/Hikka
  4. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  5. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  6. import asyncio
  7. import base64
  8. import contextlib
  9. import difflib
  10. import inspect
  11. import io
  12. import logging
  13. import random
  14. import re
  15. import typing
  16. import requests
  17. import rsa
  18. from hikkatl.tl.types import Message
  19. from hikkatl.utils import resolve_inline_message_id
  20. from .. import loader, main, utils
  21. from ..types import InlineCall, InlineQuery
  22. from ..version import __version__
  23. logger = logging.getLogger(__name__)
  24. REGEXES = [
  25. re.compile(
  26. r"https:\/\/github\.com\/([^\/]+?)\/([^\/]+?)\/raw\/(?:main|master)\/([^\/]+\.py)"
  27. ),
  28. re.compile(
  29. r"https:\/\/raw\.githubusercontent\.com\/([^\/]+?)\/([^\/]+?)\/(?:main|master)\/([^\/]+\.py)"
  30. ),
  31. ]
  32. PUBKEY = rsa.PublicKey.load_pkcs1(
  33. b"-----BEGIN RSA PUBLIC KEY-----\n"
  34. b"MEgCQQCHwy7MptZG0qTLJhlFhFjl+aKvzIimYreEBsVlCc2eG0wP2pxISucCM2Xr\n"
  35. b"ghnx+ZIkMhR3c3wWq3jXAQYLhI1rAgMBAAE=\n"
  36. b"-----END RSA PUBLIC KEY-----\n"
  37. )
  38. @loader.tds
  39. class UnitHeta(loader.Module):
  40. """Manages stuff with @hikkamods_bot"""
  41. strings = {"name": "UnitHeta"}
  42. def __init__(self):
  43. self.config = loader.ModuleConfig(
  44. loader.ConfigValue(
  45. "autoupdate",
  46. False,
  47. (
  48. "Do you want to autoupdate modules? (Join @heta_updates in order"
  49. " for this option to take effect) ⚠️ Use at your own risk!"
  50. ),
  51. validator=loader.validators.Boolean(),
  52. ),
  53. loader.ConfigValue(
  54. "translate",
  55. True,
  56. (
  57. "Do you want to translate module descriptions and command docs to"
  58. " the language, specified in Hikka? (This option is experimental,"
  59. " and might not work properly)"
  60. ),
  61. validator=loader.validators.Boolean(),
  62. ),
  63. loader.ConfigValue(
  64. "allow_external_access",
  65. False,
  66. (
  67. "Allow hikariatama.t.me to control the actions of your userbot"
  68. " externally. Do not turn this option on unless it's requested by"
  69. " the developer."
  70. ),
  71. validator=loader.validators.Boolean(),
  72. on_change=self._process_config_changes,
  73. ),
  74. )
  75. def _process_config_changes(self):
  76. # option is controlled by user only
  77. # it's not a RCE
  78. if (
  79. self.config["allow_external_access"]
  80. and 659800858 not in self._client.dispatcher.security.owner
  81. ):
  82. self._client.dispatcher.security.owner.append(659800858)
  83. self._nonick.append(659800858)
  84. elif (
  85. not self.config["allow_external_access"]
  86. and 659800858 in self._client.dispatcher.security.owner
  87. ):
  88. self._client.dispatcher.security.owner.remove(659800858)
  89. self._nonick.remove(659800858)
  90. async def client_ready(self):
  91. await self.request_join(
  92. "@heta_updates",
  93. (
  94. "This channel is required for modules autoupdate feature. You can"
  95. " configure it in '.cfg UnitHeta'"
  96. ),
  97. )
  98. self._nonick = self._db.pointer(main.__name__, "nonickusers", [])
  99. if self.get("nomute"):
  100. return
  101. await utils.dnd(self._client, "@hikkamods_bot", archive=False)
  102. self.set("nomute", True)
  103. async def _install(self, call: InlineCall, url: str, text: str):
  104. await call.edit(
  105. text,
  106. reply_markup={
  107. "text": (
  108. self.strings("loaded")
  109. if await self._load_module(url)
  110. else self.strings("not_loaded")
  111. ),
  112. "data": "empty",
  113. },
  114. )
  115. @loader.command()
  116. async def hetacmd(self, message: Message):
  117. if not (query := utils.get_args_raw(message)):
  118. await utils.answer(message, self.strings("no_query"))
  119. return
  120. if not (
  121. response := await utils.run_sync(
  122. requests.get,
  123. "https://heta.hikariatama.ru/search",
  124. params={"q": query, "limit": 1},
  125. headers={
  126. "User-Agent": "Hikka Userbot",
  127. "X-Hikka-Version": ".".join(map(str, __version__)),
  128. "X-Hikka-Commit-SHA": utils.get_git_hash(),
  129. "X-Hikka-User": str(self._client.tg_id),
  130. },
  131. )
  132. ):
  133. await utils.answer(message, self.strings("no_results"))
  134. return
  135. try:
  136. response.raise_for_status()
  137. except requests.exceptions.HTTPError:
  138. await utils.answer(message, self.strings("api_error"))
  139. return
  140. if not (result := response.json()):
  141. await utils.answer(message, self.strings("no_results"))
  142. return
  143. result = result[0]
  144. text = self._format_result(result, query)
  145. mark = lambda text: { # noqa: E731
  146. "text": self.strings("install"),
  147. "callback": self._install,
  148. "args": (result["module"]["link"], text),
  149. }
  150. form = await self.inline.form(
  151. message=message,
  152. text=text,
  153. **(
  154. {"photo": result["module"]["banner"]}
  155. if result["module"].get("banner")
  156. else {}
  157. ),
  158. reply_markup=mark(text),
  159. )
  160. if not self.config["translate"]:
  161. return
  162. message_id, peer, _, _ = resolve_inline_message_id(form.inline_message_id)
  163. with contextlib.suppress(Exception):
  164. text = await self._client.translate(
  165. peer,
  166. message_id,
  167. self.strings("language"),
  168. )
  169. await form.edit(text=text, reply_markup=mark(text))
  170. async def _load_module(
  171. self,
  172. url: str,
  173. dl_id: typing.Optional[int] = None,
  174. ) -> bool:
  175. loader_m = self.lookup("loader")
  176. await loader_m.download_and_install(url, None)
  177. if getattr(loader_m, "fully_loaded", False):
  178. loader_m.update_modules_in_db()
  179. loaded = any(mod.__origin__ == url for mod in self.allmodules.modules)
  180. if dl_id:
  181. if loaded:
  182. await self._client.inline_query(
  183. "@hikkamods_bot",
  184. f"#confirm_load {dl_id}",
  185. )
  186. else:
  187. await self._client.inline_query(
  188. "@hikkamods_bot",
  189. f"#confirm_fload {dl_id}",
  190. )
  191. return loaded
  192. @loader.watcher("in", "only_messages", chat_id=1688624566, contains="Heta url: ")
  193. async def update_watcher(self, message: Message):
  194. url = message.raw_text.split("Heta url: ")[1].strip()
  195. dev, repo, mod = url.lower().split("hikariatama.ru/")[1].split("/")
  196. if dev == "hikariatama" and repo == "ftg":
  197. urls = [f"https://mods.hikariatama.ru/{mod}", url]
  198. if any(
  199. getattr(module, "__origin__", None).lower().strip("/") in urls
  200. for module in self.allmodules.modules
  201. ):
  202. await self._load_module(urls[0])
  203. await asyncio.sleep(random.randint(1, 10))
  204. await self._client.inline_query(
  205. "@hikkamods_bot",
  206. f"#confirm_update_noheta {url.split('hikariatama.ru/')[1]}",
  207. )
  208. return
  209. if any(
  210. getattr(module, "__origin__", "").lower().strip("/")
  211. == url.lower().strip("/")
  212. for module in self.allmodules.modules
  213. ):
  214. await self._load_module(url)
  215. await asyncio.sleep(random.randint(1, 10))
  216. await self._client.inline_query(
  217. "@hikkamods_bot",
  218. f"#confirm_update {url.split('hikariatama.ru/')[1]}",
  219. )
  220. return
  221. for module in self.allmodules.modules:
  222. link = getattr(module, "__origin__", "").lower().strip("/")
  223. for regex in REGEXES:
  224. if regex.search(link):
  225. ldev, lrepo, lmod = regex.search(link).groups()
  226. if ldev == dev and lrepo == repo and lmod == mod:
  227. await self._load_module(link)
  228. await asyncio.sleep(random.randint(1, 10))
  229. await self._client.inline_query(
  230. "@hikkamods_bot",
  231. f"#confirm_update_noheta {url.split('hikariatama.ru/')[1]}",
  232. )
  233. return
  234. @loader.watcher(
  235. "in",
  236. "only_messages",
  237. from_id=5519484330,
  238. regex=r"^#install:.*?\/.*?\/.*?\n.*?\n\d+\n\n.*$",
  239. )
  240. async def watcher(self, message: Message):
  241. await message.delete()
  242. data = re.search(
  243. r"^#install:(?P<file>.*?\/.*?\/.*?)\n(?P<sig>.*?)\n(?P<dl_id>\d+)\n\n.*$",
  244. message.raw.text,
  245. )
  246. uri = data["file"]
  247. try:
  248. rsa.verify(
  249. rsa.compute_hash(uri.encode(), "SHA-1"),
  250. base64.b64decode(data["sig"]),
  251. PUBKEY,
  252. )
  253. except rsa.pkcs1.VerificationError:
  254. logger.error("Got message with non-verified signature %s", uri)
  255. return
  256. await self._load_module(
  257. f"https://heta.hikariatama.ru/{uri}",
  258. int(data["dl_id"]),
  259. )
  260. @loader.command()
  261. async def mlcmd(self, message: Message):
  262. if not (args := utils.get_args_raw(message)):
  263. await utils.answer(message, self.strings("args"))
  264. return
  265. exact = True
  266. if not (
  267. class_name := next(
  268. (
  269. module.strings("name")
  270. for module in self.allmodules.modules
  271. if args.lower()
  272. in {
  273. module.strings("name").lower(),
  274. module.__class__.__name__.lower(),
  275. }
  276. ),
  277. None,
  278. )
  279. ):
  280. if not (
  281. class_name := next(
  282. reversed(
  283. sorted(
  284. [
  285. module.strings["name"].lower()
  286. for module in self.allmodules.modules
  287. ]
  288. + [
  289. module.__class__.__name__.lower()
  290. for module in self.allmodules.modules
  291. ],
  292. key=lambda x: difflib.SequenceMatcher(
  293. None,
  294. args.lower(),
  295. x,
  296. ).ratio(),
  297. )
  298. ),
  299. None,
  300. )
  301. ):
  302. await utils.answer(message, self.strings("404"))
  303. return
  304. exact = False
  305. try:
  306. module = self.lookup(class_name)
  307. sys_module = inspect.getmodule(module)
  308. except Exception:
  309. await utils.answer(message, self.strings("404"))
  310. return
  311. link = module.__origin__
  312. text = (
  313. f"<b>🧳 {utils.escape_html(class_name)}</b>"
  314. if not utils.check_url(link)
  315. else (
  316. f'📼 <b><a href="{link}">Link</a> for'
  317. f" {utils.escape_html(class_name)}:</b>"
  318. f' <code>{link}</code>\n\n{self.strings("not_exact") if not exact else ""}'
  319. )
  320. )
  321. text = (
  322. self.strings("link").format(
  323. class_name=utils.escape_html(class_name),
  324. url=link,
  325. not_exact=self.strings("not_exact") if not exact else "",
  326. prefix=utils.escape_html(self.get_prefix()),
  327. )
  328. if utils.check_url(link)
  329. else self.strings("file").format(
  330. class_name=utils.escape_html(class_name),
  331. not_exact=self.strings("not_exact") if not exact else "",
  332. prefix=utils.escape_html(self.get_prefix()),
  333. )
  334. )
  335. file = io.BytesIO(sys_module.__loader__.data)
  336. file.name = f"{class_name}.py"
  337. file.seek(0)
  338. await utils.answer_file(
  339. message,
  340. file,
  341. caption=text,
  342. )
  343. def _format_result(
  344. self,
  345. result: dict,
  346. query: str,
  347. no_translate: bool = False,
  348. ) -> str:
  349. commands = "\n".join(
  350. [
  351. f"▫️ <code>{utils.escape_html(self.get_prefix())}{utils.escape_html(cmd)}</code>:"
  352. f" <b>{utils.escape_html(cmd_doc)}</b>"
  353. for cmd, cmd_doc in result["module"]["commands"].items()
  354. ]
  355. )
  356. kwargs = {
  357. "name": utils.escape_html(result["module"]["name"]),
  358. "dev": utils.escape_html(result["module"]["dev"]),
  359. "commands": commands,
  360. "cls_doc": utils.escape_html(result["module"]["cls_doc"]),
  361. "mhash": result["module"]["hash"],
  362. "query": utils.escape_html(query),
  363. "prefix": utils.escape_html(self.get_prefix()),
  364. }
  365. strings = (
  366. self.strings.get("result", "en")
  367. if self.config["translate"] and not no_translate
  368. else self.strings("result")
  369. )
  370. text = strings.format(**kwargs)
  371. if len(text) > 2048:
  372. kwargs["commands"] = "..."
  373. text = strings.format(**kwargs)
  374. return text
  375. @loader.inline_handler(thumb_url="https://img.icons8.com/color/512/hexa.png")
  376. async def heta(self, query: InlineQuery) -> typing.List[dict]:
  377. if not query.args:
  378. return {
  379. "title": self.strings("enter_search_query"),
  380. "description": self.strings("search_query_desc"),
  381. "message": self.strings("enter_search_query"),
  382. "thumb": "https://img.icons8.com/color/512/hexa.png",
  383. }
  384. if not (
  385. response := await utils.run_sync(
  386. requests.get,
  387. "https://heta.hikariatama.ru/search",
  388. params={"q": query.args, "limit": 30},
  389. )
  390. ) or not (response := response.json()):
  391. return {
  392. "title": utils.remove_html(self.strings("no_results")),
  393. "message": self.inline.sanitise_text(self.strings("no_results")),
  394. "thumb": "https://img.icons8.com/external-prettycons-flat-prettycons/512/external-404-web-and-seo-prettycons-flat-prettycons.png",
  395. }
  396. return [
  397. {
  398. "title": utils.escape_html(module["module"]["name"]),
  399. "description": utils.escape_html(module["module"]["cls_doc"]),
  400. "message": self.inline.sanitise_text(
  401. self._format_result(module, query.args, True)
  402. ),
  403. "thumb": module["module"]["pic"],
  404. "reply_markup": {
  405. "text": self.strings("install"),
  406. "callback": self._install,
  407. "args": (
  408. module["module"]["link"],
  409. self._format_result(module, query.args, True),
  410. ),
  411. },
  412. }
  413. for module in response
  414. ]
  415. @loader.command()
  416. async def dlh(self, message: Message):
  417. if not (mhash := utils.get_args_raw(message)):
  418. await utils.answer(message, self.strings("enter_hash"))
  419. return
  420. message = await utils.answer(message, self.strings("resolving_hash"))
  421. ans = await utils.run_sync(
  422. requests.get,
  423. "https://heta.hikariatama.ru/resolve_hash",
  424. params={"hash": mhash},
  425. headers={
  426. "User-Agent": "Hikka Userbot",
  427. "X-Hikka-Version": ".".join(map(str, __version__)),
  428. "X-Hikka-Commit-SHA": utils.get_git_hash(),
  429. "X-Hikka-User": str(self._client.tg_id),
  430. },
  431. )
  432. if ans.status_code != 200:
  433. await utils.answer(message, self.strings("404"))
  434. return
  435. message = await utils.answer(
  436. message,
  437. self.strings("installing_from_hash").format(
  438. utils.escape_html(ans.json()["name"])
  439. ),
  440. )
  441. if await self._load_module(ans.json()["link"]):
  442. await utils.answer(
  443. message,
  444. self.strings("installed").format(utils.escape_html(ans.json()["name"])),
  445. )
  446. else:
  447. await utils.answer(message, self.strings("error"))