12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946 |
- # ©️ Dan Gazizullin, 2021-2023
- # This file is a part of Hikka Userbot
- # 🌐 https://github.com/hikariatama/Hikka
- # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
- # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
- import logging
- import os
- import random
- import hikkatl
- from hikkatl.tl.functions.channels import JoinChannelRequest
- from hikkatl.tl.functions.messages import (
- GetDialogFiltersRequest,
- UpdateDialogFilterRequest,
- )
- from hikkatl.tl.types import Message
- from hikkatl.utils import get_display_name
- from .. import loader, log, main, utils
- from .._internal import fw_protect, restart
- from ..inline.types import InlineCall
- from ..web import core
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Method names that `invokecmd` accepts when the requested module is "core".
ALL_INVOKES = [
    "flush_entity_cache",
    "flush_fulluser_cache",
    "flush_fullchannel_cache",
    "flush_perms_cache",
    "flush_loader_cache",
    "flush_cache",
    "reload_core",
    "inspect_cache",
    "inspect_modules",
]
- @loader.tds
- class HikkaSettingsMod(loader.Module):
- """Advanced settings for Hikka Userbot"""
- strings = {"name": "HikkaSettings"}
def get_watchers(self) -> tuple:
    """Return the registered watcher names and the disabled-watchers mapping.

    The first element is a list with the ``strings["name"]`` of the class
    owning each watcher in ``self.allmodules.watchers`` (watchers whose
    class has ``strings`` set to ``None`` are skipped).  The second element
    is the value stored in the database under ``disabled_watchers``
    (``{}`` when nothing is stored).
    """
    names = []
    for handler in self.allmodules.watchers:
        owner = handler.__self__.__class__
        if owner.strings is None:
            continue
        names.append(str(owner.strings["name"]))

    disabled = self._db.get(main.__name__, "disabled_watchers", {})
    return names, disabled
async def _uninstall(self, call: InlineCall):
    """Remove the userbot's traces from the account, log out and restart.

    Steps, in order:

    1. Edit the inline form to the ``uninstall`` string.
    2. Talk to ``@BotFather`` to delete the inline bot, deleting every
       sent message and every response afterwards.
    3. Delete service dialogs created by the userbot and the dialog with
       the inline bot.
    4. Remove the dialog folder titled ``hikka``, if there is one.
    5. Raise all root logging handlers to ``CRITICAL``, log out and restart.

    :param call: Inline call that triggered the uninstall.
    """
    await call.edit(self.strings("uninstall"))

    async with self._client.conversation("@BotFather") as conv:
        for msg in [
            "/deletebot",
            f"@{self.inline.bot_username}",
            "Yes, I am totally sure.",
        ]:
            await fw_protect()
            m = await conv.send_message(msg)
            r = await conv.get_response()

            logger.debug(">> %s", m.raw_text)
            logger.debug("<< %s", r.raw_text)

            await fw_protect()

            await m.delete()
            await r.delete()

    service_chats = {
        "hikka-logs",
        "hikka-onload",
        "hikka-assets",
        "hikka-backups",
        "hikka-acc-switcher",
        "silent-tags",
    }

    async for dialog in self._client.iter_dialogs(None, ignore_migrated=True):
        # Explicit parentheses spell out the precedence the original
        # and/or chain relied on.
        if (
            dialog.name in service_chats
            and dialog.is_channel
            and (
                dialog.entity.participants_count == 1
                or (
                    dialog.entity.participants_count == 2
                    and dialog.name in {"hikka-logs", "silent-tags"}
                )
            )
        ) or (
            self._client.loader.inline.init_complete
            and dialog.entity.id == self._client.loader.inline.bot_id
        ):
            await fw_protect()
            await self._client.delete_dialog(dialog.entity)

    await fw_protect()

    folders = await self._client(GetDialogFiltersRequest())

    # Pick the folder that is actually titled "hikka".  The previous code
    # removed whichever folder had the highest id, which could be an
    # unrelated user folder.  ``getattr`` tolerates entries without a title.
    hikka_folder = next(
        (
            folder
            for folder in folders
            if getattr(folder, "title", None) == "hikka"
        ),
        None,
    )

    if hikka_folder is not None:
        await fw_protect()
        # Sending the request with only an id (no filter) removes the folder.
        await self._client(UpdateDialogFilterRequest(id=hikka_folder.id))

    # Silence everything below CRITICAL before the session goes away.
    for handler in logging.getLogger().handlers:
        handler.setLevel(logging.CRITICAL)

    await fw_protect()

    await self._client.log_out()

    restart()
