_types.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  2. # █▀█ █ █ █ █▀█ █▀▄ █
  3. # © Copyright 2022
  4. # https://t.me/hikariatama
  5. #
  6. # 🔒 Licensed under the GNU AGPLv3
  7. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  8. import ast
  9. import logging
  10. from dataclasses import dataclass, field
  11. from typing import Any, Optional, Union
  12. from .inline.types import * # skipcq: PYL-W0614
  13. from . import validators # skipcq: PY-W2000
  14. from importlib.abc import SourceLoader
  15. from telethon.tl.types import Message
# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)
  17. class StringLoader(SourceLoader):
  18. """Load a python module/file from a string"""
  19. def __init__(self, data: str, origin: str):
  20. self.data = data.encode("utf-8") if isinstance(data, str) else data
  21. self.origin = origin
  22. def get_code(self, fullname: str) -> str:
  23. return (
  24. compile(source, self.origin, "exec", dont_inherit=True)
  25. if (source := self.get_source(fullname))
  26. else None
  27. )
  28. def get_filename(self, *args, **kwargs) -> str:
  29. return self.origin
  30. def get_data(self, *args, **kwargs) -> bytes:
  31. return self.data
  32. class Module:
  33. strings = {"name": "Unknown"}
  34. """There is no help for this module"""
  35. def config_complete(self):
  36. """Called when module.config is populated"""
  37. async def client_ready(self, client, db):
  38. """Called after client is ready (after config_loaded)"""
  39. async def on_unload(self):
  40. """Called after unloading / reloading module"""
  41. async def on_dlmod(self, client, db):
  42. """
  43. Called after the module is first time loaded with .dlmod or .loadmod
  44. Possible use-cases:
  45. - Send reaction to author's channel message
  46. - Join author's channel
  47. - Create asset folder
  48. - ...
  49. ⚠️ Note, that any error there will not interrupt module load, and will just
  50. send a message to logs with verbosity INFO and exception traceback
  51. """
  52. class Library:
  53. """All external libraries must have a class-inheritant from this class"""
  54. class LoadError(Exception):
  55. """Tells user, why your module can't be loaded, if raised in `client_ready`"""
  56. def __init__(self, error_message: str): # skipcq: PYL-W0231
  57. self._error = error_message
  58. def __str__(self) -> str:
  59. return self._error
  60. class CoreOverwriteError(Exception):
  61. """Is being raised when core module or command is overwritten"""
  62. def __init__(self, module: Optional[str] = None, command: Optional[str] = None):
  63. self.type = "module" if module else "command"
  64. self.target = module or command
  65. super().__init__()
  66. def __str__(self) -> str:
  67. return (
  68. f"Module {self.target} will not be overwritten, because it's core"
  69. if self.type == "module"
  70. else f"Command {self.target} will not be overwritten, because it's core"
  71. )
  72. class CoreUnloadError(Exception):
  73. """Is being raised when user tries to unload core module"""
  74. def __init__(self, module: str):
  75. self.module = module
  76. super().__init__()
  77. def __str__(self) -> str:
  78. return f"Module {self.module} will not be unloaded, because it's core"
  79. class SelfUnload(Exception):
  80. """Silently unloads module, if raised in `client_ready`"""
  81. def __init__(self, error_message: Optional[str] = ""):
  82. super().__init__()
  83. self._error = error_message
  84. def __str__(self) -> str:
  85. return self._error
  86. class SelfSuspend(Exception):
  87. """
  88. Silently suspends module, if raised in `client_ready`
  89. Commands and watcher will not be registered if raised
  90. Module won't be unloaded from db and will be unfreezed after restart, unless
  91. the exception is raised again
  92. """
  93. def __init__(self, error_message: Optional[str] = ""):
  94. super().__init__()
  95. self._error = error_message
  96. def __str__(self) -> str:
  97. return self._error
  98. class StopLoop(Exception):
  99. """Stops the loop, in which is raised"""
  100. class ModuleConfig(dict):
  101. """Stores config for modules and apparently libraries"""
  102. def __init__(self, *entries):
  103. if all(isinstance(entry, ConfigValue) for entry in entries):
  104. # New config format processing
  105. self._config = {config.option: config for config in entries}
  106. else:
  107. # Legacy config processing
  108. keys = []
  109. values = []
  110. defaults = []
  111. docstrings = []
  112. for i, entry in enumerate(entries):
  113. if i % 3 == 0:
  114. keys += [entry]
  115. elif i % 3 == 1:
  116. values += [entry]
  117. defaults += [entry]
  118. else:
  119. docstrings += [entry]
  120. self._config = {
  121. key: ConfigValue(option=key, default=default, doc=doc)
  122. for key, default, doc in zip(keys, defaults, docstrings)
  123. }
  124. super().__init__(
  125. {option: config.value for option, config in self._config.items()}
  126. )
  127. def getdoc(self, key: str, message: Message = None) -> str:
  128. """Get the documentation by key"""
  129. ret = self._config[key].doc
  130. if callable(ret):
  131. try:
  132. # Compatibility tweak
  133. # does nothing in Hikka
  134. ret = ret(message)
  135. except Exception:
  136. ret = ret()
  137. return ret
  138. def getdef(self, key: str) -> str:
  139. """Get the default value by key"""
  140. return self._config[key].default
  141. def __setitem__(self, key: str, value: Any):
  142. self._config[key].value = value
  143. self.update({key: value})
  144. def set_no_raise(self, key: str, value: Any):
  145. self._config[key].set_no_raise(value)
  146. self.update({key: value})
  147. def __getitem__(self, key: str) -> Any:
  148. try:
  149. return self._config[key].value
  150. except KeyError:
  151. return None
