python.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  2. # █▀█ █ █ █ █▀█ █▀▄ █
  3. # © Copyright 2022
  4. # https://t.me/hikariatama
  5. #
  6. # 🔒 Licensed under the GNU AGPLv3
  7. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  8. import contextlib
  9. import itertools
  10. import sys
  11. from types import ModuleType
  12. import os
  13. import typing
  14. import telethon
  15. from meval import meval
  16. from telethon.errors.rpcerrorlist import MessageIdInvalidError
  17. from telethon.tl.types import Message
  18. from .. import loader, main, utils
  19. from ..log import HikkaException
  @loader.tds
  class PythonMod(loader.Module):
      """Evaluates python code"""

      # NOTE(review): the class docstring and the command docstrings below are
      # kept verbatim on purpose — the "_cls_doc" / ru_doc translations suggest
      # the loader shows them to users as help text; confirm before rewording.

      # Message templates. Each takes two positional fields: the HTML-escaped
      # source code, then the result ("eval") or the error text ("err").
      strings = {
          "name": "Python",
          "eval": (
              "<emoji document_id=5431376038628171216>🎬</emoji><b>"
              " Code:</b>\n<code>{}</code>\n<emoji"
              " document_id=5472164874886846699>✨</emoji><b>"
              " Result:</b>\n<code>{}</code>"
          ),
          "err": (
              "<emoji document_id=5431376038628171216>🎬</emoji><b>"
              " Code:</b>\n<code>{}</code>\n\n<emoji"
              " document_id=6323575131239089635>🚫</emoji><b> Error:</b>\n{}"
          ),
      }

      # Russian variants of the templates above, plus the translated class doc.
      strings_ru = {
          "eval": (
              "<emoji document_id=5431376038628171216>🎬</emoji><b>"
              " Код:</b>\n<code>{}</code>\n<emoji"
              " document_id=5472164874886846699>✨</emoji><b>"
              " Результат:</b>\n<code>{}</code>"
          ),
          "err": (
              "<emoji document_id=5431376038628171216>🎬</emoji><b>"
              " Код:</b>\n<code>{}</code>\n\n<emoji"
              " document_id=6323575131239089635>🚫</emoji><b> Ошибка:</b>\n{}"
          ),
          "_cls_doc": "Выполняет Python код",
      }

      @loader.owner
      @loader.command(ru_doc="Алиас для команды .e")
      async def eval(self, message: Message):
          """Alias for .e command"""
          # Pure delegation: everything (including error reporting) lives in `e`.
          await self.e(message)

      @loader.owner
      @loader.command(ru_doc="Выполняет Python код")
      async def e(self, message: Message):
          """Evaluates python code"""
          # Flow: run the command arguments through `meval`, then answer the
          # message with either the result or the formatted traceback. Both
          # outputs pass through `_censor` before being sent.
          # SECURITY: this executes arbitrary code taken from the message text.
          # NOTE(review): access control presumably comes from @loader.owner —
          # confirm; nothing in this method restricts who may call it.
          ret = self.strings("eval")
          try:
              result = await meval(
                  utils.get_args_raw(message),
                  globals(),
                  **await self.getattrs(message),
              )
          except Exception:
              item = HikkaException.from_exc_info(*sys.exc_info())
              # Split once: every line but the last is the stack, the last
              # line is shown separately after the 🚫 marker.
              stack_lines = item.full_stack.splitlines()
              exc = (
                  "\n<b>🪐 Full stack:</b>\n\n"
                  + "\n".join(stack_lines[:-1])
                  + "\n\n"
                  + "🚫 "
                  + stack_lines[-1]
              )
              await utils.answer(
                  message,
                  self.strings("err").format(
                      utils.escape_html(utils.get_args_raw(message)),
                      # Same masking as the success path, so a traceback
                      # cannot leak the Redis URI either.
                      self._censor(exc),
                  ),
              )
              return

          # Objects exposing a callable `stringify` get that representation;
          # any failure there silently falls back to plain str().
          if callable(getattr(result, "stringify", None)):
              with contextlib.suppress(Exception):
                  result = str(result.stringify())

          ret = ret.format(
              utils.escape_html(utils.get_args_raw(message)),
              utils.escape_html(str(result)),
          )

          # MessageIdInvalidError is deliberately ignored here.
          # NOTE(review): presumably raised when the evaluated code deleted the
          # command message — confirm.
          with contextlib.suppress(MessageIdInvalidError):
              await utils.answer(message, self._censor(ret))

      def _censor(self, text: str) -> str:
          """Mask account secrets in text that is about to be sent to a chat.

          Replaces, in this order: the account phone number, the Redis URI
          (``REDIS_URL`` environment variable, else the ``redis_uri`` config
          key) and the value of the ``hikka_session`` environment variable.

          :param text: Already formatted message text.
          :return: ``text`` with every occurrence of those values replaced by
              a placeholder.
          """
          phone = self._client.hikka_me.phone
          # Skip a missing phone: str(None) would mask every literal "None" in
          # the output, and replacing "" inserts the marker between all chars.
          if phone:
              text = text.replace(str(phone), "📵")

          if redis := os.environ.get("REDIS_URL") or main.get_config_key("redis_uri"):
              text = text.replace(redis, "redis://**************************")

          if session := os.environ.get("hikka_session"):
              text = text.replace(
                  session,
                  "StringSession(**************************)",
              )

          return text

      async def getattrs(self, message: Message) -> dict:
          """Build the names injected into the evaluated code.

          :param message: The command message; also used to fetch its reply.
          :return: Mapping of variable name -> object. Entries collected by
              `get_sub` come before the fixed helper names that follow them,
              so on a name clash the later fixed entry wins.
          """
          reply = await message.get_reply_message()
          return {
              "message": message,
              "client": self._client,
              "reply": reply,
              "r": reply,
              **self.get_sub(telethon.tl.types),
              **self.get_sub(telethon.tl.functions),
              "event": message,
              "chat": message.to_id,
              "telethon": telethon,
              "utils": utils,
              "main": main,
              "loader": loader,
              "f": telethon.tl.functions,
              "c": self._client,
              "m": message,
              "lookup": self.lookup,
              "self": self,
              "db": self.db,
          }

      def get_sub(self, obj: typing.Any, _depth: int = 1) -> dict:
          """Get all callable capitalised objects in an object recursively, ignoring _*

          :param obj: Object whose ``__dict__`` is scanned.
          :param _depth: Recursion level, starting at 1. It is the maximum
              number of trailing dotted components stripped from a nested
              module's ``__package__`` before comparing with ``"telethon.tl"``.
          :return: Mapping of name -> callable. Entries found in nested
              modules override same-named entries of ``obj`` itself, and later
              modules override earlier ones.
          """
          found = {}
          nested_modules = []

          for name, value in obj.__dict__.items():
              initial = name[0]
              if initial == "_":
                  continue

              # "Capitalised" means the first character equals its own
              # upper-case form.
              if initial.upper() == initial and callable(value):
                  found[name] = value

              if isinstance(value, ModuleType) and value is not obj:
                  # __package__ may be None for some module objects; treat
                  # that as "not under telethon.tl" instead of raising.
                  package = getattr(value, "__package__", None) or ""
                  if package.rsplit(".", _depth)[0] == "telethon.tl":
                      nested_modules.append(value)

          # Merge nested results only after all direct members are collected,
          # so nested entries take precedence.
          for module in nested_modules:
              found.update(self.get_sub(module, _depth + 1))

          return found