async def _uninstall_confirm_step_2(self, call: InlineCall):
    """Show the second uninstall confirmation with shuffled buttons.

    One "yes" button (bound to ``_uninstall``) and three "no" buttons are
    put in random order, laid out via ``utils.chunks(..., 2)``, and followed
    by a separate row holding the cancel button.
    """
    choices = [{"text": self.strings("deauth_yes"), "callback": self._uninstall}]
    for index in range(1, 4):
        choices.append(
            {"text": self.strings(f"deauth_no_{index}"), "action": "close"}
        )

    # Sorting by a random key places the buttons in random order.
    shuffled = sorted(choices, key=lambda _: random.random())

    cancel_row = [{"text": self.strings("deauth_cancel"), "action": "close"}]
    markup = utils.chunks(shuffled, 2) + [cancel_row]

    await call.edit(self.strings("deauth_confirm_step2"), markup)
@loader.command()
async def uninstall_hikka(self, message: Message):
    """Open the first uninstall confirmation form.

    The confirm button leads to ``_uninstall_confirm_step_2``; the cancel
    button closes the form.
    """
    confirm_button = {
        "text": self.strings("deauth_confirm_btn"),
        "callback": self._uninstall_confirm_step_2,
    }
    cancel_button = {"text": self.strings("deauth_cancel"), "action": "close"}

    await self.inline.form(
        self.strings("deauth_confirm"),
        message,
        [confirm_button, cancel_button],
    )
@loader.command()
async def watchers(self, message: Message):
    """Answer with the list of watchers and their disabled rules.

    Watchers without an entry in the disabled mapping are listed first,
    followed by one line per disabled entry showing its stored rule.
    """
    names, disabled = self.get_watchers()

    lines = [f"♻️ {name}" for name in names if name not in disabled]
    lines.extend(f"💢 {name} {rule}" for name, rule in disabled.items())

    listing = "\n".join(lines)
    await utils.answer(message, self.strings("watchers").format(listing))
@loader.command()
async def watcherbl(self, message: Message):
    """Toggle a watcher for the chat the command was sent in.

    The argument is a watcher name, matched case-insensitively.  If the
    current chat is not in the watcher's block list it is added; otherwise
    it is removed, and the entry is dropped once its list becomes empty.
    The updated mapping is written back to the database.
    """
    if not (args := utils.get_args_raw(message)):
        await utils.answer(message, self.strings("args"))
        return

    watchers, disabled_watchers = self.get_watchers()
    wanted = args.lower()

    # Resolve the user input to the watcher's canonical name.  The previous
    # generator yielded the *comparison result* (a bool) instead of the
    # name, which broke every later ``.lower()`` call on it.
    name = next((w for w in watchers if w.lower() == wanted), None)
    if name is None:
        await utils.answer(message, self.strings("mod404").format(args))
        return

    # An existing entry may be stored under a differently-cased key.
    stored_key = next((k for k in disabled_watchers if k.lower() == wanted), None)
    chat = utils.get_chat_id(message)

    if stored_key is None or chat not in disabled_watchers[stored_key]:
        if stored_key is None:
            disabled_watchers[name] = [chat]
        else:
            disabled_watchers[stored_key].append(chat)

        await utils.answer(
            message,
            self.strings("disabled").format(name) + " <b>in current chat</b>",
        )
    else:
        disabled_watchers[stored_key].remove(chat)
        if not disabled_watchers[stored_key]:
            del disabled_watchers[stored_key]

        await utils.answer(
            message,
            self.strings("enabled").format(name) + " <b>in current chat</b>",
        )

    self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
