python.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  2. # █▀█ █ █ █ █▀█ █▀▄ █
  3. # © Copyright 2022
  4. # https://t.me/hikariatama
  5. #
  6. # 🔒 Licensed under the GNU AGPLv3
  7. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  8. import contextlib
  9. import itertools
  10. import logging
  11. import sys
  12. from types import ModuleType
  13. import os
  14. from typing import Any
  15. import telethon
  16. from meval import meval
  17. from telethon.errors.rpcerrorlist import MessageIdInvalidError
  18. from telethon.tl.types import Message
  19. from .. import loader, main, utils
  20. from ..log import HikkaException
  21. logger = logging.getLogger(__name__)
  22. @loader.tds
  23. class PythonMod(loader.Module):
  24. """Evaluates python code"""
  25. strings = {
  26. "name": "Python",
  27. "eval": (
  28. "<emoji document_id='5444965061749644170'>🎬</emoji><b>"
  29. " Code:</b>\n<code>{}</code>\n<emoji"
  30. " document_id='6321231595218929203'>🌠</emoji><b>"
  31. " Result:</b>\n<code>{}</code>"
  32. ),
  33. "err": (
  34. "<emoji document_id='5444965061749644170'>🎬</emoji><b>"
  35. " Code:</b>\n<code>{}</code>\n\n<emoji"
  36. " document_id='6323575131239089635'>🚫</emoji><b> Error:</b>\n{}"
  37. ),
  38. }
  39. strings_ru = {
  40. "eval": (
  41. "<emoji document_id='5444965061749644170'>🎬</emoji><b>"
  42. " Код:</b>\n<code>{}</code>\n<emoji"
  43. " document_id='6321231595218929203'>🌠</emoji><b>"
  44. " Результат:</b>\n<code>{}</code>"
  45. ),
  46. "err": (
  47. "<emoji document_id='5444965061749644170'>🎬</emoji><b>"
  48. " Код:</b>\n<code>{}</code>\n\n<emoji"
  49. " document_id='6323575131239089635'>🚫</emoji><b> Ошибка:</b>\n{}"
  50. ),
  51. "_cls_doc": "Выполняет Python код",
  52. }
  53. async def client_ready(self):
  54. self._phone = (await self._client.get_me()).phone
  55. @loader.owner
  56. @loader.command(ru_doc="Алиас для команды .e")
  57. async def eval(self, message: Message):
  58. """Alias for .e command"""
  59. await self.ecmd(message)
  60. @loader.owner
  61. @loader.command(ru_doc="Выполняет Python код")
  62. async def e(self, message: Message):
  63. """Evaluates python code"""
  64. ret = self.strings("eval")
  65. try:
  66. result = await meval(
  67. utils.get_args_raw(message),
  68. globals(),
  69. **await self.getattrs(message),
  70. )
  71. except Exception:
  72. item = HikkaException.from_exc_info(*sys.exc_info())
  73. exc = (
  74. "\n<b>🪐 Full stack:</b>\n\n"
  75. + "\n".join(item.full_stack.splitlines()[:-1])
  76. + "\n\n"
  77. + "🚫 "
  78. + item.full_stack.splitlines()[-1]
  79. )
  80. exc = exc.replace(str(self._phone), "📵")
  81. if os.environ.get("DATABASE_URL"):
  82. exc = exc.replace(
  83. os.environ.get("DATABASE_URL"),
  84. "postgre://**************************",
  85. )
  86. if os.environ.get("hikka_session"):
  87. exc = exc.replace(
  88. os.environ.get("hikka_session"),
  89. "StringSession(**************************)",
  90. )
  91. await utils.answer(
  92. message,
  93. self.strings("err").format(
  94. utils.escape_html(utils.get_args_raw(message)),
  95. exc,
  96. ),
  97. )
  98. return
  99. if callable(getattr(result, "stringify", None)):
  100. with contextlib.suppress(Exception):
  101. result = str(result.stringify())
  102. result = str(result)
  103. ret = ret.format(
  104. utils.escape_html(utils.get_args_raw(message)),
  105. utils.escape_html(result),
  106. )
  107. ret = ret.replace(str(self._phone), "📵")
  108. if postgre := os.environ.get("DATABASE_URL") or main.get_config_key(
  109. "postgre_uri"
  110. ):
  111. ret = ret.replace(postgre, "postgre://**************************")
  112. if redis := os.environ.get("REDIS_URL") or main.get_config_key("redis_uri"):
  113. ret = ret.replace(redis, "redis://**************************")
  114. if os.environ.get("hikka_session"):
  115. ret = ret.replace(
  116. os.environ.get("hikka_session"),
  117. "StringSession(**************************)",
  118. )
  119. with contextlib.suppress(MessageIdInvalidError):
  120. await utils.answer(message, ret)
  121. async def getattrs(self, message: Message) -> dict:
  122. reply = await message.get_reply_message()
  123. return {
  124. **{
  125. "message": message,
  126. "client": self._client,
  127. "reply": reply,
  128. "r": reply,
  129. **self.get_sub(telethon.tl.types),
  130. **self.get_sub(telethon.tl.functions),
  131. "event": message,
  132. "chat": message.to_id,
  133. "telethon": telethon,
  134. "utils": utils,
  135. "main": main,
  136. "loader": loader,
  137. "f": telethon.tl.functions,
  138. "c": self._client,
  139. "m": message,
  140. "lookup": self.lookup,
  141. "self": self,
  142. "db": self.db,
  143. },
  144. }
  145. def get_sub(self, obj: Any, _depth: int = 1) -> dict:
  146. """Get all callable capitalised objects in an object recursively, ignoring _*"""
  147. return {
  148. **dict(
  149. filter(
  150. lambda x: x[0][0] != "_"
  151. and x[0][0].upper() == x[0][0]
  152. and callable(x[1]),
  153. obj.__dict__.items(),
  154. )
  155. ),
  156. **dict(
  157. itertools.chain.from_iterable(
  158. [
  159. self.get_sub(y[1], _depth + 1).items()
  160. for y in filter(
  161. lambda x: x[0][0] != "_"
  162. and isinstance(x[1], ModuleType)
  163. and x[1] != obj
  164. and x[1].__package__.rsplit(".", _depth)[0]
  165. == "telethon.tl",
  166. obj.__dict__.items(),
  167. )
  168. ]
  169. )
  170. ),
  171. }