# Alias: library configs are the very same class as module configs
LibraryConfig = ModuleConfig
  153. class _Placeholder:
  154. """Placeholder to determine if the default value is going to be set"""
  155. @dataclass(repr=True)
  156. class ConfigValue:
  157. option: str
  158. default: Any = None
  159. doc: Union[callable, str] = "No description"
  160. value: Any = field(default_factory=_Placeholder)
  161. validator: Optional[callable] = None
  162. def __post_init__(self):
  163. if isinstance(self.value, _Placeholder):
  164. self.value = self.default
  165. def set_no_raise(self, value: Any) -> bool:
  166. """
  167. Sets the config value w/o ValidationError being raised
  168. Should not be used uninternally
  169. """
  170. return self.__setattr__("value", value, ignore_validation=True)
  171. def __setattr__(
  172. self,
  173. key: str,
  174. value: Any,
  175. *,
  176. ignore_validation: Optional[bool] = False,
  177. ) -> bool:
  178. if key == "value":
  179. try:
  180. value = ast.literal_eval(value)
  181. except Exception:
  182. pass
  183. # Convert value to list if it's tuple just not to mess up
  184. # with json convertations
  185. if isinstance(value, (set, tuple)):
  186. value = list(value)
  187. if isinstance(value, list):
  188. value = [
  189. item.strip() if isinstance(item, str) else item for item in value
  190. ]
  191. if self.validator is not None:
  192. if value is not None:
  193. try:
  194. value = self.validator.validate(value)
  195. except validators.ValidationError as e:
  196. if not ignore_validation:
  197. raise e
  198. logger.debug(
  199. f"Config value was broken ({value}), so it was reset to"
  200. f" {self.default}"
  201. )
  202. value = self.default
  203. else:
  204. defaults = {
  205. "String": "",
  206. "Integer": 0,
  207. "Boolean": False,
  208. "Series": [],
  209. "Float": 0.0,
  210. }
  211. if self.validator.internal_id in defaults:
  212. logger.debug(
  213. "Config value was None, so it was reset to"
  214. f" {defaults[self.validator.internal_id]}"
  215. )
  216. value = defaults[self.validator.internal_id]
  217. # This attribute will tell the `Loader` to save this value in db
  218. self._save_marker = True
  219. object.__setattr__(self, key, value)