@loader.command()
async def watchercmd(self, message: Message):
    """Toggle a watcher globally or restrict it with flags.

    Arguments are a watcher name (case-insensitive) plus optional flags:

    * ``-c`` -> rule ``only_chats``
    * ``-p`` -> rule ``only_pm`` (ignored when ``-c`` is given)
    * ``-o`` -> rule ``out``
    * ``-i`` -> rule ``in`` (ignored when ``-o`` is given)

    With at least one flag the watcher's entry is replaced by the selected
    rules.  Without flags the entry is toggled between ``["*"]`` and absent.
    The mapping is written back to the database in every case.
    """
    if not (args := utils.get_args_raw(message)):
        return await utils.answer(message, self.strings("args"))

    # Parse flags as whole whitespace-separated tokens.  Substring matching
    # would also strip "-c" out of a name such as "my-chat", and the old
    # whitespace clean-up (`replace(" ", " ")`) did nothing.
    flags = {"-c": False, "-p": False, "-o": False, "-i": False}
    name_tokens = []
    for token in args.split():
        if token in flags:
            flags[token] = True
        else:
            name_tokens.append(token)

    args = " ".join(name_tokens)

    chats = flags["-c"]
    pm = flags["-p"] and not chats
    out = flags["-o"]
    incoming = flags["-i"] and not out

    watchers, disabled_watchers = self.get_watchers()

    wanted = args.lower()
    matches = [watcher for watcher in watchers if watcher.lower() == wanted]
    if not matches:
        return await utils.answer(message, self.strings("mod404").format(args))

    args = matches[0]

    rules = [
        rule
        for rule, selected in (
            ("only_chats", chats),
            ("only_pm", pm),
            ("out", out),
            ("in", incoming),
        )
        if selected
    ]

    if rules:
        disabled_watchers[args] = rules
        self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
        await utils.answer(
            message,
            self.strings("enabled").format(args)
            + f" (<code>{disabled_watchers[args]}</code>)",
        )
        return

    if args in disabled_watchers and "*" in disabled_watchers[args]:
        await utils.answer(message, self.strings("enabled").format(args))
        del disabled_watchers[args]
        self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
        return

    disabled_watchers[args] = ["*"]
    self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
    await utils.answer(message, self.strings("disabled").format(args))
@loader.command()
async def nonickuser(self, message: Message):
    """Toggle the replied-to user in the stored ``nonickusers`` list.

    Requires a reply; otherwise answers with ``reply_required``.
    """
    reply = await message.get_reply_message()
    if not reply:
        await utils.answer(message, self.strings("reply_required"))
        return

    user_id = reply.sender_id
    if not isinstance(user_id, int):
        # Non-int sender ids are expected to carry a `user_id` attribute.
        user_id = user_id.user_id

    stored = self._db.get(main.__name__, "nonickusers", [])

    if user_id in stored:
        stored = list(set(stored) - {user_id})
        state = "off"
    else:
        stored.append(user_id)
        stored = list(set(stored))  # skipcq: PTC-W0018
        state = "on"

    await utils.answer(message, self.strings("user_nn").format(state))
    self._db.set(main.__name__, "nonickusers", stored)
