utils.py 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563
  1. """Utilities"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # ©️ Dan Gazizullin, 2021-2022
  15. # This file is a part of Hikka Userbot
  16. # 🌐 https://github.com/hikariatama/Hikka
  17. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  18. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  19. import asyncio
  20. import atexit as _atexit
  21. import contextlib
  22. import functools
  23. import inspect
  24. import io
  25. import json
  26. import logging
  27. import os
  28. import random
  29. import re
  30. import shlex
  31. import signal
  32. import string
  33. import time
  34. import typing
  35. from datetime import timedelta
  36. from urllib.parse import urlparse
  37. import git
  38. import grapheme
  39. import requests
  40. import telethon
  41. from aiogram.types import Message as AiogramMessage
  42. from telethon import hints
  43. from telethon.tl.custom.message import Message
  44. from telethon.tl.functions.account import UpdateNotifySettingsRequest
  45. from telethon.tl.functions.channels import (
  46. CreateChannelRequest,
  47. EditAdminRequest,
  48. EditPhotoRequest,
  49. InviteToChannelRequest,
  50. )
  51. from telethon.tl.functions.messages import (
  52. GetDialogFiltersRequest,
  53. SetHistoryTTLRequest,
  54. UpdateDialogFilterRequest,
  55. )
  56. from telethon.tl.types import (
  57. Channel,
  58. Chat,
  59. ChatAdminRights,
  60. InputDocument,
  61. InputPeerNotifySettings,
  62. MessageEntityBankCard,
  63. MessageEntityBlockquote,
  64. MessageEntityBold,
  65. MessageEntityBotCommand,
  66. MessageEntityCashtag,
  67. MessageEntityCode,
  68. MessageEntityEmail,
  69. MessageEntityHashtag,
  70. MessageEntityItalic,
  71. MessageEntityMention,
  72. MessageEntityMentionName,
  73. MessageEntityPhone,
  74. MessageEntityPre,
  75. MessageEntitySpoiler,
  76. MessageEntityStrike,
  77. MessageEntityTextUrl,
  78. MessageEntityUnderline,
  79. MessageEntityUnknown,
  80. MessageEntityUrl,
  81. MessageMediaWebPage,
  82. PeerChannel,
  83. PeerChat,
  84. PeerUser,
  85. UpdateNewChannelMessage,
  86. User,
  87. )
  88. from .inline.types import InlineCall, InlineMessage
  89. from .tl_cache import CustomTelegramClient
  90. from .types import HikkaReplyMarkup, ListLike, Module
# Union of the Telethon message-entity classes handled by this module;
# used as the element type in entity-related signatures below.
FormattingEntity = typing.Union[
    MessageEntityUnknown,
    MessageEntityMention,
    MessageEntityHashtag,
    MessageEntityBotCommand,
    MessageEntityUrl,
    MessageEntityEmail,
    MessageEntityBold,
    MessageEntityItalic,
    MessageEntityCode,
    MessageEntityPre,
    MessageEntityTextUrl,
    MessageEntityMentionName,
    MessageEntityPhone,
    MessageEntityCashtag,
    MessageEntityUnderline,
    MessageEntityStrike,
    MessageEntityBlockquote,
    MessageEntityBankCard,
    MessageEntitySpoiler,
]

# Matches one or more consecutive characters from the code-point ranges
# listed below.
emoji_pattern = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # emoticons
    "\U0001F300-\U0001F5FF"  # symbols & pictographs
    "\U0001F680-\U0001F6FF"  # transport & map symbols
    "\U0001F1E0-\U0001F1FF"  # flags (iOS)
    "]+",
    flags=re.UNICODE,
)

# HTML parse mode object from Telethon; used by `get_args_html` to parse and
# unparse message text.
parser = telethon.utils.sanitize_parse_mode("html")

# Module-level logger
logger = logging.getLogger(__name__)
  123. def get_args(message: typing.Union[Message, str]) -> typing.List[str]:
  124. """
  125. Get arguments from message
  126. :param message: Message or string to get arguments from
  127. :return: List of arguments
  128. """
  129. if not (message := getattr(message, "message", message)):
  130. return False
  131. if len(message := message.split(maxsplit=1)) <= 1:
  132. return []
  133. message = message[1]
  134. try:
  135. split = shlex.split(message)
  136. except ValueError:
  137. return message # Cannot split, let's assume that it's just one long message
  138. return list(filter(lambda x: len(x) > 0, split))
  139. def get_args_raw(message: typing.Union[Message, str]) -> str:
  140. """
  141. Get the parameters to the command as a raw string (not split)
  142. :param message: Message or string to get arguments from
  143. :return: Raw string of arguments
  144. """
  145. if not (message := getattr(message, "message", message)):
  146. return False
  147. return args[1] if len(args := message.split(maxsplit=1)) > 1 else ""
  148. def get_args_html(message: Message) -> str:
  149. """
  150. Get the parameters to the command as string with HTML (not split)
  151. :param message: Message to get arguments from
  152. :return: String with HTML arguments
  153. """
  154. prefix = message.client.loader.get_prefix()
  155. if not (message := message.text):
  156. return False
  157. if prefix not in message:
  158. return message
  159. raw_text, entities = parser.parse(message)
  160. raw_text = parser._add_surrogate(raw_text)
  161. try:
  162. command = raw_text[
  163. raw_text.index(prefix) : raw_text.index(" ", raw_text.index(prefix) + 1)
  164. ]
  165. except ValueError:
  166. return ""
  167. command_len = len(command) + 1
  168. return parser.unparse(
  169. parser._del_surrogate(raw_text[command_len:]),
  170. relocate_entities(entities, -command_len, raw_text[command_len:]),
  171. )
  172. def get_args_split_by(
  173. message: typing.Union[Message, str],
  174. separator: str,
  175. ) -> typing.List[str]:
  176. """
  177. Split args with a specific separator
  178. :param message: Message or string to get arguments from
  179. :param separator: Separator to split by
  180. :return: List of arguments
  181. """
  182. return [
  183. section.strip() for section in get_args_raw(message).split(separator) if section
  184. ]
  185. def get_chat_id(message: typing.Union[Message, AiogramMessage]) -> int:
  186. """
  187. Get the chat ID, but without -100 if its a channel
  188. :param message: Message to get chat ID from
  189. :return: Chat ID
  190. """
  191. return telethon.utils.resolve_id(
  192. getattr(message, "chat_id", None)
  193. or getattr(getattr(message, "chat", None), "id", None)
  194. )[0]
  195. def get_entity_id(entity: hints.Entity) -> int:
  196. """
  197. Get entity ID
  198. :param entity: Entity to get ID from
  199. :return: Entity ID
  200. """
  201. return telethon.utils.get_peer_id(entity)
  202. def escape_html(text: str, /) -> str: # sourcery skip
  203. """
  204. Pass all untrusted/potentially corrupt input here
  205. :param text: Text to escape
  206. :return: Escaped text
  207. """
  208. return str(text).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
  209. def escape_quotes(text: str, /) -> str:
  210. """
  211. Escape quotes to html quotes
  212. :param text: Text to escape
  213. :return: Escaped text
  214. """
  215. return escape_html(text).replace('"', "&quot;")
  216. def get_base_dir() -> str:
  217. """
  218. Get directory of this file
  219. :return: Directory of this file
  220. """
  221. return get_dir(__file__)
  222. def get_dir(mod: str) -> str:
  223. """
  224. Get directory of given module
  225. :param mod: Module's `__file__` to get directory of
  226. :return: Directory of given module
  227. """
  228. return os.path.abspath(os.path.dirname(os.path.abspath(mod)))
  229. async def get_user(message: Message) -> typing.Optional[User]:
  230. """
  231. Get user who sent message, searching if not found easily
  232. :param message: Message to get user from
  233. :return: User who sent message
  234. """
  235. try:
  236. return await message.client.get_entity(message.sender_id)
  237. except ValueError: # Not in database. Lets go looking for them.
  238. logger.debug("User not in session cache. Searching...")
  239. if isinstance(message.peer_id, PeerUser):
  240. await message.client.get_dialogs()
  241. return await message.client.get_entity(message.sender_id)
  242. if isinstance(message.peer_id, (PeerChannel, PeerChat)):
  243. try:
  244. return await message.client.get_entity(message.sender_id)
  245. except Exception:
  246. pass
  247. async for user in message.client.iter_participants(
  248. message.peer_id,
  249. aggressive=True,
  250. ):
  251. if user.id == message.sender_id:
  252. return user
  253. logger.error("User isn't in the group where they sent the message")
  254. return None
  255. logger.error("`peer_id` is not a user, chat or channel")
  256. return None
  257. def run_sync(func, *args, **kwargs):
  258. """
  259. Run a non-async function in a new thread and return an awaitable
  260. :param func: Sync-only function to execute
  261. :return: Awaitable coroutine
  262. """
  263. return asyncio.get_event_loop().run_in_executor(
  264. None,
  265. functools.partial(func, *args, **kwargs),
  266. )
  267. def run_async(loop: asyncio.AbstractEventLoop, coro: typing.Awaitable) -> typing.Any:
  268. """
  269. Run an async function as a non-async function, blocking till it's done
  270. :param loop: Event loop to run the coroutine in
  271. :param coro: Coroutine to run
  272. :return: Result of the coroutine
  273. """
  274. return asyncio.run_coroutine_threadsafe(coro, loop).result()
  275. def censor(
  276. obj: typing.Any,
  277. to_censor: typing.Optional[typing.Iterable[str]] = None,
  278. replace_with: str = "redacted_{count}_chars",
  279. ):
  280. """
  281. May modify the original object, but don't rely on it
  282. :param obj: Object to censor, preferrably telethon
  283. :param to_censor: Iterable of strings to censor
  284. :param replace_with: String to replace with, {count} will be replaced with the number of characters
  285. :return: Censored object
  286. """
  287. if to_censor is None:
  288. to_censor = ["phone"]
  289. for k, v in vars(obj).items():
  290. if k in to_censor:
  291. setattr(obj, k, replace_with.format(count=len(v)))
  292. elif k[0] != "_" and hasattr(v, "__dict__"):
  293. setattr(obj, k, censor(v, to_censor, replace_with))
  294. return obj
  295. def relocate_entities(
  296. entities: typing.List[FormattingEntity],
  297. offset: int,
  298. text: typing.Optional[str] = None,
  299. ) -> typing.List[FormattingEntity]:
  300. """
  301. Move all entities by offset (truncating at text)
  302. :param entities: List of entities
  303. :param offset: Offset to move by
  304. :param text: Text to truncate at
  305. :return: List of entities
  306. """
  307. length = len(text) if text is not None else 0
  308. for ent in entities.copy() if entities else ():
  309. ent.offset += offset
  310. if ent.offset < 0:
  311. ent.length += ent.offset
  312. ent.offset = 0
  313. if text is not None and ent.offset + ent.length > length:
  314. ent.length = length - ent.offset
  315. if ent.length <= 0:
  316. entities.remove(ent)
  317. return entities
  318. async def answer_file(
  319. message: typing.Union[Message, InlineCall, InlineMessage],
  320. file: typing.Union[str, bytes, io.IOBase, InputDocument],
  321. caption: typing.Optional[str] = None,
  322. **kwargs,
  323. ):
  324. """
  325. Use this to answer a message with a document
  326. :param message: Message to answer
  327. :param file: File to send - url, path or bytes
  328. :param caption: Caption to send
  329. :param kwargs: Extra kwargs to pass to `send_file`
  330. :return: Sent message
  331. :example:
  332. >>> await utils.answer_file(message, "test.txt")
  333. >>> await utils.answer_file(
  334. message,
  335. "https://mods.hikariatama.ru/badges/artai.jpg",
  336. "This is the cool module, check it out!",
  337. )
  338. """
  339. if isinstance(message, (InlineCall, InlineMessage)):
  340. message = message.form["caller"]
  341. if topic := get_topic(message):
  342. kwargs.setdefault("reply_to", topic)
  343. try:
  344. response = await message.client.send_file(
  345. message.peer_id,
  346. file,
  347. caption=caption,
  348. **kwargs,
  349. )
  350. except Exception:
  351. if caption:
  352. logger.warning(
  353. "Failed to send file, sending plain text instead", exc_info=True
  354. )
  355. return await answer(message, caption, **kwargs)
  356. raise
  357. with contextlib.suppress(Exception):
  358. await message.delete()
  359. return response
  360. async def answer(
  361. message: typing.Union[Message, InlineCall, InlineMessage],
  362. response: str,
  363. *,
  364. reply_markup: typing.Optional[HikkaReplyMarkup] = None,
  365. **kwargs,
  366. ) -> typing.Union[InlineCall, InlineMessage, Message]:
  367. """
  368. Use this to give the response to a command
  369. :param message: Message to answer to. Can be a tl message or hikka inline object
  370. :param response: Response to send
  371. :param reply_markup: Reply markup to send. If specified, inline form will be used
  372. :return: Message or inline object
  373. :example:
  374. >>> await utils.answer(message, "Hello world!")
  375. >>> await utils.answer(
  376. message,
  377. "https://some-url.com/photo.jpg",
  378. caption="Hello, this is your photo!",
  379. asfile=True,
  380. )
  381. >>> await utils.answer(
  382. message,
  383. "Hello world!",
  384. reply_markup={"text": "Hello!", "data": "world"},
  385. silent=True,
  386. disable_security=True,
  387. )
  388. """
  389. # Compatibility with FTG\GeekTG
  390. if isinstance(message, list) and message:
  391. message = message[0]
  392. if reply_markup is not None:
  393. if not isinstance(reply_markup, (list, dict)):
  394. raise ValueError("reply_markup must be a list or dict")
  395. if reply_markup:
  396. kwargs.pop("message", None)
  397. if isinstance(message, (InlineMessage, InlineCall)):
  398. await message.edit(response, reply_markup, **kwargs)
  399. return
  400. reply_markup = message.client.loader.inline._normalize_markup(reply_markup)
  401. result = await message.client.loader.inline.form(
  402. response,
  403. message=message if message.out else get_chat_id(message),
  404. reply_markup=reply_markup,
  405. **kwargs,
  406. )
  407. return result
  408. if isinstance(message, (InlineMessage, InlineCall)):
  409. await message.edit(response)
  410. return message
  411. kwargs.setdefault("link_preview", False)
  412. if not (edit := (message.out and not message.via_bot_id and not message.fwd_from)):
  413. kwargs.setdefault(
  414. "reply_to",
  415. getattr(message, "reply_to_msg_id", None),
  416. )
  417. parse_mode = telethon.utils.sanitize_parse_mode(
  418. kwargs.pop(
  419. "parse_mode",
  420. message.client.parse_mode,
  421. )
  422. )
  423. if isinstance(response, str) and not kwargs.pop("asfile", False):
  424. text, entities = parse_mode.parse(response)
  425. if len(text) >= 4096 and not hasattr(message, "hikka_grepped"):
  426. try:
  427. if not message.client.loader.inline.init_complete:
  428. raise
  429. strings = list(smart_split(text, entities, 4096))
  430. if len(strings) > 10:
  431. raise
  432. list_ = await message.client.loader.inline.list(
  433. message=message,
  434. strings=strings,
  435. )
  436. if not list_:
  437. raise
  438. return list_
  439. except Exception:
  440. file = io.BytesIO(text.encode("utf-8"))
  441. file.name = "command_result.txt"
  442. result = await message.client.send_file(
  443. message.peer_id,
  444. file,
  445. caption=message.client.loader.lookup("translations").strings(
  446. "too_long"
  447. ),
  448. reply_to=kwargs.get("reply_to") or get_topic(message),
  449. )
  450. if message.out:
  451. await message.delete()
  452. return result
  453. result = await (message.edit if edit else message.respond)(
  454. text,
  455. parse_mode=lambda t: (t, entities),
  456. **kwargs,
  457. )
  458. elif isinstance(response, Message):
  459. if message.media is None and (
  460. response.media is None or isinstance(response.media, MessageMediaWebPage)
  461. ):
  462. result = await message.edit(
  463. response.message,
  464. parse_mode=lambda t: (t, response.entities or []),
  465. link_preview=isinstance(response.media, MessageMediaWebPage),
  466. )
  467. else:
  468. result = await message.respond(response, **kwargs)
  469. else:
  470. if isinstance(response, bytes):
  471. response = io.BytesIO(response)
  472. elif isinstance(response, str):
  473. response = io.BytesIO(response.encode("utf-8"))
  474. if name := kwargs.pop("filename", None):
  475. response.name = name
  476. if message.media is not None and edit:
  477. await message.edit(file=response, **kwargs)
  478. else:
  479. kwargs.setdefault(
  480. "reply_to",
  481. getattr(message, "reply_to_msg_id", get_topic(message)),
  482. )
  483. result = await message.client.send_file(message.peer_id, response, **kwargs)
  484. if message.out:
  485. await message.delete()
  486. return result
  487. async def get_target(message: Message, arg_no: int = 0) -> typing.Optional[int]:
  488. """
  489. Get target from message
  490. :param message: Message to get target from
  491. :param arg_no: Argument number to get target from
  492. :return: Target
  493. """
  494. if any(
  495. isinstance(entity, MessageEntityMentionName)
  496. for entity in (message.entities or [])
  497. ):
  498. e = sorted(
  499. filter(lambda x: isinstance(x, MessageEntityMentionName), message.entities),
  500. key=lambda x: x.offset,
  501. )[0]
  502. return e.user_id
  503. if len(get_args(message)) > arg_no:
  504. user = get_args(message)[arg_no]
  505. elif message.is_reply:
  506. return (await message.get_reply_message()).sender_id
  507. elif hasattr(message.peer_id, "user_id"):
  508. user = message.peer_id.user_id
  509. else:
  510. return None
  511. try:
  512. entity = await message.client.get_entity(user)
  513. except ValueError:
  514. return None
  515. else:
  516. if isinstance(entity, User):
  517. return entity.id
  518. def merge(a: dict, b: dict, /) -> dict:
  519. """
  520. Merge with replace dictionary a to dictionary b
  521. :param a: Dictionary to merge
  522. :param b: Dictionary to merge to
  523. :return: Merged dictionary
  524. """
  525. for key in a:
  526. if key in b:
  527. if isinstance(a[key], dict) and isinstance(b[key], dict):
  528. b[key] = merge(a[key], b[key])
  529. elif isinstance(a[key], list) and isinstance(b[key], list):
  530. b[key] = list(set(b[key] + a[key]))
  531. else:
  532. b[key] = a[key]
  533. b[key] = a[key]
  534. return b
  535. async def set_avatar(
  536. client: CustomTelegramClient,
  537. peer: hints.Entity,
  538. avatar: str,
  539. ) -> bool:
  540. """
  541. Sets an entity avatar
  542. :param client: Client to use
  543. :param peer: Peer to set avatar to
  544. :param avatar: Avatar to set
  545. :return: True if avatar was set, False otherwise
  546. """
  547. if isinstance(avatar, str) and check_url(avatar):
  548. f = (
  549. await run_sync(
  550. requests.get,
  551. avatar,
  552. )
  553. ).content
  554. elif isinstance(avatar, bytes):
  555. f = avatar
  556. else:
  557. return False
  558. res = await client(
  559. EditPhotoRequest(
  560. channel=peer,
  561. photo=await client.upload_file(f, file_name="photo.png"),
  562. )
  563. )
  564. try:
  565. await client.delete_messages(
  566. peer,
  567. message_ids=[
  568. next(
  569. update
  570. for update in res.updates
  571. if isinstance(update, UpdateNewChannelMessage)
  572. ).message.id
  573. ],
  574. )
  575. except Exception:
  576. pass
  577. return True
  578. async def invite_inline_bot(
  579. client: CustomTelegramClient,
  580. peer: hints.EntityLike,
  581. ) -> None:
  582. """
  583. Invites inline bot to a chat
  584. :param client: Client to use
  585. :param peer: Peer to invite bot to
  586. :return: None
  587. :raise RuntimeError: If error occurred while inviting bot
  588. """
  589. try:
  590. await client(InviteToChannelRequest(peer, [client.loader.inline.bot_username]))
  591. except Exception as e:
  592. raise RuntimeError(
  593. "Can't invite inline bot to old asset chat, which is required by module"
  594. ) from e
  595. with contextlib.suppress(Exception):
  596. await client(
  597. EditAdminRequest(
  598. channel=peer,
  599. user_id=client.loader.inline.bot_username,
  600. admin_rights=ChatAdminRights(ban_users=True),
  601. rank="Hikka",
  602. )
  603. )
  604. async def asset_channel(
  605. client: CustomTelegramClient,
  606. title: str,
  607. description: str,
  608. *,
  609. channel: bool = False,
  610. silent: bool = False,
  611. archive: bool = False,
  612. invite_bot: bool = False,
  613. avatar: typing.Optional[str] = None,
  614. ttl: typing.Optional[int] = None,
  615. _folder: typing.Optional[str] = None,
  616. ) -> typing.Tuple[Channel, bool]:
  617. """
  618. Create new channel (if needed) and return its entity
  619. :param client: Telegram client to create channel by
  620. :param title: Channel title
  621. :param description: Description
  622. :param channel: Whether to create a channel or supergroup
  623. :param silent: Automatically mute channel
  624. :param archive: Automatically archive channel
  625. :param invite_bot: Add inline bot and assure it's in chat
  626. :param avatar: Url to an avatar to set as pfp of created peer
  627. :param ttl: Time to live for messages in channel
  628. :return: Peer and bool: is channel new or pre-existent
  629. """
  630. if not hasattr(client, "_channels_cache"):
  631. client._channels_cache = {}
  632. if (
  633. title in client._channels_cache
  634. and client._channels_cache[title]["exp"] > time.time()
  635. ):
  636. return client._channels_cache[title]["peer"], False
  637. async for d in client.iter_dialogs():
  638. if d.title == title:
  639. client._channels_cache[title] = {"peer": d.entity, "exp": int(time.time())}
  640. if invite_bot:
  641. if all(
  642. participant.id != client.loader.inline.bot_id
  643. for participant in (
  644. await client.get_participants(d.entity, limit=100)
  645. )
  646. ):
  647. await invite_inline_bot(client, d.entity)
  648. return d.entity, False
  649. peer = (
  650. await client(
  651. CreateChannelRequest(
  652. title,
  653. description,
  654. megagroup=not channel,
  655. )
  656. )
  657. ).chats[0]
  658. if invite_bot:
  659. await invite_inline_bot(client, peer)
  660. if silent:
  661. await dnd(client, peer, archive)
  662. elif archive:
  663. await client.edit_folder(peer, 1)
  664. if avatar:
  665. await set_avatar(client, peer, avatar)
  666. if ttl:
  667. await client(SetHistoryTTLRequest(peer=peer, period=ttl))
  668. if _folder:
  669. if _folder != "hikka":
  670. raise NotImplementedError
  671. folders = await client(GetDialogFiltersRequest())
  672. try:
  673. folder = next(folder for folder in folders if folder.title == "hikka")
  674. except Exception:
  675. folder = None
  676. if folder is not None and not any(
  677. peer.id == getattr(folder_peer, "channel_id", None)
  678. for folder_peer in folder.include_peers
  679. ):
  680. folder.include_peers += [await client.get_input_entity(peer)]
  681. await client(
  682. UpdateDialogFilterRequest(
  683. folder.id,
  684. folder,
  685. )
  686. )
  687. client._channels_cache[title] = {"peer": peer, "exp": int(time.time())}
  688. return peer, True
  689. async def dnd(
  690. client: CustomTelegramClient,
  691. peer: hints.Entity,
  692. archive: bool = True,
  693. ) -> bool:
  694. """
  695. Mutes and optionally archives peer
  696. :param peer: Anything entity-link
  697. :param archive: Archive peer, or just mute?
  698. :return: `True` on success, otherwise `False`
  699. """
  700. try:
  701. await client(
  702. UpdateNotifySettingsRequest(
  703. peer=peer,
  704. settings=InputPeerNotifySettings(
  705. show_previews=False,
  706. silent=True,
  707. mute_until=2**31 - 1,
  708. ),
  709. )
  710. )
  711. if archive:
  712. await client.edit_folder(peer, 1)
  713. except Exception:
  714. logger.exception("utils.dnd error")
  715. return False
  716. return True
  717. def get_link(user: typing.Union[User, Channel], /) -> str:
  718. """
  719. Get telegram permalink to entity
  720. :param user: User or channel
  721. :return: Link to entity
  722. """
  723. return (
  724. f"tg://user?id={user.id}"
  725. if isinstance(user, User)
  726. else (
  727. f"tg://resolve?domain={user.username}"
  728. if getattr(user, "username", None)
  729. else ""
  730. )
  731. )
  732. def chunks(_list: ListLike, n: int, /) -> typing.List[typing.List[typing.Any]]:
  733. """
  734. Split provided `_list` into chunks of `n`
  735. :param _list: List to split
  736. :param n: Chunk size
  737. :return: List of chunks
  738. """
  739. return [_list[i : i + n] for i in range(0, len(_list), n)]
  740. def get_named_platform() -> str:
  741. """
  742. Returns formatted platform name
  743. :return: Platform name
  744. """
  745. try:
  746. if os.path.isfile("/proc/device-tree/model"):
  747. with open("/proc/device-tree/model") as f:
  748. model = f.read()
  749. if "Orange" in model:
  750. return f"🍊 {model}"
  751. return f"🍇 {model}" if "Raspberry" in model else f"❓ {model}"
  752. except Exception:
  753. # In case of weird fs, aka Termux
  754. pass
  755. try:
  756. from platform import uname
  757. if "microsoft-standard" in uname().release:
  758. return "🍁 WSL"
  759. except Exception:
  760. pass
  761. if "MIYAHOST" in os.environ:
  762. return "🎃 MiyaHost"
  763. if "GOORM" in os.environ:
  764. return "🦾 GoormIDE"
  765. if "RAILWAY" in os.environ:
  766. return "🚂 Railway"
  767. if "DOCKER" in os.environ:
  768. return "🐳 Docker"
  769. if "com.termux" in os.environ.get("PREFIX", ""):
  770. return "🕶 Termux"
  771. if "CODESPACES" in os.environ:
  772. return "🐈‍⬛ Codespaces"
  773. return f"✌️ lavHost {os.environ['LAVHOST']}" if "LAVHOST" in os.environ else "📻 VDS"
  774. def get_platform_emoji() -> str:
  775. """
  776. Returns custom emoji for current platform
  777. :return: Emoji entity in string
  778. """
  779. BASE = "".join(
  780. (
  781. "<emoji document_id={}>🌘</emoji>",
  782. "<emoji document_id=5195311729663286630>🌘</emoji>",
  783. "<emoji document_id=5195045669324201904>🌘</emoji>",
  784. )
  785. )
  786. if "DOCKER" in os.environ:
  787. return BASE.format(5298554256603752468)
  788. if "LAVHOST" in os.environ:
  789. return BASE.format(5301078610747074753)
  790. if "MIYAHOST" in os.environ:
  791. return BASE.format(5280938237785808014)
  792. if "GOORM" in os.environ:
  793. return BASE.format(5298947740032573902)
  794. if "CODESPACES" in os.environ:
  795. return BASE.format(5194976881127989720)
  796. if "com.termux" in os.environ.get("PREFIX", ""):
  797. return BASE.format(5193051778001673828)
  798. if "RAILWAY" in os.environ:
  799. return BASE.format(5199607521593007466)
  800. return BASE.format(5192765204898783881)
  801. def uptime() -> int:
  802. """
  803. Returns userbot uptime in seconds
  804. :return: Uptime in seconds
  805. """
  806. return round(time.perf_counter() - init_ts)
  807. def formatted_uptime() -> str:
  808. """
  809. Returnes formmated uptime
  810. :return: Formatted uptime
  811. """
  812. return f"{str(timedelta(seconds=uptime()))}"
  813. def ascii_face() -> str:
  814. """
  815. Returnes cute ASCII-art face
  816. :return: ASCII-art face
  817. """
  818. return escape_html(
  819. random.choice(
  820. [
  821. "ヽ(๑◠ܫ◠๑)ノ",
  822. "(◕ᴥ◕ʋ)",
  823. "ᕙ(`▽´)ᕗ",
  824. "(✿◠‿◠)",
  825. "(▰˘◡˘▰)",
  826. "(˵ ͡° ͜ʖ ͡°˵)",
  827. "ʕっ•ᴥ•ʔっ",
  828. "( ͡° ᴥ ͡°)",
  829. "(๑•́ ヮ •̀๑)",
  830. "٩(^‿^)۶",
  831. "(っˆڡˆς)",
  832. "ψ(`∇´)ψ",
  833. "⊙ω⊙",
  834. "٩(^ᴗ^)۶",
  835. "(´・ω・)っ由",
  836. "( ͡~ ͜ʖ ͡°)",
  837. "✧♡(◕‿◕✿)",
  838. "โ๏௰๏ใ ื",
  839. "∩。• ᵕ •。∩ ♡",
  840. "(♡´౪`♡)",
  841. "(◍>◡<◍)⋈。✧♡",
  842. "╰(✿´⌣`✿)╯♡",
  843. "ʕ•ᴥ•ʔ",
  844. "ᶘ ◕ᴥ◕ᶅ",
  845. "▼・ᴥ・▼",
  846. "ฅ^•ﻌ•^ฅ",
  847. "(΄◞ิ౪◟ิ‵)",
  848. "٩(^ᴗ^)۶",
  849. "ᕴーᴥーᕵ",
  850. "ʕ→ᴥ←ʔ",
  851. "ʕᵕᴥᵕʔ",
  852. "ʕᵒᴥᵒʔ",
  853. "ᵔᴥᵔ",
  854. "(✿╹◡╹)",
  855. "(๑→ܫ←)",
  856. "ʕ·ᴥ· ʔ",
  857. "(ノ≧ڡ≦)",
  858. "(≖ᴗ≖✿)",
  859. "(〜^∇^ )〜",
  860. "( ノ・ェ・ )ノ",
  861. "~( ˘▾˘~)",
  862. "(〜^∇^)〜",
  863. "ヽ(^ᴗ^ヽ)",
  864. "(´・ω・`)",
  865. "₍ᐢ•ﻌ•ᐢ₎*・゚。",
  866. "(。・・)_且",
  867. "(=`ω´=)",
  868. "(*•‿•*)",
  869. "(*゚∀゚*)",
  870. "(☉⋆‿⋆☉)",
  871. "ɷ◡ɷ",
  872. "ʘ‿ʘ",
  873. "(。-ω-)ノ",
  874. "( ・ω・)ノ",
  875. "(=゚ω゚)ノ",
  876. "(・ε・`*) …",
  877. "ʕっ•ᴥ•ʔっ",
  878. "(*˘︶˘*)",
  879. ]
  880. )
  881. )
  882. def array_sum(
  883. array: typing.List[typing.List[typing.Any]], /
  884. ) -> typing.List[typing.Any]:
  885. """
  886. Performs basic sum operation on array
  887. :param array: Array to sum
  888. :return: Sum of array
  889. """
  890. result = []
  891. for item in array:
  892. result += item
  893. return result
  894. def rand(size: int, /) -> str:
  895. """
  896. Return random string of len `size`
  897. :param size: Length of string
  898. :return: Random string
  899. """
  900. return "".join(
  901. [random.choice("abcdefghijklmnopqrstuvwxyz1234567890") for _ in range(size)]
  902. )
def smart_split(
    text: str,
    entities: typing.List[FormattingEntity],
    length: int = 4096,
    split_on: ListLike = ("\n", " "),
    min_length: int = 1,
) -> typing.Iterator[str]:
    """
    Split the message into smaller messages.
    A grapheme will never be broken. Entities will be displaced to match the right location. No inputs will be mutated.
    The end of each message except the last one is stripped of characters from [split_on]
    :param text: the plain text input
    :param entities: the entities
    :param length: the maximum length of a single message, counted in UTF-16 code units
    :param split_on: characters (or strings) which are preferred for a message break
    :param min_length: ignore any matches on [split_on] strings before this number of characters into each message
    :return: iterator, which yields every part already unparsed via `parser.unparse`
    :example:
        >>> utils.smart_split(
            *telethon.extensions.html.parse(
                "<b>Hello, world!</b>"
            )
        )
        <<< ["<b>Hello, world!</b>"]  # once the returned iterator is consumed
    """

    # Authored by @bsolute
    # https://t.me/LonamiWebs/27777

    # UTF-16LE uses 2 bytes per code unit, hence every `length * 2` below
    encoded = text.encode("utf-16le")
    pending_entities = entities
    text_offset = 0
    bytes_offset = 0
    text_length = len(text)
    bytes_length = len(encoded)

    while text_offset < text_length:
        # The remainder fits into a single message: emit it with every
        # entity that is still pending and stop
        if bytes_offset + length * 2 >= bytes_length:
            yield parser.unparse(
                text[text_offset:],
                list(sorted(pending_entities, key=lambda x: x.offset)),
            )
            break

        # How many characters of `text` the next `length` UTF-16 code units
        # decode to; a surrogate pair cut in half at the end of the window is
        # dropped by `errors="ignore"`
        codepoint_count = len(
            encoded[bytes_offset : bytes_offset + length * 2].decode(
                "utf-16le",
                errors="ignore",
            )
        )

        # Try the separators in the order given; for each, take its last
        # occurrence inside the window. The `else` runs only when no separator
        # matched: the split then falls at the end of the window
        for search in split_on:
            search_index = text.rfind(
                search,
                text_offset + min_length,
                text_offset + codepoint_count,
            )
            if search_index != -1:
                break
        else:
            search_index = text_offset + codepoint_count

        split_index = grapheme.safe_split_index(text, search_index)

        # Length of the current part in UTF-16 code units (bytes // 2); entity
        # offsets and lengths are compared against this value
        split_offset_utf16 = (
            len(text[text_offset:split_index].encode("utf-16le"))
        ) // 2
        exclude = 0

        # Count the separators that directly follow the split point: they are
        # skipped, so they belong to neither part
        while (
            split_index + exclude < text_length
            and text[split_index + exclude] in split_on
        ):
            exclude += 1

        current_entities = []
        # Iterate over a copy and rebuild `pending_entities` from scratch, so
        # the list passed in by the caller is never modified
        entities = pending_entities.copy()
        pending_entities = []

        for entity in entities:
            if (
                entity.offset < split_offset_utf16
                and entity.offset + entity.length > split_offset_utf16 + exclude
            ):
                # spans boundary
                # Left piece is cut at the split point, right piece restarts
                # at offset 0 of the next part
                current_entities.append(
                    _copy_tl(
                        entity,
                        length=split_offset_utf16 - entity.offset,
                    )
                )
                pending_entities.append(
                    _copy_tl(
                        entity,
                        offset=0,
                        length=entity.offset
                        + entity.length
                        - split_offset_utf16
                        - exclude,
                    )
                )
            elif entity.offset < split_offset_utf16 < entity.offset + entity.length:
                # overlaps boundary
                # Ends inside the skipped separators: only the left piece remains
                current_entities.append(
                    _copy_tl(
                        entity,
                        length=split_offset_utf16 - entity.offset,
                    )
                )
            elif entity.offset < split_offset_utf16:
                # wholly left
                current_entities.append(entity)
            elif (
                entity.offset + entity.length
                > split_offset_utf16 + exclude
                > entity.offset
            ):
                # overlaps right boundary
                # Starts inside the skipped separators: only the right piece remains
                pending_entities.append(
                    _copy_tl(
                        entity,
                        offset=0,
                        length=entity.offset
                        + entity.length
                        - split_offset_utf16
                        - exclude,
                    )
                )
            elif entity.offset + entity.length > split_offset_utf16 + exclude:
                # wholly right
                # Shift the offset so it is relative to the start of the next part
                pending_entities.append(
                    _copy_tl(
                        entity,
                        offset=entity.offset - split_offset_utf16 - exclude,
                    )
                )
            # Entities matching none of the branches (they lie entirely inside
            # the skipped separators) are dropped

        current_text = text[text_offset:split_index]
        yield parser.unparse(
            current_text,
            list(sorted(current_entities, key=lambda x: x.offset)),
        )

        text_offset = split_index + exclude
        # NOTE(review): `text_offset` skips the `exclude` separators, but
        # `bytes_offset` advances only by the emitted text, so the two offsets
        # drift apart after every split - confirm whether this is intended
        bytes_offset += len(current_text.encode("utf-16le"))
  1036. def _copy_tl(o, **kwargs):
  1037. d = o.to_dict()
  1038. del d["_"]
  1039. d.update(kwargs)
  1040. return o.__class__(**d)
  1041. def check_url(url: str) -> bool:
  1042. """
  1043. Statically checks url for validity
  1044. :param url: URL to check
  1045. :return: True if valid, False otherwise
  1046. """
  1047. try:
  1048. return bool(urlparse(url).netloc)
  1049. except Exception:
  1050. return False
  1051. def get_git_hash() -> typing.Union[str, bool]:
  1052. """
  1053. Get current Hikka git hash
  1054. :return: Git commit hash
  1055. """
  1056. try:
  1057. return git.Repo().head.commit.hexsha
  1058. except Exception:
  1059. return False
  1060. def get_commit_url() -> str:
  1061. """
  1062. Get current Hikka git commit url
  1063. :return: Git commit url
  1064. """
  1065. try:
  1066. hash_ = get_git_hash()
  1067. return (
  1068. f'<a href="https://github.com/hikariatama/Hikka/commit/{hash_}">#{hash_[:7]}</a>'
  1069. )
  1070. except Exception:
  1071. return "Unknown"
  1072. def is_serializable(x: typing.Any, /) -> bool:
  1073. """
  1074. Checks if object is JSON-serializable
  1075. :param x: Object to check
  1076. :return: True if object is JSON-serializable, False otherwise
  1077. """
  1078. try:
  1079. json.dumps(x)
  1080. return True
  1081. except Exception:
  1082. return False
  1083. def get_lang_flag(countrycode: str) -> str:
  1084. """
  1085. Gets an emoji of specified countrycode
  1086. :param countrycode: 2-letter countrycode
  1087. :return: Emoji flag
  1088. """
  1089. if (
  1090. len(
  1091. code := [
  1092. c
  1093. for c in countrycode.lower()
  1094. if c in string.ascii_letters + string.digits
  1095. ]
  1096. )
  1097. == 2
  1098. ):
  1099. return "".join([chr(ord(c.upper()) + (ord("🇦") - ord("A"))) for c in code])
  1100. return countrycode
  1101. def get_entity_url(
  1102. entity: typing.Union[User, Channel],
  1103. openmessage: bool = False,
  1104. ) -> str:
  1105. """
  1106. Get link to object, if available
  1107. :param entity: Entity to get url of
  1108. :param openmessage: Use tg://openmessage link for users
  1109. :return: Link to object or empty string
  1110. """
  1111. return (
  1112. (
  1113. f"tg://openmessage?id={entity.id}"
  1114. if openmessage
  1115. else f"tg://user?id={entity.id}"
  1116. )
  1117. if isinstance(entity, User)
  1118. else (
  1119. f"tg://resolve?domain={entity.username}"
  1120. if getattr(entity, "username", None)
  1121. else ""
  1122. )
  1123. )
  1124. async def get_message_link(
  1125. message: Message,
  1126. chat: typing.Optional[typing.Union[Chat, Channel]] = None,
  1127. ) -> str:
  1128. """
  1129. Get link to message
  1130. :param message: Message to get link of
  1131. :param chat: Chat, where message was sent
  1132. :return: Link to message
  1133. """
  1134. if message.is_private:
  1135. return (
  1136. f"tg://openmessage?user_id={get_chat_id(message)}&message_id={message.id}"
  1137. )
  1138. if not chat and not (chat := message.chat):
  1139. chat = await message.get_chat()
  1140. topic_affix = (
  1141. f"?topic={message.reply_to.reply_to_msg_id}"
  1142. if getattr(message.reply_to, "forum_topic", False)
  1143. else ""
  1144. )
  1145. return (
  1146. f"https://t.me/{chat.username}/{message.id}{topic_affix}"
  1147. if getattr(chat, "username", False)
  1148. else f"https://t.me/c/{chat.id}/{message.id}{topic_affix}"
  1149. )
  1150. def remove_html(text: str, escape: bool = False, keep_emojis: bool = False) -> str:
  1151. """
  1152. Removes HTML tags from text
  1153. :param text: Text to remove HTML from
  1154. :param escape: Escape HTML
  1155. :param keep_emojis: Keep custom emojis
  1156. :return: Text without HTML
  1157. """
  1158. return (escape_html if escape else str)(
  1159. re.sub(
  1160. r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>)"
  1161. if keep_emojis
  1162. else r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>|<\/?emoji.*?>)",
  1163. "",
  1164. text,
  1165. )
  1166. )
  1167. def get_kwargs() -> typing.Dict[str, typing.Any]:
  1168. """
  1169. Get kwargs of function, in which is called
  1170. :return: kwargs
  1171. """
  1172. # https://stackoverflow.com/a/65927265/19170642
  1173. keys, _, _, values = inspect.getargvalues(inspect.currentframe().f_back)
  1174. return {key: values[key] for key in keys if key != "self"}
  1175. def mime_type(message: Message) -> str:
  1176. """
  1177. Get mime type of document in message
  1178. :param message: Message with document
  1179. :return: Mime type or empty string if not present
  1180. """
  1181. return (
  1182. ""
  1183. if not isinstance(message, Message) or not getattr(message, "media", False)
  1184. else getattr(getattr(message, "media", False), "mime_type", False) or ""
  1185. )
def find_caller(
    stack: typing.Optional[typing.List[inspect.FrameInfo]] = None,
) -> typing.Any:
    """
    Attempts to find command in stack
    :param stack: Stack to search in; `inspect.stack()` is used when it is
                  omitted or empty
    :return: Command-caller or None
    """
    # First frame whose module globals contain at least one subclass of
    # `Module` (the `Module` base class itself does not count)
    caller = next(
        (
            frame_info
            for frame_info in stack or inspect.stack()
            if hasattr(frame_info, "function")
            and any(
                inspect.isclass(cls_)
                and issubclass(cls_, Module)
                and cls_ is not Module
                for cls_ in frame_info.frame.f_globals.values()
            )
        ),
        None,
    )

    if not caller:
        # Fallback: look for a frame of a function named "future_dispatcher"
        # whose globals contain the name "CommandDispatcher", and return the
        # local variable `func` of that frame
        return next(
            (
                frame_info.frame.f_locals["func"]
                for frame_info in stack or inspect.stack()
                if hasattr(frame_info, "function")
                and frame_info.function == "future_dispatcher"
                and (
                    "CommandDispatcher"
                    in getattr(getattr(frame_info, "frame", None), "f_globals", {})
                )
            ),
            None,
        )

    # Resolve the frame's function name as an attribute of the first `Module`
    # subclass found in that frame's globals.
    # NOTE(review): unlike the search above, `Module` itself is not excluded
    # here, and only the first matching class is consulted, even if it lacks
    # the attribute (the result is then None) - confirm this is intended
    return next(
        (
            getattr(cls_, caller.function, None)
            for cls_ in caller.frame.f_globals.values()
            if inspect.isclass(cls_) and issubclass(cls_, Module)
        ),
        None,
    )
  1230. def validate_html(html: str) -> str:
  1231. """
  1232. Removes broken tags from html
  1233. :param html: HTML to validate
  1234. :return: Valid HTML
  1235. """
  1236. text, entities = telethon.extensions.html.parse(html)
  1237. return telethon.extensions.html.unparse(escape_html(text), entities)
  1238. def iter_attrs(obj: typing.Any, /) -> typing.List[typing.Tuple[str, typing.Any]]:
  1239. """
  1240. Returns list of attributes of object
  1241. :param obj: Object to iterate over
  1242. :return: List of attributes and their values
  1243. """
  1244. return ((attr, getattr(obj, attr)) for attr in dir(obj))
  1245. def atexit(
  1246. func: typing.Callable,
  1247. use_signal: typing.Optional[int] = None,
  1248. *args,
  1249. **kwargs,
  1250. ) -> None:
  1251. """
  1252. Calls function on exit
  1253. :param func: Function to call
  1254. :param use_signal: If passed, `signal` will be used instead of `atexit`
  1255. :param args: Arguments to pass to function
  1256. :param kwargs: Keyword arguments to pass to function
  1257. :return: None
  1258. """
  1259. if use_signal:
  1260. signal.signal(use_signal, lambda *_: func(*args, **kwargs))
  1261. return
  1262. _atexit.register(functools.partial(func, *args, **kwargs))
  1263. def get_topic(message: Message) -> typing.Optional[int]:
  1264. """
  1265. Get topic id of message
  1266. :param message: Message to get topic of
  1267. :return: int or None if not present
  1268. """
  1269. return (
  1270. (message.reply_to.reply_to_top_id or message.reply_to.reply_to_msg_id)
  1271. if (
  1272. isinstance(message, Message)
  1273. and message.reply_to
  1274. and message.reply_to.forum_topic
  1275. )
  1276. else message.form["top_msg_id"]
  1277. if isinstance(message, (InlineCall, InlineMessage))
  1278. else None
  1279. )
  1280. def get_ram_usage() -> float:
  1281. """Returns current process tree memory usage in MB"""
  1282. try:
  1283. import psutil
  1284. current_process = psutil.Process(os.getpid())
  1285. mem = current_process.memory_info()[0] / 2.0**20
  1286. for child in current_process.children(recursive=True):
  1287. mem += child.memory_info()[0] / 2.0**20
  1288. return round(mem, 1)
  1289. except Exception:
  1290. return 0
  1291. def get_cpu_usage() -> float:
  1292. """Returns current process tree CPU usage in %"""
  1293. try:
  1294. import psutil
  1295. current_process = psutil.Process(os.getpid())
  1296. cpu = current_process.cpu_percent()
  1297. for child in current_process.children(recursive=True):
  1298. cpu += child.cpu_percent()
  1299. return round(cpu, 1)
  1300. except Exception:
  1301. return 0
# Taken once, when this module is imported; `uptime()` measures against it
init_ts = time.perf_counter()
  1303. # GeekTG Compatibility
  1304. def get_git_info() -> typing.Tuple[str, str]:
  1305. """
  1306. Get git info
  1307. :return: Git info
  1308. """
  1309. hash_ = get_git_hash()
  1310. return (
  1311. hash_,
  1312. f"https://github.com/hikariatama/Hikka/commit/{hash_}" if hash_ else "",
  1313. )
  1314. def get_version_raw() -> str:
  1315. """
  1316. Get the version of the userbot
  1317. :return: Version in format %s.%s.%s
  1318. """
  1319. from . import version
  1320. return ".".join(map(str, list(version.__version__)))
# Alias: `get_platform_name` is the very same function as `get_named_platform`
get_platform_name = get_named_platform