123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- # ©️ Dan Gazizullin, 2021-2023
- # This file is a part of Hikka Userbot
- # 🌐 https://github.com/hikariatama/Hikka
- # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
- # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
- import inspect
- import logging
- import os
- import random
- import time
- import typing
- from io import BytesIO
- from hikkatl.tl.types import Message
- from .. import loader, main, utils
- from ..inline.types import InlineCall
- logger = logging.getLogger(__name__)
- DEBUG_MODS_DIR = os.path.join(utils.get_base_dir(), "debug_modules")
- if not os.path.isdir(DEBUG_MODS_DIR):
- os.mkdir(DEBUG_MODS_DIR, mode=0o755)
- for mod in os.scandir(DEBUG_MODS_DIR):
- os.remove(mod.path)
- @loader.tds
- class TestMod(loader.Module):
- """Perform operations based on userbot self-testing"""
- strings = {"name": "Tester"}
- def __init__(self):
- self._memory = {}
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "force_send_all",
- False,
- (
- "⚠️ Do not touch, if you don't know what it does!\nBy default,"
- " Hikka will try to determine, which client caused logs. E.g. there"
- " is a module TestModule installed on Client1 and TestModule2 on"
- " Client2. By default, Client2 will get logs from TestModule2, and"
- " Client1 will get logs from TestModule. If this option is enabled,"
- " Hikka will send all logs to Client1 and Client2, even if it is"
- " not the one that caused the log."
- ),
- validator=loader.validators.Boolean(),
- on_change=self._pass_config_to_logger,
- ),
- loader.ConfigValue(
- "tglog_level",
- "INFO",
- (
- "⚠️ Do not touch, if you don't know what it does!\n"
- "Minimal loglevel for records to be sent in Telegram."
- ),
- validator=loader.validators.Choice(
- ["INFO", "WARNING", "ERROR", "CRITICAL"]
- ),
- on_change=self._pass_config_to_logger,
- ),
- loader.ConfigValue(
- "ignore_common",
- True,
- "Ignore common errors (e.g. 'TypeError' in telethon)",
- validator=loader.validators.Boolean(),
- on_change=self._pass_config_to_logger,
- ),
- )
- def _pass_config_to_logger(self):
- logging.getLogger().handlers[0].force_send_all = self.config["force_send_all"]
- logging.getLogger().handlers[0].tg_level = {
- "INFO": 20,
- "WARNING": 30,
- "ERROR": 40,
- "CRITICAL": 50,
- }[self.config["tglog_level"]]
- logging.getLogger().handlers[0].ignore_common = self.config["ignore_common"]
- @loader.command()
- async def dump(self, message: Message):
- if not message.is_reply:
- return
- await utils.answer(
- message,
- "<code>"
- + utils.escape_html((await message.get_reply_message()).stringify())
- + "</code>",
- )
- @loader.command()
- async def clearlogs(self, message: Message):
- for handler in logging.getLogger().handlers:
- handler.buffer = []
- handler.handledbuffer = []
- handler.tg_buff = ""
- await utils.answer(message, self.strings("logs_cleared"))
- @loader.loop(interval=1, autostart=True)
- async def watchdog(self):
- if not os.path.isdir(DEBUG_MODS_DIR):
- return
- try:
- for module in os.scandir(DEBUG_MODS_DIR):
- last_modified = os.stat(module.path).st_mtime
- cls_ = module.path.split("/")[-1].split(".py")[0]
- if cls_ not in self._memory:
- self._memory[cls_] = last_modified
- continue
- if self._memory[cls_] == last_modified:
- continue
- self._memory[cls_] = last_modified
- logger.debug("Reloading debug module %s", cls_)
- with open(module.path, "r") as f:
- try:
- await next(
- module
- for module in self.allmodules.modules
- if module.__class__.__name__ == "LoaderMod"
- ).load_module(
- f.read(),
- None,
- save_fs=False,
- )
- except Exception:
- logger.exception("Failed to reload module in watchdog")
- except Exception:
- logger.exception("Failed debugging watchdog")
- return
- @loader.command()
- async def debugmod(self, message: Message):
- args = utils.get_args_raw(message)
- instance = None
- for module in self.allmodules.modules:
- if (
- module.__class__.__name__.lower() == args.lower()
- or module.strings["name"].lower() == args.lower()
- ):
- if os.path.isfile(
- os.path.join(
- DEBUG_MODS_DIR,
- f"{module.__class__.__name__}.py",
- )
- ):
- os.remove(
- os.path.join(
- DEBUG_MODS_DIR,
- f"{module.__class__.__name__}.py",
- )
- )
- try:
- delattr(module, "hikka_debug")
- except AttributeError:
- pass
- await utils.answer(message, self.strings("debugging_disabled"))
- return
- module.hikka_debug = True
- instance = module
- break
- if not instance:
- await utils.answer(message, self.strings("bad_module"))
- return
- with open(
- os.path.join(
- DEBUG_MODS_DIR,
- f"{instance.__class__.__name__}.py",
- ),
- "wb",
- ) as f:
- f.write(inspect.getmodule(instance).__loader__.data)
- await utils.answer(
- message,
- self.strings("debugging_enabled").format(instance.__class__.__name__),
- )
- @loader.command()
- async def logs(
- self,
- message: typing.Union[Message, InlineCall],
- force: bool = False,
- lvl: typing.Union[int, None] = None,
- ):
- if not isinstance(lvl, int):
- args = utils.get_args_raw(message)
- try:
- try:
- lvl = int(args.split()[0])
- except ValueError:
- lvl = getattr(logging, args.split()[0].upper(), None)
- except IndexError:
- lvl = None
- if not isinstance(lvl, int):
- try:
- if not self.inline.init_complete or not await self.inline.form(
- text=self.strings("choose_loglevel"),
- reply_markup=utils.chunks(
- [
- {
- "text": name,
- "callback": self.logs,
- "args": (False, level),
- }
- for name, level in [
- ("🚫 Error", 40),
- ("⚠️ Warning", 30),
- ("ℹ️ Info", 20),
- ("🧑💻 All", 0),
- ]
- ],
- 2,
- )
- + [[{"text": self.strings("cancel"), "action": "close"}]],
- message=message,
- ):
- raise
- except Exception:
- await utils.answer(message, self.strings("set_loglevel"))
- return
- logs = "\n\n".join(
- [
- "\n".join(
- handler.dumps(lvl, client_id=self._client.tg_id)
- if "client_id" in inspect.signature(handler.dumps).parameters
- else handler.dumps(lvl)
- )
- for handler in logging.getLogger().handlers
- ]
- )
- named_lvl = (
- lvl
- if lvl not in logging._levelToName
- else logging._levelToName[lvl] # skipcq: PYL-W0212
- )
- if (
- lvl < logging.WARNING
- and not force
- and (
- not isinstance(message, Message)
- or "force_insecure" not in message.raw_text.lower()
- )
- ):
- try:
- if not self.inline.init_complete:
- raise
- cfg = {
- "text": self.strings("confidential").format(named_lvl),
- "reply_markup": [
- {
- "text": self.strings("send_anyway"),
- "callback": self.logs,
- "args": [True, lvl],
- },
- {"text": self.strings("cancel"), "action": "close"},
- ],
- }
- if isinstance(message, Message):
- if not await self.inline.form(**cfg, message=message):
- raise
- else:
- await message.edit(**cfg)
- except Exception:
- await utils.answer(
- message,
- self.strings("confidential_text").format(named_lvl),
- )
- return
- if len(logs) <= 2:
- if isinstance(message, Message):
- await utils.answer(message, self.strings("no_logs").format(named_lvl))
- else:
- await message.edit(self.strings("no_logs").format(named_lvl))
- await message.unload()
- return
- logs = self.lookup("evaluator").censor(logs)
- logs = BytesIO(logs.encode("utf-16"))
- logs.name = "hikka-logs.txt"
- ghash = utils.get_git_hash()
- other = (
- *main.__version__,
- (
- " <a"
- f' href="https://github.com/hikariatama/Hikka/commit/{ghash}">@{ghash[:8]}</a>'
- if ghash
- else ""
- ),
- )
- if getattr(message, "out", True):
- await message.delete()
- if isinstance(message, Message):
- await utils.answer(
- message,
- logs,
- caption=self.strings("logs_caption").format(named_lvl, *other),
- )
- else:
- await self._client.send_file(
- message.form["chat"],
- logs,
- caption=self.strings("logs_caption").format(named_lvl, *other),
- reply_to=message.form["top_msg_id"],
- )
- @loader.command()
- async def suspend(self, message: Message):
- try:
- time_sleep = float(utils.get_args_raw(message))
- await utils.answer(
- message,
- self.strings("suspended").format(time_sleep),
- )
- time.sleep(time_sleep)
- except ValueError:
- await utils.answer(message, self.strings("suspend_invalid_time"))
- @loader.command()
- async def ping(self, message: Message):
- start = time.perf_counter_ns()
- message = await utils.answer(message, "🌘")
- await utils.answer(
- message,
- self.strings("results_ping").format(
- round((time.perf_counter_ns() - start) / 10**6, 3),
- utils.formatted_uptime(),
- )
- + (
- ("\n\n" + self.strings("ping_hint"))
- if random.choice([0, 0, 1]) == 1
- else ""
- ),
- )
- async def client_ready(self):
- chat, _ = await utils.asset_channel(
- self._client,
- "hikka-logs",
- "🌘 Your Hikka logs will appear in this chat",
- silent=True,
- invite_bot=True,
- avatar="https://github.com/hikariatama/assets/raw/master/hikka-logs.png",
- )
- self.logchat = int(f"-100{chat.id}")
- logging.getLogger().handlers[0].install_tg_log(self)
- logger.debug("Bot logging installed for %s", self.logchat)
- self._pass_config_to_logger()
|