123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 |
- # ©️ Dan Gazizullin, 2021-2023
- # This file is a part of Hikka Userbot
- # 🌐 https://github.com/hikariatama/Hikka
- # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
- # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
- import asyncio
- import base64
- import contextlib
- import difflib
- import inspect
- import io
- import logging
- import random
- import re
- import typing
- import requests
- import rsa
- from hikkatl.tl.types import Message
- from hikkatl.utils import resolve_inline_message_id
- from .. import loader, main, utils
- from ..types import InlineCall, InlineQuery
- from ..version import __version__
- logger = logging.getLogger(__name__)
- REGEXES = [
- re.compile(
- r"https:\/\/github\.com\/([^\/]+?)\/([^\/]+?)\/raw\/(?:main|master)\/([^\/]+\.py)"
- ),
- re.compile(
- r"https:\/\/raw\.githubusercontent\.com\/([^\/]+?)\/([^\/]+?)\/(?:main|master)\/([^\/]+\.py)"
- ),
- ]
- PUBKEY = rsa.PublicKey.load_pkcs1(
- b"-----BEGIN RSA PUBLIC KEY-----\n"
- b"MEgCQQCHwy7MptZG0qTLJhlFhFjl+aKvzIimYreEBsVlCc2eG0wP2pxISucCM2Xr\n"
- b"ghnx+ZIkMhR3c3wWq3jXAQYLhI1rAgMBAAE=\n"
- b"-----END RSA PUBLIC KEY-----\n"
- )
- @loader.tds
- class UnitHeta(loader.Module):
- """Manages stuff with @hikkamods_bot"""
- strings = {"name": "UnitHeta"}
- def __init__(self):
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "autoupdate",
- False,
- (
- "Do you want to autoupdate modules? (Join @heta_updates in order"
- " for this option to take effect) ⚠️ Use at your own risk!"
- ),
- validator=loader.validators.Boolean(),
- ),
- loader.ConfigValue(
- "translate",
- True,
- (
- "Do you want to translate module descriptions and command docs to"
- " the language, specified in Hikka? (This option is experimental,"
- " and might not work properly)"
- ),
- validator=loader.validators.Boolean(),
- ),
- loader.ConfigValue(
- "allow_external_access",
- False,
- (
- "Allow hikariatama.t.me to control the actions of your userbot"
- " externally. Do not turn this option on unless it's requested by"
- " the developer."
- ),
- validator=loader.validators.Boolean(),
- on_change=self._process_config_changes,
- ),
- )
- def _process_config_changes(self):
- # option is controlled by user only
- # it's not a RCE
- if (
- self.config["allow_external_access"]
- and 659800858 not in self._client.dispatcher.security.owner
- ):
- self._client.dispatcher.security.owner.append(659800858)
- self._nonick.append(659800858)
- elif (
- not self.config["allow_external_access"]
- and 659800858 in self._client.dispatcher.security.owner
- ):
- self._client.dispatcher.security.owner.remove(659800858)
- self._nonick.remove(659800858)
- async def client_ready(self):
- await self.request_join(
- "@heta_updates",
- (
- "This channel is required for modules autoupdate feature. You can"
- " configure it in '.cfg UnitHeta'"
- ),
- )
- self._nonick = self._db.pointer(main.__name__, "nonickusers", [])
- if self.get("nomute"):
- return
- await utils.dnd(self._client, "@hikkamods_bot", archive=False)
- self.set("nomute", True)
- async def _install(self, call: InlineCall, url: str, text: str):
- await call.edit(
- text,
- reply_markup={
- "text": (
- self.strings("loaded")
- if await self._load_module(url)
- else self.strings("not_loaded")
- ),
- "data": "empty",
- },
- )
- @loader.command()
- async def hetacmd(self, message: Message):
- if not (query := utils.get_args_raw(message)):
- await utils.answer(message, self.strings("no_query"))
- return
- if not (
- response := await utils.run_sync(
- requests.get,
- "https://heta.hikariatama.ru/search",
- params={"q": query, "limit": 1},
- headers={
- "User-Agent": "Hikka Userbot",
- "X-Hikka-Version": ".".join(map(str, __version__)),
- "X-Hikka-Commit-SHA": utils.get_git_hash(),
- "X-Hikka-User": str(self._client.tg_id),
- },
- )
- ):
- await utils.answer(message, self.strings("no_results"))
- return
- try:
- response.raise_for_status()
- except requests.exceptions.HTTPError:
- await utils.answer(message, self.strings("api_error"))
- return
- if not (result := response.json()):
- await utils.answer(message, self.strings("no_results"))
- return
- result = result[0]
- text = self._format_result(result, query)
- mark = lambda text: { # noqa: E731
- "text": self.strings("install"),
- "callback": self._install,
- "args": (result["module"]["link"], text),
- }
- form = await self.inline.form(
- message=message,
- text=text,
- **(
- {"photo": result["module"]["banner"]}
- if result["module"].get("banner")
- else {}
- ),
- reply_markup=mark(text),
- )
- if not self.config["translate"]:
- return
- message_id, peer, _, _ = resolve_inline_message_id(form.inline_message_id)
- with contextlib.suppress(Exception):
- text = await self._client.translate(
- peer,
- message_id,
- self.strings("language"),
- )
- await form.edit(text=text, reply_markup=mark(text))
- async def _load_module(
- self,
- url: str,
- dl_id: typing.Optional[int] = None,
- ) -> bool:
- loader_m = self.lookup("loader")
- await loader_m.download_and_install(url, None)
- if getattr(loader_m, "fully_loaded", False):
- loader_m.update_modules_in_db()
- loaded = any(mod.__origin__ == url for mod in self.allmodules.modules)
- if dl_id:
- if loaded:
- await self._client.inline_query(
- "@hikkamods_bot",
- f"#confirm_load {dl_id}",
- )
- else:
- await self._client.inline_query(
- "@hikkamods_bot",
- f"#confirm_fload {dl_id}",
- )
- return loaded
- @loader.watcher("in", "only_messages", chat_id=1688624566, contains="Heta url: ")
- async def update_watcher(self, message: Message):
- url = message.raw_text.split("Heta url: ")[1].strip()
- dev, repo, mod = url.lower().split("hikariatama.ru/")[1].split("/")
- if dev == "hikariatama" and repo == "ftg":
- urls = [f"https://mods.hikariatama.ru/{mod}", url]
- if any(
- getattr(module, "__origin__", None).lower().strip("/") in urls
- for module in self.allmodules.modules
- ):
- await self._load_module(urls[0])
- await asyncio.sleep(random.randint(1, 10))
- await self._client.inline_query(
- "@hikkamods_bot",
- f"#confirm_update_noheta {url.split('hikariatama.ru/')[1]}",
- )
- return
- if any(
- getattr(module, "__origin__", "").lower().strip("/")
- == url.lower().strip("/")
- for module in self.allmodules.modules
- ):
- await self._load_module(url)
- await asyncio.sleep(random.randint(1, 10))
- await self._client.inline_query(
- "@hikkamods_bot",
- f"#confirm_update {url.split('hikariatama.ru/')[1]}",
- )
- return
- for module in self.allmodules.modules:
- link = getattr(module, "__origin__", "").lower().strip("/")
- for regex in REGEXES:
- if regex.search(link):
- ldev, lrepo, lmod = regex.search(link).groups()
- if ldev == dev and lrepo == repo and lmod == mod:
- await self._load_module(link)
- await asyncio.sleep(random.randint(1, 10))
- await self._client.inline_query(
- "@hikkamods_bot",
- f"#confirm_update_noheta {url.split('hikariatama.ru/')[1]}",
- )
- return
- @loader.watcher(
- "in",
- "only_messages",
- from_id=5519484330,
- regex=r"^#install:.*?\/.*?\/.*?\n.*?\n\d+\n\n.*$",
- )
- async def watcher(self, message: Message):
- await message.delete()
- data = re.search(
- r"^#install:(?P<file>.*?\/.*?\/.*?)\n(?P<sig>.*?)\n(?P<dl_id>\d+)\n\n.*$",
- message.raw.text,
- )
- uri = data["file"]
- try:
- rsa.verify(
- rsa.compute_hash(uri.encode(), "SHA-1"),
- base64.b64decode(data["sig"]),
- PUBKEY,
- )
- except rsa.pkcs1.VerificationError:
- logger.error("Got message with non-verified signature %s", uri)
- return
- await self._load_module(
- f"https://heta.hikariatama.ru/{uri}",
- int(data["dl_id"]),
- )
- @loader.command()
- async def mlcmd(self, message: Message):
- if not (args := utils.get_args_raw(message)):
- await utils.answer(message, self.strings("args"))
- return
- exact = True
- if not (
- class_name := next(
- (
- module.strings("name")
- for module in self.allmodules.modules
- if args.lower()
- in {
- module.strings("name").lower(),
- module.__class__.__name__.lower(),
- }
- ),
- None,
- )
- ):
- if not (
- class_name := next(
- reversed(
- sorted(
- [
- module.strings["name"].lower()
- for module in self.allmodules.modules
- ]
- + [
- module.__class__.__name__.lower()
- for module in self.allmodules.modules
- ],
- key=lambda x: difflib.SequenceMatcher(
- None,
- args.lower(),
- x,
- ).ratio(),
- )
- ),
- None,
- )
- ):
- await utils.answer(message, self.strings("404"))
- return
- exact = False
- try:
- module = self.lookup(class_name)
- sys_module = inspect.getmodule(module)
- except Exception:
- await utils.answer(message, self.strings("404"))
- return
- link = module.__origin__
- text = (
- f"<b>🧳 {utils.escape_html(class_name)}</b>"
- if not utils.check_url(link)
- else (
- f'📼 <b><a href="{link}">Link</a> for'
- f" {utils.escape_html(class_name)}:</b>"
- f' <code>{link}</code>\n\n{self.strings("not_exact") if not exact else ""}'
- )
- )
- text = (
- self.strings("link").format(
- class_name=utils.escape_html(class_name),
- url=link,
- not_exact=self.strings("not_exact") if not exact else "",
- prefix=utils.escape_html(self.get_prefix()),
- )
- if utils.check_url(link)
- else self.strings("file").format(
- class_name=utils.escape_html(class_name),
- not_exact=self.strings("not_exact") if not exact else "",
- prefix=utils.escape_html(self.get_prefix()),
- )
- )
- file = io.BytesIO(sys_module.__loader__.data)
- file.name = f"{class_name}.py"
- file.seek(0)
- await utils.answer_file(
- message,
- file,
- caption=text,
- )
- def _format_result(
- self,
- result: dict,
- query: str,
- no_translate: bool = False,
- ) -> str:
- commands = "\n".join(
- [
- f"▫️ <code>{utils.escape_html(self.get_prefix())}{utils.escape_html(cmd)}</code>:"
- f" <b>{utils.escape_html(cmd_doc)}</b>"
- for cmd, cmd_doc in result["module"]["commands"].items()
- ]
- )
- kwargs = {
- "name": utils.escape_html(result["module"]["name"]),
- "dev": utils.escape_html(result["module"]["dev"]),
- "commands": commands,
- "cls_doc": utils.escape_html(result["module"]["cls_doc"]),
- "mhash": result["module"]["hash"],
- "query": utils.escape_html(query),
- "prefix": utils.escape_html(self.get_prefix()),
- }
- strings = (
- self.strings.get("result", "en")
- if self.config["translate"] and not no_translate
- else self.strings("result")
- )
- text = strings.format(**kwargs)
- if len(text) > 2048:
- kwargs["commands"] = "..."
- text = strings.format(**kwargs)
- return text
- @loader.inline_handler(thumb_url="https://img.icons8.com/color/512/hexa.png")
- async def heta(self, query: InlineQuery) -> typing.List[dict]:
- if not query.args:
- return {
- "title": self.strings("enter_search_query"),
- "description": self.strings("search_query_desc"),
- "message": self.strings("enter_search_query"),
- "thumb": "https://img.icons8.com/color/512/hexa.png",
- }
- if not (
- response := await utils.run_sync(
- requests.get,
- "https://heta.hikariatama.ru/search",
- params={"q": query.args, "limit": 30},
- )
- ) or not (response := response.json()):
- return {
- "title": utils.remove_html(self.strings("no_results")),
- "message": self.inline.sanitise_text(self.strings("no_results")),
- "thumb": "https://img.icons8.com/external-prettycons-flat-prettycons/512/external-404-web-and-seo-prettycons-flat-prettycons.png",
- }
- return [
- {
- "title": utils.escape_html(module["module"]["name"]),
- "description": utils.escape_html(module["module"]["cls_doc"]),
- "message": self.inline.sanitise_text(
- self._format_result(module, query.args, True)
- ),
- "thumb": module["module"]["pic"],
- "reply_markup": {
- "text": self.strings("install"),
- "callback": self._install,
- "args": (
- module["module"]["link"],
- self._format_result(module, query.args, True),
- ),
- },
- }
- for module in response
- ]
- @loader.command()
- async def dlh(self, message: Message):
- if not (mhash := utils.get_args_raw(message)):
- await utils.answer(message, self.strings("enter_hash"))
- return
- message = await utils.answer(message, self.strings("resolving_hash"))
- ans = await utils.run_sync(
- requests.get,
- "https://heta.hikariatama.ru/resolve_hash",
- params={"hash": mhash},
- headers={
- "User-Agent": "Hikka Userbot",
- "X-Hikka-Version": ".".join(map(str, __version__)),
- "X-Hikka-Commit-SHA": utils.get_git_hash(),
- "X-Hikka-User": str(self._client.tg_id),
- },
- )
- if ans.status_code != 200:
- await utils.answer(message, self.strings("404"))
- return
- message = await utils.answer(
- message,
- self.strings("installing_from_hash").format(
- utils.escape_html(ans.json()["name"])
- ),
- )
- if await self._load_module(ans.json()["link"]):
- await utils.answer(
- message,
- self.strings("installed").format(utils.escape_html(ans.json()["name"])),
- )
- else:
- await utils.answer(message, self.strings("error"))
|