@loader.command()
async def nonickchat(self, message: Message):
    """Toggle the current chat in the stored ``nonickchats`` list.

    Private chats are rejected with ``private_not_allowed``.
    """
    if message.is_private:
        await utils.answer(message, self.strings("private_not_allowed"))
        return

    chat_id = utils.get_chat_id(message)
    stored = self._db.get(main.__name__, "nonickchats", [])

    if chat_id in stored:
        stored = list(set(stored) - {chat_id})
        state = "off"
    else:
        stored.append(chat_id)
        stored = list(set(stored))  # skipcq: PTC-W0018
        state = "on"

    template = self.strings("cmd_nn")
    title = utils.escape_html((await message.get_chat()).title)
    await utils.answer(message, template.format(title, state))

    self._db.set(main.__name__, "nonickchats", stored)
@loader.command()
async def nonickcmdcmd(self, message: Message):
    """Toggle a command in the stored ``nonickcmds`` list.

    Answers with ``no_cmd`` when no argument is given and with ``cmd404``
    when the argument is not a key of ``self.allmodules.commands``.
    """
    command = utils.get_args_raw(message)
    if not command:
        await utils.answer(message, self.strings("no_cmd"))
        return

    if command not in self.allmodules.commands:
        await utils.answer(message, self.strings("cmd404"))
        return

    stored = self._db.get(main.__name__, "nonickcmds", [])

    if command in stored:
        stored = list(set(stored) - {command})
        state = "off"
    else:
        stored.append(command)
        stored = list(set(stored))
        state = "on"

    template = self.strings("cmd_nn")
    shown = utils.escape_html(self.get_prefix() + command)
    await utils.answer(message, template.format(shown, state))

    self._db.set(main.__name__, "nonickcmds", stored)
@loader.command()
async def nonickcmds(self, message: Message):
    """Answer with the stored ``nonickcmds`` list, one command per line.

    Answers with ``nothing`` when the list is empty or missing.
    """
    commands = self._db.get(main.__name__, "nonickcmds", [])
    if not commands:
        await utils.answer(message, self.strings("nothing"))
        return

    lines = []
    for cmd in commands:
        lines.append(
            f"▫️ <code>{utils.escape_html(self.get_prefix() + cmd)}</code>"
        )

    await utils.answer(
        message,
        self.strings("cmd_nn_list").format("\n".join(lines)),
    )
@loader.command()
async def nonickusers(self, message: Message):
    """Answer with the stored ``nonickusers`` list as user links.

    Ids whose entity cannot be fetched are removed from the stored list
    and a warning is logged.  Answers with ``nothing`` when no user is left.
    """
    entries = []

    # Iterate over a copy: the stored list is rewritten on fetch failures.
    for user_id in self._db.get(main.__name__, "nonickusers", []).copy():
        try:
            user = await self._client.get_entity(user_id)
        except Exception:
            remaining = set(self._db.get(main.__name__, "nonickusers", []))
            remaining -= {user_id}
            self._db.set(main.__name__, "nonickusers", list(remaining))
            logger.warning("User %s removed from nonickusers list", user_id)
            continue

        entries.append(
            '▫️ <b><a href="tg://user?id={}">{}</a></b>'.format(
                user_id,
                utils.escape_html(get_display_name(user)),
            )
        )

    if entries:
        await utils.answer(
            message,
            self.strings("user_nn_list").format("\n".join(entries)),
        )
        return

    await utils.answer(message, self.strings("nothing"))
@loader.command()
async def nonickchats(self, message: Message):
    """Answer with the stored ``nonickchats`` list as chat links.

    Chats whose entity cannot be fetched are removed from the stored list
    and a warning is logged.  Answers with ``nothing`` when no chat is left.
    """
    entries = []

    for chat in self._db.get(main.__name__, "nonickchats", []):
        try:
            chat_entity = await self._client.get_entity(int(chat))
        except Exception:
            remaining = set(self._db.get(main.__name__, "nonickchats", []))
            remaining -= {chat}
            self._db.set(main.__name__, "nonickchats", list(remaining))
            logger.warning("Chat %s removed from nonickchats list", chat)
            continue

        entries.append(
            '▫️ <b><a href="{}">{}</a></b>'.format(
                utils.get_entity_url(chat_entity),
                utils.escape_html(get_display_name(chat_entity)),
            )
        )

    if entries:
        await utils.answer(
            message,
            self.strings("user_nn_list").format("\n".join(entries)),
        )
        return

    await utils.answer(message, self.strings("nothing"))
