123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- # ©️ Dan Gazizullin, 2021-2023
- # This file is a part of Hikka Userbot
- # 🌐 https://github.com/hikariatama/Hikka
- # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
- # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
- import asyncio
- import builtins
- import importlib
- import inspect
- import io
- import logging
- import os
- import subprocess
- import sys
- import traceback
- import typing
- from io import BytesIO
- from sys import version_info
- import git
- try:
- from PIL import Image
- except Exception:
- PIP_AVAILABLE = False
- else:
- PIP_AVAILABLE = True
- from pyrogram import Client, errors, types
- from .. import version
- from .._internal import restart
- from ..database import Database
- from ..tl_cache import CustomTelegramClient
- from ..types import JSONSerializable
- DRAGON_EMOJI = "<emoji document_id=5375360100196163660>🐲</emoji>"
- native_import = builtins.__import__
- logger = logging.getLogger(__name__)
- class ImportLock:
- # This is used to ensure, that dynamic dragon import passes in
- # the right client. Whenever one of the clients attempts to install
- # dragon-specific module, it must aqcuire the `import_lock` or wait
- # until it's released. Then set the `current_client` variable to self.
- def __init__(self):
- self.lock = asyncio.Lock()
- self.current_client = None
- def __call__(self, client: CustomTelegramClient) -> typing.ContextManager:
- self.current_client = client
- return self
- async def __aenter__(self):
- await self.lock.acquire()
- async def __aexit__(self, *_):
- self.current_client = None
- self.lock.release()
- import_lock = ImportLock()
- class DragonDb:
- def __init__(self, db: Database):
- self.db = db
- def get(
- self,
- module: str,
- variable: str,
- default: typing.Optional[typing.Any] = None,
- ) -> JSONSerializable:
- return self.db.get(f"dragon.{module}", variable, default)
- def set(self, module: str, variable: str, value: JSONSerializable) -> bool:
- return self.db.set(f"dragon.{module}", variable, value)
- def get_collection(self, module: str) -> typing.Dict[str, JSONSerializable]:
- return dict.get(self.db, f"dragon.{module}", {})
- def remove(self, module: str, variable: str) -> JSONSerializable:
- if f"dragon.{module}" not in self.db:
- return None
- return self.db[f"dragon.{module}"].pop(variable, None)
- def close(self):
- pass
- class DragonDbWrapper:
- def __init__(self, db: DragonDb):
- self.db = db
- class Notifier:
- def __init__(self, modules_help: "ModulesHelpDict"):
- self.modules_help = modules_help
- self.cache = {}
- def __enter__(self):
- self.modules_help.notifier = self
- return self
- def __exit__(self, *_):
- self.modules_help.notifier = None
- def notify(self, key: str, value: dict):
- self.cache[key] = value
- @property
- def modname(self):
- return next(iter(self.cache), "Unknown")
- @property
- def commands(self):
- return {
- key.split()[0]: (
- ((key.split()[1] + " - ") if len(key.split()) > 1 else "") + value
- )
- for key, value in self.cache[self.modname].items()
- }
- class ModulesHelpDict(dict):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.notifier = None
- def append(self, obj: dict):
- # convert help from old to new type
- module_name = list(obj.keys())[0]
- cmds = obj[module_name]
- commands = {}
- for cmd in cmds:
- cmd_name = list(cmd.keys())[0]
- cmd_desc = cmd[cmd_name]
- commands[cmd_name] = cmd_desc
- self[module_name] = commands
- def __setitem__(self, key, value):
- super().__setitem__(key, value)
- if self.notifier:
- self.notifier.notify(key, value)
- def get_notifier(self) -> Notifier:
- return Notifier(self)
- class DragonMisc:
- def __init__(self, client: CustomTelegramClient):
- self.client = client
- self.modules_help = ModulesHelpDict()
- self.requirements_list = []
- self.python_version = f"{version_info[0]}.{version_info[1]}.{version_info[2]}"
- self.gitrepo = git.Repo(
- path=os.path.abspath(os.path.join(os.path.dirname(version.__file__), ".."))
- )
- self.commits_since_tag = 0
- self.userbot_version = version.__version__
- @property
- def prefix(self):
- return self.client.loader.get_prefix("dragon")
- class DragonConfig:
- def __init__(self, client: CustomTelegramClient):
- self.api_id = client.api_id
- self.api_hash = client.api_hash
- self.db_type = ""
- self.db_url = ""
- self.db_name = ""
- self.test_server = False
- class DragonScripts:
- def __init__(self, misc: DragonMisc):
- self.interact_with_to_delete = []
- self.misc = misc
- @staticmethod
- def text(message: types.Message):
- return message.text if message.text else message.caption
- @staticmethod
- def restart():
- restart()
- @staticmethod
- def format_exc(e: Exception, hint: str = None):
- traceback.print_exc()
- if isinstance(e, errors.RPCError):
- return (
- "<b>Telegram API error!</b>\n"
- f"<code>[{e.CODE} {e.ID or e.NAME}] - {e.MESSAGE}</code>"
- )
- hint_text = f"\n\n<b>Hint: {hint}</b>" if hint else ""
- return f"<b>Error!</b>\n<code>{e.__class__.__name__}: {e}</code>" + hint_text
- @staticmethod
- def with_reply(func):
- async def wrapped(client: Client, message: types.Message):
- if not message.reply_to_message:
- await message.edit("<b>Reply to message is required</b>")
- else:
- return await func(client, message)
- return wrapped
- async def interact_with(self, message: types.Message) -> types.Message:
- """
- Check history with bot and return bot's response
- Example:
- .. code-block:: python
- bot_msg = await interact_with(await bot.send_message("@BotFather", "/start"))
- :param message: already sent message to bot
- :return: bot's response
- """
- await asyncio.sleep(1)
- # noinspection PyProtectedMember
- response = await message._client.get_history(message.chat.id, limit=1)
- seconds_waiting = 0
- while response[0].from_user.is_self:
- seconds_waiting += 1
- if seconds_waiting >= 5:
- raise RuntimeError("bot didn't answer in 5 seconds")
- await asyncio.sleep(1)
- # noinspection PyProtectedMember
- response = await message._client.get_history(message.chat.id, limit=1)
- self.interact_with_to_delete.append(message.message_id)
- self.interact_with_to_delete.append(response[0].message_id)
- return response[0]
- def format_module_help(self, module_name: str):
- commands = self.misc.modules_help[module_name]
- help_text = (
- f"{DRAGON_EMOJI} <b>Help for"
- f" </b><code>{module_name}</code>\n\n<b>Usage:</b>\n"
- )
- for command, desc in commands.items():
- cmd = command.split(maxsplit=1)
- args = " <code>" + cmd[1] + "</code>" if len(cmd) > 1 else ""
- help_text += (
- f"<code>{self.misc.prefix}{cmd[0]}</code>{args} — <i>{desc}</i>\n"
- )
- return help_text
- def format_small_module_help(self, module_name: str):
- commands = self.misc.modules_help[module_name]
- help_text = (
- f"{DRAGON_EMOJI }<b>Help for </b><code>{module_name}</code>\n\n<b>Commands"
- " list:</b>\n"
- )
- for command, desc in commands.items():
- cmd = command.split(maxsplit=1)
- args = " <code>" + cmd[1] + "</code>" if len(cmd) > 1 else ""
- help_text += f"<code>{self.misc.prefix}{cmd[0]}</code>{args}\n"
- help_text += (
- f"\n<b>Get full usage:</b> <code>{self.misc.prefix}help"
- f" {module_name}</code>"
- )
- return help_text
- def import_library(
- self,
- library_name: str,
- package_name: typing.Optional[str] = None,
- ):
- """
- Loads a library, or installs it in ImportError case
- :param library_name: library name (import example...)
- :param package_name: package name in PyPi (pip install example)
- :return: loaded module
- """
- if package_name is None:
- package_name = library_name
- self.misc.requirements_list.append(package_name)
- try:
- return importlib.import_module(library_name)
- except ImportError:
- completed = subprocess.run(
- [
- sys.executable,
- "-m",
- "pip",
- "install",
- "--upgrade",
- "-q",
- "--disable-pip-version-check",
- "--no-warn-script-location",
- *(
- ["--user"]
- if "PIP_TARGET" not in os.environ
- and "VIRTUAL_ENV" not in os.environ
- else []
- ),
- package_name,
- ],
- check=False,
- )
- if completed.returncode != 0:
- raise RuntimeError(
- f"Failed to install library {package_name} (pip exited with code"
- f" {completed.returncode})"
- )
- return importlib.import_module(library_name)
- @staticmethod
- def resize_image(
- input_img: typing.Union[bytes, io.BytesIO],
- output: typing.Optional[io.BytesIO] = None,
- img_type: str = "PNG",
- ) -> io.BytesIO:
- if not PIP_AVAILABLE:
- raise RuntimeError("Install Pillow with: pip install Pillow -U")
- if output is None:
- output = BytesIO()
- output.name = f"sticker.{img_type.lower()}"
- with Image.open(input_img) as img:
- # We used to use thumbnail(size) here, but it returns with a *max* dimension of 512,512
- # rather than making one side exactly 512 so we have to calculate dimensions manually :(
- if img.width == img.height:
- size = (512, 512)
- elif img.width < img.height:
- size = (max(512 * img.width // img.height, 1), 512)
- else:
- size = (512, max(512 * img.height // img.width, 1))
- img.resize(size).save(output, img_type)
- return output
- class DragonCompat:
- def __init__(self, client: CustomTelegramClient):
- self.client = client
- self.db = DragonDbWrapper(DragonDb(client.loader.db))
- self.misc = DragonMisc(client)
- self.scripts = DragonScripts(self.misc)
- self.config = DragonConfig(client)
- def patched_import(name: str, *args, **kwargs):
- caller = inspect.currentframe().f_back
- caller_name = caller.f_globals.get("__name__")
- if name.startswith("utils") and caller_name.startswith("dragon"):
- if not import_lock.current_client:
- raise RuntimeError("Dragon client not set")
- if name.split(".", maxsplit=1)[1] in {"db", "misc", "scripts", "config"}:
- return getattr(
- import_lock.current_client.dragon_compat,
- name.split(".", maxsplit=1)[1],
- )
- raise ImportError(f"Unknown module {name}")
- return native_import(name, *args, **kwargs)
- builtins.__import__ = patched_import
- def apply_compat(client: CustomTelegramClient):
- client.dragon_compat = DragonCompat(client)
|