test.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. # ©️ Dan Gazizullin, 2021-2023
  2. # This file is a part of Hikka Userbot
  3. # 🌐 https://github.com/hikariatama/Hikka
  4. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  5. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  6. import inspect
  7. import logging
  8. import os
  9. import random
  10. import time
  11. import typing
  12. from io import BytesIO
  13. from hikkatl.tl.types import Message
  14. from .. import loader, main, utils
  15. from ..inline.types import InlineCall
  16. logger = logging.getLogger(__name__)
  17. DEBUG_MODS_DIR = os.path.join(utils.get_base_dir(), "debug_modules")
  18. if not os.path.isdir(DEBUG_MODS_DIR):
  19. os.mkdir(DEBUG_MODS_DIR, mode=0o755)
  20. for mod in os.scandir(DEBUG_MODS_DIR):
  21. os.remove(mod.path)
  22. @loader.tds
  23. class TestMod(loader.Module):
  24. """Perform operations based on userbot self-testing"""
  25. strings = {"name": "Tester"}
  26. def __init__(self):
  27. self._memory = {}
  28. self.config = loader.ModuleConfig(
  29. loader.ConfigValue(
  30. "force_send_all",
  31. False,
  32. (
  33. "⚠️ Do not touch, if you don't know what it does!\nBy default,"
  34. " Hikka will try to determine, which client caused logs. E.g. there"
  35. " is a module TestModule installed on Client1 and TestModule2 on"
  36. " Client2. By default, Client2 will get logs from TestModule2, and"
  37. " Client1 will get logs from TestModule. If this option is enabled,"
  38. " Hikka will send all logs to Client1 and Client2, even if it is"
  39. " not the one that caused the log."
  40. ),
  41. validator=loader.validators.Boolean(),
  42. on_change=self._pass_config_to_logger,
  43. ),
  44. loader.ConfigValue(
  45. "tglog_level",
  46. "INFO",
  47. (
  48. "⚠️ Do not touch, if you don't know what it does!\n"
  49. "Minimal loglevel for records to be sent in Telegram."
  50. ),
  51. validator=loader.validators.Choice(
  52. ["INFO", "WARNING", "ERROR", "CRITICAL"]
  53. ),
  54. on_change=self._pass_config_to_logger,
  55. ),
  56. loader.ConfigValue(
  57. "ignore_common",
  58. True,
  59. "Ignore common errors (e.g. 'TypeError' in telethon)",
  60. validator=loader.validators.Boolean(),
  61. on_change=self._pass_config_to_logger,
  62. ),
  63. )
  64. def _pass_config_to_logger(self):
  65. logging.getLogger().handlers[0].force_send_all = self.config["force_send_all"]
  66. logging.getLogger().handlers[0].tg_level = {
  67. "INFO": 20,
  68. "WARNING": 30,
  69. "ERROR": 40,
  70. "CRITICAL": 50,
  71. }[self.config["tglog_level"]]
  72. logging.getLogger().handlers[0].ignore_common = self.config["ignore_common"]
  73. @loader.command()
  74. async def dump(self, message: Message):
  75. if not message.is_reply:
  76. return
  77. await utils.answer(
  78. message,
  79. "<code>"
  80. + utils.escape_html((await message.get_reply_message()).stringify())
  81. + "</code>",
  82. )
  83. @loader.command()
  84. async def clearlogs(self, message: Message):
  85. for handler in logging.getLogger().handlers:
  86. handler.buffer = []
  87. handler.handledbuffer = []
  88. handler.tg_buff = ""
  89. await utils.answer(message, self.strings("logs_cleared"))
  90. @loader.loop(interval=1, autostart=True)
  91. async def watchdog(self):
  92. if not os.path.isdir(DEBUG_MODS_DIR):
  93. return
  94. try:
  95. for module in os.scandir(DEBUG_MODS_DIR):
  96. last_modified = os.stat(module.path).st_mtime
  97. cls_ = module.path.split("/")[-1].split(".py")[0]
  98. if cls_ not in self._memory:
  99. self._memory[cls_] = last_modified
  100. continue
  101. if self._memory[cls_] == last_modified:
  102. continue
  103. self._memory[cls_] = last_modified
  104. logger.debug("Reloading debug module %s", cls_)
  105. with open(module.path, "r") as f:
  106. try:
  107. await next(
  108. module
  109. for module in self.allmodules.modules
  110. if module.__class__.__name__ == "LoaderMod"
  111. ).load_module(
  112. f.read(),
  113. None,
  114. save_fs=False,
  115. )
  116. except Exception:
  117. logger.exception("Failed to reload module in watchdog")
  118. except Exception:
  119. logger.exception("Failed debugging watchdog")
  120. return
  121. @loader.command()
  122. async def debugmod(self, message: Message):
  123. args = utils.get_args_raw(message)
  124. instance = None
  125. for module in self.allmodules.modules:
  126. if (
  127. module.__class__.__name__.lower() == args.lower()
  128. or module.strings["name"].lower() == args.lower()
  129. ):
  130. if os.path.isfile(
  131. os.path.join(
  132. DEBUG_MODS_DIR,
  133. f"{module.__class__.__name__}.py",
  134. )
  135. ):
  136. os.remove(
  137. os.path.join(
  138. DEBUG_MODS_DIR,
  139. f"{module.__class__.__name__}.py",
  140. )
  141. )
  142. try:
  143. delattr(module, "hikka_debug")
  144. except AttributeError:
  145. pass
  146. await utils.answer(message, self.strings("debugging_disabled"))
  147. return
  148. module.hikka_debug = True
  149. instance = module
  150. break
  151. if not instance:
  152. await utils.answer(message, self.strings("bad_module"))
  153. return
  154. with open(
  155. os.path.join(
  156. DEBUG_MODS_DIR,
  157. f"{instance.__class__.__name__}.py",
  158. ),
  159. "wb",
  160. ) as f:
  161. f.write(inspect.getmodule(instance).__loader__.data)
  162. await utils.answer(
  163. message,
  164. self.strings("debugging_enabled").format(instance.__class__.__name__),
  165. )
  166. @loader.command()
  167. async def logs(
  168. self,
  169. message: typing.Union[Message, InlineCall],
  170. force: bool = False,
  171. lvl: typing.Union[int, None] = None,
  172. ):
  173. if not isinstance(lvl, int):
  174. args = utils.get_args_raw(message)
  175. try:
  176. try:
  177. lvl = int(args.split()[0])
  178. except ValueError:
  179. lvl = getattr(logging, args.split()[0].upper(), None)
  180. except IndexError:
  181. lvl = None
  182. if not isinstance(lvl, int):
  183. try:
  184. if not self.inline.init_complete or not await self.inline.form(
  185. text=self.strings("choose_loglevel"),
  186. reply_markup=utils.chunks(
  187. [
  188. {
  189. "text": name,
  190. "callback": self.logs,
  191. "args": (False, level),
  192. }
  193. for name, level in [
  194. ("🚫 Error", 40),
  195. ("⚠️ Warning", 30),
  196. ("ℹ️ Info", 20),
  197. ("🧑‍💻 All", 0),
  198. ]
  199. ],
  200. 2,
  201. )
  202. + [[{"text": self.strings("cancel"), "action": "close"}]],
  203. message=message,
  204. ):
  205. raise
  206. except Exception:
  207. await utils.answer(message, self.strings("set_loglevel"))
  208. return
  209. logs = "\n\n".join(
  210. [
  211. "\n".join(
  212. handler.dumps(lvl, client_id=self._client.tg_id)
  213. if "client_id" in inspect.signature(handler.dumps).parameters
  214. else handler.dumps(lvl)
  215. )
  216. for handler in logging.getLogger().handlers
  217. ]
  218. )
  219. named_lvl = (
  220. lvl
  221. if lvl not in logging._levelToName
  222. else logging._levelToName[lvl] # skipcq: PYL-W0212
  223. )
  224. if (
  225. lvl < logging.WARNING
  226. and not force
  227. and (
  228. not isinstance(message, Message)
  229. or "force_insecure" not in message.raw_text.lower()
  230. )
  231. ):
  232. try:
  233. if not self.inline.init_complete:
  234. raise
  235. cfg = {
  236. "text": self.strings("confidential").format(named_lvl),
  237. "reply_markup": [
  238. {
  239. "text": self.strings("send_anyway"),
  240. "callback": self.logs,
  241. "args": [True, lvl],
  242. },
  243. {"text": self.strings("cancel"), "action": "close"},
  244. ],
  245. }
  246. if isinstance(message, Message):
  247. if not await self.inline.form(**cfg, message=message):
  248. raise
  249. else:
  250. await message.edit(**cfg)
  251. except Exception:
  252. await utils.answer(
  253. message,
  254. self.strings("confidential_text").format(named_lvl),
  255. )
  256. return
  257. if len(logs) <= 2:
  258. if isinstance(message, Message):
  259. await utils.answer(message, self.strings("no_logs").format(named_lvl))
  260. else:
  261. await message.edit(self.strings("no_logs").format(named_lvl))
  262. await message.unload()
  263. return
  264. logs = self.lookup("evaluator").censor(logs)
  265. logs = BytesIO(logs.encode("utf-16"))
  266. logs.name = "hikka-logs.txt"
  267. ghash = utils.get_git_hash()
  268. other = (
  269. *main.__version__,
  270. (
  271. " <a"
  272. f' href="https://github.com/hikariatama/Hikka/commit/{ghash}">@{ghash[:8]}</a>'
  273. if ghash
  274. else ""
  275. ),
  276. )
  277. if getattr(message, "out", True):
  278. await message.delete()
  279. if isinstance(message, Message):
  280. await utils.answer(
  281. message,
  282. logs,
  283. caption=self.strings("logs_caption").format(named_lvl, *other),
  284. )
  285. else:
  286. await self._client.send_file(
  287. message.form["chat"],
  288. logs,
  289. caption=self.strings("logs_caption").format(named_lvl, *other),
  290. reply_to=message.form["top_msg_id"],
  291. )
  292. @loader.command()
  293. async def suspend(self, message: Message):
  294. try:
  295. time_sleep = float(utils.get_args_raw(message))
  296. await utils.answer(
  297. message,
  298. self.strings("suspended").format(time_sleep),
  299. )
  300. time.sleep(time_sleep)
  301. except ValueError:
  302. await utils.answer(message, self.strings("suspend_invalid_time"))
  303. @loader.command()
  304. async def ping(self, message: Message):
  305. start = time.perf_counter_ns()
  306. message = await utils.answer(message, "🌘")
  307. await utils.answer(
  308. message,
  309. self.strings("results_ping").format(
  310. round((time.perf_counter_ns() - start) / 10**6, 3),
  311. utils.formatted_uptime(),
  312. )
  313. + (
  314. ("\n\n" + self.strings("ping_hint"))
  315. if random.choice([0, 0, 1]) == 1
  316. else ""
  317. ),
  318. )
  319. async def client_ready(self):
  320. chat, _ = await utils.asset_channel(
  321. self._client,
  322. "hikka-logs",
  323. "🌘 Your Hikka logs will appear in this chat",
  324. silent=True,
  325. invite_bot=True,
  326. avatar="https://github.com/hikariatama/assets/raw/master/hikka-logs.png",
  327. )
  328. self.logchat = int(f"-100{chat.id}")
  329. logging.getLogger().handlers[0].install_tg_log(self)
  330. logger.debug("Bot logging installed for %s", self.logchat)
  331. self._pass_config_to_logger()