async def inline__setting(self, call: InlineCall, key: str, state: bool = False):
    """Apply a settings-menu toggle and redraw the menu.

    :param call: Inline call of the pressed button.
    :param key: Either a database key to store ``state`` under, or a
        callable that performs the change itself.
    :param state: Value stored when ``key`` is a database key.
    """
    if not callable(key):
        self._db.set(main.__name__, key, state)
    else:
        key()
        # Refresh the parser switch from the config after the callable ran.
        hikkatl.extensions.html.CUSTOM_EMOJIS = not main.get_config_key(
            "disable_custom_emojis"
        )

    if key == "no_nickname" and state and self.get_prefix() == ".":
        await call.answer(self.strings("nonick_warning"), show_alert=True)
    else:
        await call.answer("Configuration value saved!")

    markup = self._get_settings_markup()
    await call.edit(self.strings("inline_settings"), reply_markup=markup)
async def inline__update(
    self,
    call: InlineCall,
    confirm_required: bool = False,
):
    """Run the update command, optionally after a confirmation step.

    :param call: Inline call of the pressed button.
    :param confirm_required: When true, only show the confirmation form;
        its "Update" button calls this method again without the flag.
    """
    if not confirm_required:
        await call.answer("You userbot is being updated...", show_alert=True)
        await call.delete()
        await self.invoke("update", "-f", peer="me")
        return

    confirmation = [
        {"text": "🪂 Update", "callback": self.inline__update},
        {"text": "🚫 Cancel", "action": "close"},
    ]
    await call.edit(self.strings("confirm_update"), reply_markup=confirmation)
async def _remove_core_protection(self, call: InlineCall):
    """Store the ``remove_core_protection`` flag and confirm it in the form."""
    flag_name = "remove_core_protection"
    self._db.set(main.__name__, flag_name, True)

    confirmation = self.strings("core_protection_removed")
    await call.edit(confirmation)
@loader.command()
async def remove_core_protection(self, message: Message):
    """Ask for confirmation before setting ``remove_core_protection``.

    Answers with ``core_protection_already_removed`` when the flag is
    already set in the database.
    """
    already_removed = self._db.get(main.__name__, "remove_core_protection", False)
    if already_removed:
        await utils.answer(message, self.strings("core_protection_already_removed"))
        return

    confirm_button = {
        "text": self.strings("core_protection_btn"),
        "callback": self._remove_core_protection,
    }
    decline_button = {"text": self.strings("btn_no"), "action": "close"}

    await self.inline.form(
        message=message,
        text=self.strings("core_protection_confirm"),
        reply_markup=[confirm_button, decline_button],
    )
async def inline__restart(
    self,
    call: InlineCall,
    confirm_required: bool = False,
):
    """Run the restart command, optionally after a confirmation step.

    :param call: Inline call of the pressed button.
    :param confirm_required: When true, only show the confirmation form;
        its "Restart" button calls this method again without the flag.
    """
    if not confirm_required:
        await call.answer("You userbot is being restarted...", show_alert=True)
        await call.delete()
        await self.invoke("restart", "-f", peer="me")
        return

    confirmation = [
        {"text": "🔄 Restart", "callback": self.inline__restart},
        {"text": "🚫 Cancel", "action": "close"},
    ]
    await call.edit(self.strings("confirm_restart"), reply_markup=confirmation)
def _get_settings_markup(self) -> list:
    """Build the inline keyboard of the settings menu.

    Every toggle button calls ``inline__setting``.  Database-backed toggles
    pass ``(key, new_state)`` where ``new_state`` is the opposite of the
    current value; the custom-emoji and debugger toggles pass a one-element
    tuple holding a callable that performs the change.

    :return: List of button rows (each row is a list of button dicts).
    """

    def setting_button(text: str, *args) -> dict:
        """Return a button that calls ``inline__setting`` with *args*."""
        return {"text": text, "callback": self.inline__setting, "args": args}

    def db_flag(key: str, default: bool = False) -> bool:
        """Return the truthiness of a settings value stored in the database."""
        return bool(self._db.get(main.__name__, key, default))

    no_nickname = db_flag("no_nickname")
    grep = db_flag("grep")
    inlinelogs = db_flag("inlinelogs", True)
    disable_modules_fs = db_flag("disable_modules_fs")
    permanent_modules_fs = db_flag("permanent_modules_fs")
    suggest_subscribe = db_flag("suggest_subscribe", True)
    emojis_disabled = bool(main.get_config_key("disable_custom_emojis"))
    debugger = bool(self._db.get(log.__name__, "debugger", False))

    return [
        [
            setting_button(
                "✅ NoNick" if no_nickname else "🚫 NoNick",
                "no_nickname",
                not no_nickname,
            ),
            setting_button(
                "✅ Grep" if grep else "🚫 Grep",
                "grep",
                not grep,
            ),
            setting_button(
                "✅ InlineLogs" if inlinelogs else "🚫 InlineLogs",
                "inlinelogs",
                not inlinelogs,
            ),
        ],
        [
            setting_button(
                (
                    self.strings("do_not_suggest_fs")
                    if disable_modules_fs
                    else self.strings("suggest_fs")
                ),
                "disable_modules_fs",
                not disable_modules_fs,
            ),
        ],
        [
            setting_button(
                (
                    self.strings("use_fs")
                    if permanent_modules_fs
                    else self.strings("do_not_use_fs")
                ),
                "permanent_modules_fs",
                not permanent_modules_fs,
            ),
        ],
        [
            setting_button(
                (
                    self.strings("suggest_subscribe")
                    if suggest_subscribe
                    else self.strings("do_not_suggest_subscribe")
                ),
                "suggest_subscribe",
                not suggest_subscribe,
            ),
        ],
        [
            setting_button(
                (
                    self.strings("no_custom_emojis")
                    if emojis_disabled
                    else self.strings("custom_emojis")
                ),
                lambda: main.save_config_key(
                    "disable_custom_emojis", not emojis_disabled
                ),
            ),
        ],
        [
            # "args" is always a tuple here.  The "disable" variant used to
            # pass the bare lambda, unlike every other button in this menu.
            setting_button(
                (
                    self.strings("disable_debugger")
                    if debugger
                    else self.strings("enable_debugger")
                ),
                lambda: self._db.set(log.__name__, "debugger", not debugger),
            ),
        ],
        [
            {
                "text": self.strings("btn_restart"),
                "callback": self.inline__restart,
                "args": (True,),
            },
            {
                "text": self.strings("btn_update"),
                "callback": self.inline__update,
                "args": (True,),
            },
        ],
        [{"text": self.strings("close_menu"), "action": "close"}],
    ]
@loader.command()
async def settings(self, message: Message):
    """Open the inline settings menu built by ``_get_settings_markup``."""
    menu_text = self.strings("inline_settings")
    menu_markup = self._get_settings_markup()

    await self.inline.form(menu_text, message=message, reply_markup=menu_markup)
@loader.command()
async def weburl(self, message: Message, force: bool = False):
    """Show a link to the web interface.

    * With ``LAVHOST`` in the environment, a form with the non-proxied URL
      is shown and nothing else happens.
    * Outside private chats (and without ``force`` or the text
      ``force_insecure``) a confirmation form is shown first; if the form
      cannot be shown, a plain ``privacy_leak`` answer is sent instead.
    * Otherwise the web core is created if missing, started, and a form
      with the proxied URL is shown.

    :param message: Triggering message; when ``force`` is true this is the
        object whose ``edit`` is used to update the existing form.
    :param force: Skip the confirmation step.
    """
    if "LAVHOST" in os.environ:
        await self.inline.form(
            self.strings("lavhost_web"),
            message=message,
            reply_markup={
                "text": self.strings("web_btn"),
                "url": await main.hikka.web.get_url(proxy_pass=False),
            },
            gif="https://t.me/hikari_assets/28",
        )
        return

    if (
        not force
        and not message.is_private
        and "force_insecure" not in message.raw_text.lower()
    ):
        # A falsy result and a raised exception both count as "not shown".
        try:
            form_shown = bool(
                await self.inline.form(
                    self.strings("privacy_leak_nowarn").format(self._client.tg_id),
                    message=message,
                    reply_markup=[
                        {
                            "text": self.strings("btn_yes"),
                            "callback": self.weburl,
                            "args": (True,),
                        },
                        {"text": self.strings("btn_no"), "action": "close"},
                    ],
                    gif="https://i.gifer.com/embedded/download/Z5tS.gif",
                )
            )
        except Exception:
            form_shown = False

        if not form_shown:
            await utils.answer(
                message,
                self.strings("privacy_leak").format(
                    self._client.tg_id,
                    utils.escape_html(self.get_prefix()),
                ),
            )

        return

    if not main.hikka.web:
        main.hikka.web = core.Web(
            data_root=main.BASE_DIR,
            api_token=main.hikka.api_token,
            proxy=main.hikka.proxy,
            connection=main.hikka.conn,
        )
        await main.hikka.web.add_loader(self._client, self.allmodules, self._db)
        await main.hikka.web.start_if_ready(
            len(self.allclients),
            main.hikka.arguments.port,
            proxy_pass=main.hikka.arguments.proxy_pass,
        )

    waiting_gif = "https://i.gifer.com/origin/e4/e43e1b221fd960003dc27d2f2f1b8ce1.gif"

    if force:
        form = message
        await form.edit(
            self.strings("opening_tunnel"),
            reply_markup={"text": "🕔 Wait...", "data": "empty"},
            gif=waiting_gif,
        )
    else:
        form = await self.inline.form(
            self.strings("opening_tunnel"),
            message=message,
            reply_markup={"text": "🕔 Wait...", "data": "empty"},
            gif=waiting_gif,
        )

    url = await main.hikka.web.get_url(proxy_pass=True)

    await form.edit(
        self.strings("tunnel_opened"),
        reply_markup={"text": self.strings("web_btn"), "url": url},
        gif="https://t.me/hikari_assets/48",
    )
@loader.loop(interval=1, autostart=True)
async def loop(self):
    """Join a pending approved channel and report the outcome on its event.

    Reads ``self.allmodules.get_approved_channel``; when it is falsy nothing
    happens.  Otherwise it is unpacked into ``(channel, event)``, a join is
    attempted, ``event.status`` is set to whether it succeeded, and the
    event is set.
    """
    pending = self.allmodules.get_approved_channel
    if not pending:
        return

    channel, event = pending

    try:
        await self._client(JoinChannelRequest(channel))
    except Exception:
        logger.exception("Failed to join channel")
        joined = False
    else:
        joined = True

    event.status = joined
    event.set()
def _get_all_IDM(self, module: str):
    """Collect the debug methods exposed by a module.

    :param module: Name passed to ``self.lookup``.
    :return: Dict mapping each debug method's ``name`` attribute (or its
        attribute name when it has none) to the attribute itself.  An
        attribute counts as a debug method when its ``is_debug_method``
        attribute is truthy.
    """
    # Resolve the module once instead of on every attribute access.
    target = self.lookup(module)

    debug_methods = {}
    for attr_name in dir(target):
        member = getattr(target, attr_name)
        if getattr(member, "is_debug_method", False):
            debug_methods[getattr(member, "name", attr_name)] = member

    return debug_methods
@loader.command()
async def invokecmd(self, message: Message):
    """Run a debug method of the core or of a loaded module.

    Arguments: ``<module> <method>``.  For module ``core`` the method must
    be listed in ``ALL_INVOKES``; for any other module it must be one of
    the debug methods returned by ``_get_all_IDM``.  The textual result is
    HTML-escaped and sent as the answer.
    """
    import inspect

    async def resolved(value):
        """Await *value* when it is awaitable, else return it unchanged."""
        return await value if inspect.isawaitable(value) else value

    if not (args := utils.get_args_raw(message)) or len(args.split()) < 2:
        await utils.answer(message, self.strings("no_args"))
        return

    # At least two words are guaranteed by the check above.
    module, method = args.split(maxsplit=1)
    is_core = module == "core"

    if not is_core and not self.lookup(module):
        await utils.answer(message, self.strings("module404").format(module))
        return

    available = ALL_INVOKES if is_core else self._get_all_IDM(module)
    if method not in available:
        await utils.answer(message, self.strings("invoke404").format(method))
        return

    message = await utils.answer(
        message, self.strings("invoking").format(method, module)
    )
    result = ""

    # Core methods that drop exactly one client-side cache attribute.
    single_caches = {
        "flush_entity_cache": "_hikka_entity_cache",
        "flush_fulluser_cache": "_hikka_fulluser_cache",
        "flush_fullchannel_cache": "_hikka_fullchannel_cache",
        "flush_perms_cache": "_hikka_perms_cache",
    }

    if not is_core:
        result = await available[method](message)
    elif method in single_caches:
        attr = single_caches[method]
        result = f"Dropped {len(getattr(self._client, attr))} cache records"
        setattr(self._client, attr, {})
    elif method == "flush_loader_cache":
        # `flush_cache()` was awaited here but not in "flush_cache" below;
        # `resolved` makes both branches work whichever form the loader uses.
        dropped = await resolved(self.lookup("loader").flush_cache())
        result = f"Dropped {dropped} cache records"
    elif method == "flush_cache":
        count = await resolved(self.lookup("loader").flush_cache())
        result = (
            f"Dropped {len(self._client._hikka_entity_cache)} entity cache"
            " records\nDropped"
            f" {len(self._client._hikka_fulluser_cache)} fulluser cache"
            " records\nDropped"
            f" {len(self._client._hikka_fullchannel_cache)} fullchannel cache"
            " records\nDropped"
            f" {count} loader links cache records"
        )
        self._client._hikka_entity_cache = {}
        self._client._hikka_fulluser_cache = {}
        self._client._hikka_fullchannel_cache = {}
        self._client.hikka_me = await self._client.get_me()
    elif method == "reload_core":
        core_quantity = await self.lookup("loader").reload_core()
        result = f"Reloaded {core_quantity} core modules"
    elif method == "inspect_cache":
        loader_records = await resolved(self.lookup("loader").inspect_cache())
        result = (
            "Entity cache:"
            f" {len(self._client._hikka_entity_cache)} records\nFulluser cache:"
            f" {len(self._client._hikka_fulluser_cache)} records\nFullchannel"
            " cache:"
            f" {len(self._client._hikka_fullchannel_cache)} records\nLoader"
            f" links cache: {loader_records} records"
        )
    elif method == "inspect_modules":
        loaded = self.allmodules.modules
        core_count = sum(mod.__origin__.startswith("<core") for mod in loaded)
        result = (
            "Loaded modules: {}\nLoaded core modules: {}\nLoaded user"
            " modules: {}"
        ).format(len(loaded), core_count, len(loaded) - core_count)

    await utils.answer(
        message,
        self.strings("invoke").format(method, utils.escape_html(result)),
    )
|