utils.py 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567
  1. """Utilities"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # ©️ Dan Gazizullin, 2021-2023
  15. # This file is a part of Hikka Userbot
  16. # 🌐 https://github.com/hikariatama/Hikka
  17. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  18. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  19. import asyncio
  20. import atexit as _atexit
  21. import contextlib
  22. import functools
  23. import inspect
  24. import io
  25. import json
  26. import logging
  27. import os
  28. import random
  29. import re
  30. import shlex
  31. import signal
  32. import string
  33. import time
  34. import typing
  35. from datetime import timedelta
  36. from urllib.parse import urlparse
  37. import git
  38. import grapheme
  39. import hikkatl
  40. import requests
  41. from aiogram.types import Message as AiogramMessage
  42. from hikkatl import hints
  43. from hikkatl.tl.custom.message import Message
  44. from hikkatl.tl.functions.account import UpdateNotifySettingsRequest
  45. from hikkatl.tl.functions.channels import (
  46. CreateChannelRequest,
  47. EditAdminRequest,
  48. EditPhotoRequest,
  49. InviteToChannelRequest,
  50. )
  51. from hikkatl.tl.functions.messages import (
  52. GetDialogFiltersRequest,
  53. SetHistoryTTLRequest,
  54. UpdateDialogFilterRequest,
  55. )
  56. from hikkatl.tl.types import (
  57. Channel,
  58. Chat,
  59. ChatAdminRights,
  60. InputDocument,
  61. InputPeerNotifySettings,
  62. MessageEntityBankCard,
  63. MessageEntityBlockquote,
  64. MessageEntityBold,
  65. MessageEntityBotCommand,
  66. MessageEntityCashtag,
  67. MessageEntityCode,
  68. MessageEntityEmail,
  69. MessageEntityHashtag,
  70. MessageEntityItalic,
  71. MessageEntityMention,
  72. MessageEntityMentionName,
  73. MessageEntityPhone,
  74. MessageEntityPre,
  75. MessageEntitySpoiler,
  76. MessageEntityStrike,
  77. MessageEntityTextUrl,
  78. MessageEntityUnderline,
  79. MessageEntityUnknown,
  80. MessageEntityUrl,
  81. MessageMediaWebPage,
  82. PeerChannel,
  83. PeerChat,
  84. PeerUser,
  85. UpdateNewChannelMessage,
  86. User,
  87. )
  88. from ._internal import fw_protect
  89. from .inline.types import InlineCall, InlineMessage
  90. from .tl_cache import CustomTelegramClient
  91. from .types import HikkaReplyMarkup, ListLike, Module
  92. FormattingEntity = typing.Union[
  93. MessageEntityUnknown,
  94. MessageEntityMention,
  95. MessageEntityHashtag,
  96. MessageEntityBotCommand,
  97. MessageEntityUrl,
  98. MessageEntityEmail,
  99. MessageEntityBold,
  100. MessageEntityItalic,
  101. MessageEntityCode,
  102. MessageEntityPre,
  103. MessageEntityTextUrl,
  104. MessageEntityMentionName,
  105. MessageEntityPhone,
  106. MessageEntityCashtag,
  107. MessageEntityUnderline,
  108. MessageEntityStrike,
  109. MessageEntityBlockquote,
  110. MessageEntityBankCard,
  111. MessageEntitySpoiler,
  112. ]
  113. emoji_pattern = re.compile(
  114. "["
  115. "\U0001f600-\U0001f64f" # emoticons
  116. "\U0001f300-\U0001f5ff" # symbols & pictographs
  117. "\U0001f680-\U0001f6ff" # transport & map symbols
  118. "\U0001f1e0-\U0001f1ff" # flags (iOS)
  119. "]+",
  120. flags=re.UNICODE,
  121. )
  122. parser = hikkatl.utils.sanitize_parse_mode("html")
  123. logger = logging.getLogger(__name__)
  124. def get_args(message: typing.Union[Message, str]) -> typing.List[str]:
  125. """
  126. Get arguments from message
  127. :param message: Message or string to get arguments from
  128. :return: List of arguments
  129. """
  130. if not (message := getattr(message, "message", message)):
  131. return False
  132. if len(message := message.split(maxsplit=1)) <= 1:
  133. return []
  134. message = message[1]
  135. try:
  136. split = shlex.split(message)
  137. except ValueError:
  138. return message # Cannot split, let's assume that it's just one long message
  139. return list(filter(lambda x: len(x) > 0, split))
  140. def get_args_raw(message: typing.Union[Message, str]) -> str:
  141. """
  142. Get the parameters to the command as a raw string (not split)
  143. :param message: Message or string to get arguments from
  144. :return: Raw string of arguments
  145. """
  146. if not (message := getattr(message, "message", message)):
  147. return False
  148. return args[1] if len(args := message.split(maxsplit=1)) > 1 else ""
  149. def get_args_html(message: Message) -> str:
  150. """
  151. Get the parameters to the command as string with HTML (not split)
  152. :param message: Message to get arguments from
  153. :return: String with HTML arguments
  154. """
  155. prefix = message.client.loader.get_prefix()
  156. if not (message := message.text):
  157. return False
  158. if prefix not in message:
  159. return message
  160. raw_text, entities = parser.parse(message)
  161. raw_text = parser._add_surrogate(raw_text)
  162. try:
  163. command = raw_text[
  164. raw_text.index(prefix) : raw_text.index(" ", raw_text.index(prefix) + 1)
  165. ]
  166. except ValueError:
  167. return ""
  168. command_len = len(command) + 1
  169. return parser.unparse(
  170. parser._del_surrogate(raw_text[command_len:]),
  171. relocate_entities(entities, -command_len, raw_text[command_len:]),
  172. )
  173. def get_args_split_by(
  174. message: typing.Union[Message, str],
  175. separator: str,
  176. ) -> typing.List[str]:
  177. """
  178. Split args with a specific separator
  179. :param message: Message or string to get arguments from
  180. :param separator: Separator to split by
  181. :return: List of arguments
  182. """
  183. return [
  184. section.strip() for section in get_args_raw(message).split(separator) if section
  185. ]
  186. def get_chat_id(message: typing.Union[Message, AiogramMessage]) -> int:
  187. """
  188. Get the chat ID, but without -100 if its a channel
  189. :param message: Message to get chat ID from
  190. :return: Chat ID
  191. """
  192. return hikkatl.utils.resolve_id(
  193. getattr(message, "chat_id", None)
  194. or getattr(getattr(message, "chat", None), "id", None)
  195. )[0]
  196. def get_entity_id(entity: hints.Entity) -> int:
  197. """
  198. Get entity ID
  199. :param entity: Entity to get ID from
  200. :return: Entity ID
  201. """
  202. return hikkatl.utils.get_peer_id(entity)
  203. def escape_html(text: str, /) -> str: # sourcery skip
  204. """
  205. Pass all untrusted/potentially corrupt input here
  206. :param text: Text to escape
  207. :return: Escaped text
  208. """
  209. return str(text).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
  210. def escape_quotes(text: str, /) -> str:
  211. """
  212. Escape quotes to html quotes
  213. :param text: Text to escape
  214. :return: Escaped text
  215. """
  216. return escape_html(text).replace('"', "&quot;")
  217. def get_base_dir() -> str:
  218. """
  219. Get directory of this file
  220. :return: Directory of this file
  221. """
  222. return get_dir(__file__)
  223. def get_dir(mod: str) -> str:
  224. """
  225. Get directory of given module
  226. :param mod: Module's `__file__` to get directory of
  227. :return: Directory of given module
  228. """
  229. return os.path.abspath(os.path.dirname(os.path.abspath(mod)))
  230. async def get_user(message: Message) -> typing.Optional[User]:
  231. """
  232. Get user who sent message, searching if not found easily
  233. :param message: Message to get user from
  234. :return: User who sent message
  235. """
  236. try:
  237. return await message.get_sender()
  238. except ValueError: # Not in database. Lets go looking for them.
  239. logger.debug("User not in session cache. Searching...")
  240. if isinstance(message.peer_id, PeerUser):
  241. await message.client.get_dialogs()
  242. return await message.get_sender()
  243. if isinstance(message.peer_id, (PeerChannel, PeerChat)):
  244. async for user in message.client.iter_participants(
  245. message.peer_id,
  246. aggressive=True,
  247. ):
  248. if user.id == message.sender_id:
  249. return user
  250. logger.error("User isn't in the group where they sent the message")
  251. return None
  252. logger.error("`peer_id` is not a user, chat or channel")
  253. return None
  254. def run_sync(func, *args, **kwargs):
  255. """
  256. Run a non-async function in a new thread and return an awaitable
  257. :param func: Sync-only function to execute
  258. :return: Awaitable coroutine
  259. """
  260. return asyncio.get_event_loop().run_in_executor(
  261. None,
  262. functools.partial(func, *args, **kwargs),
  263. )
  264. def run_async(loop: asyncio.AbstractEventLoop, coro: typing.Awaitable) -> typing.Any:
  265. """
  266. Run an async function as a non-async function, blocking till it's done
  267. :param loop: Event loop to run the coroutine in
  268. :param coro: Coroutine to run
  269. :return: Result of the coroutine
  270. """
  271. return asyncio.run_coroutine_threadsafe(coro, loop).result()
  272. def censor(
  273. obj: typing.Any,
  274. to_censor: typing.Optional[typing.Iterable[str]] = None,
  275. replace_with: str = "redacted_{count}_chars",
  276. ):
  277. """
  278. May modify the original object, but don't rely on it
  279. :param obj: Object to censor, preferrably telethon
  280. :param to_censor: Iterable of strings to censor
  281. :param replace_with: String to replace with, {count} will be replaced with the number of characters
  282. :return: Censored object
  283. """
  284. if to_censor is None:
  285. to_censor = ["phone"]
  286. for k, v in vars(obj).items():
  287. if k in to_censor:
  288. setattr(obj, k, replace_with.format(count=len(v)))
  289. elif k[0] != "_" and hasattr(v, "__dict__"):
  290. setattr(obj, k, censor(v, to_censor, replace_with))
  291. return obj
  292. def relocate_entities(
  293. entities: typing.List[FormattingEntity],
  294. offset: int,
  295. text: typing.Optional[str] = None,
  296. ) -> typing.List[FormattingEntity]:
  297. """
  298. Move all entities by offset (truncating at text)
  299. :param entities: List of entities
  300. :param offset: Offset to move by
  301. :param text: Text to truncate at
  302. :return: List of entities
  303. """
  304. length = len(text) if text is not None else 0
  305. for ent in entities.copy() if entities else ():
  306. ent.offset += offset
  307. if ent.offset < 0:
  308. ent.length += ent.offset
  309. ent.offset = 0
  310. if text is not None and ent.offset + ent.length > length:
  311. ent.length = length - ent.offset
  312. if ent.length <= 0:
  313. entities.remove(ent)
  314. return entities
  315. async def answer_file(
  316. message: typing.Union[Message, InlineCall, InlineMessage],
  317. file: typing.Union[str, bytes, io.IOBase, InputDocument],
  318. caption: typing.Optional[str] = None,
  319. **kwargs,
  320. ):
  321. """
  322. Use this to answer a message with a document
  323. :param message: Message to answer
  324. :param file: File to send - url, path or bytes
  325. :param caption: Caption to send
  326. :param kwargs: Extra kwargs to pass to `send_file`
  327. :return: Sent message
  328. :example:
  329. >>> await utils.answer_file(message, "test.txt")
  330. >>> await utils.answer_file(
  331. message,
  332. "https://mods.hikariatama.ru/badges/artai.jpg",
  333. "This is the cool module, check it out!",
  334. )
  335. """
  336. if isinstance(message, (InlineCall, InlineMessage)):
  337. message = message.form["caller"]
  338. if topic := get_topic(message):
  339. kwargs.setdefault("reply_to", topic)
  340. try:
  341. response = await message.client.send_file(
  342. message.peer_id,
  343. file,
  344. caption=caption,
  345. **kwargs,
  346. )
  347. except Exception:
  348. if caption:
  349. logger.warning(
  350. "Failed to send file, sending plain text instead", exc_info=True
  351. )
  352. return await answer(message, caption, **kwargs)
  353. raise
  354. with contextlib.suppress(Exception):
  355. await message.delete()
  356. return response
  357. async def answer(
  358. message: typing.Union[Message, InlineCall, InlineMessage],
  359. response: str,
  360. *,
  361. reply_markup: typing.Optional[HikkaReplyMarkup] = None,
  362. **kwargs,
  363. ) -> typing.Union[InlineCall, InlineMessage, Message]:
  364. """
  365. Use this to give the response to a command
  366. :param message: Message to answer to. Can be a tl message or hikka inline object
  367. :param response: Response to send
  368. :param reply_markup: Reply markup to send. If specified, inline form will be used
  369. :return: Message or inline object
  370. :example:
  371. >>> await utils.answer(message, "Hello world!")
  372. >>> await utils.answer(
  373. message,
  374. "https://some-url.com/photo.jpg",
  375. caption="Hello, this is your photo!",
  376. asfile=True,
  377. )
  378. >>> await utils.answer(
  379. message,
  380. "Hello world!",
  381. reply_markup={"text": "Hello!", "data": "world"},
  382. silent=True,
  383. disable_security=True,
  384. )
  385. """
  386. # Compatibility with FTG\GeekTG
  387. if isinstance(message, list) and message:
  388. message = message[0]
  389. if reply_markup is not None:
  390. if not isinstance(reply_markup, (list, dict)):
  391. raise ValueError("reply_markup must be a list or dict")
  392. if reply_markup:
  393. kwargs.pop("message", None)
  394. if isinstance(message, (InlineMessage, InlineCall)):
  395. await message.edit(response, reply_markup, **kwargs)
  396. return
  397. reply_markup = message.client.loader.inline._normalize_markup(reply_markup)
  398. result = await message.client.loader.inline.form(
  399. response,
  400. message=message if message.out else get_chat_id(message),
  401. reply_markup=reply_markup,
  402. **kwargs,
  403. )
  404. return result
  405. if isinstance(message, (InlineMessage, InlineCall)):
  406. await message.edit(response)
  407. return message
  408. kwargs.setdefault("link_preview", False)
  409. if not (edit := (message.out and not message.via_bot_id and not message.fwd_from)):
  410. kwargs.setdefault(
  411. "reply_to",
  412. getattr(message, "reply_to_msg_id", None),
  413. )
  414. elif "reply_to" in kwargs:
  415. kwargs.pop("reply_to")
  416. parse_mode = hikkatl.utils.sanitize_parse_mode(
  417. kwargs.pop(
  418. "parse_mode",
  419. message.client.parse_mode,
  420. )
  421. )
  422. if isinstance(response, str) and not kwargs.pop("asfile", False):
  423. text, entities = parse_mode.parse(response)
  424. if len(text) >= 4096 and not hasattr(message, "hikka_grepped"):
  425. try:
  426. if not message.client.loader.inline.init_complete:
  427. raise
  428. strings = list(smart_split(text, entities, 4096))
  429. if len(strings) > 10:
  430. raise
  431. list_ = await message.client.loader.inline.list(
  432. message=message,
  433. strings=strings,
  434. )
  435. if not list_:
  436. raise
  437. return list_
  438. except Exception:
  439. file = io.BytesIO(text.encode("utf-8"))
  440. file.name = "command_result.txt"
  441. result = await message.client.send_file(
  442. message.peer_id,
  443. file,
  444. caption=message.client.loader.lookup("translations").strings(
  445. "too_long"
  446. ),
  447. reply_to=kwargs.get("reply_to") or get_topic(message),
  448. )
  449. if message.out:
  450. await message.delete()
  451. return result
  452. result = await (message.edit if edit else message.respond)(
  453. text,
  454. parse_mode=lambda t: (t, entities),
  455. **kwargs,
  456. )
  457. elif isinstance(response, Message):
  458. if message.media is None and (
  459. response.media is None or isinstance(response.media, MessageMediaWebPage)
  460. ):
  461. result = await message.edit(
  462. response.message,
  463. parse_mode=lambda t: (t, response.entities or []),
  464. link_preview=isinstance(response.media, MessageMediaWebPage),
  465. )
  466. else:
  467. result = await message.respond(response, **kwargs)
  468. else:
  469. if isinstance(response, bytes):
  470. response = io.BytesIO(response)
  471. elif isinstance(response, str):
  472. response = io.BytesIO(response.encode("utf-8"))
  473. if name := kwargs.pop("filename", None):
  474. response.name = name
  475. if message.media is not None and edit:
  476. await message.edit(file=response, **kwargs)
  477. else:
  478. kwargs.setdefault(
  479. "reply_to",
  480. getattr(message, "reply_to_msg_id", get_topic(message)),
  481. )
  482. result = await message.client.send_file(message.peer_id, response, **kwargs)
  483. if message.out:
  484. await message.delete()
  485. return result
  486. async def get_target(message: Message, arg_no: int = 0) -> typing.Optional[int]:
  487. """
  488. Get target from message
  489. :param message: Message to get target from
  490. :param arg_no: Argument number to get target from
  491. :return: Target
  492. """
  493. if any(
  494. isinstance(entity, MessageEntityMentionName)
  495. for entity in (message.entities or [])
  496. ):
  497. e = sorted(
  498. filter(lambda x: isinstance(x, MessageEntityMentionName), message.entities),
  499. key=lambda x: x.offset,
  500. )[0]
  501. return e.user_id
  502. if len(get_args(message)) > arg_no:
  503. user = get_args(message)[arg_no]
  504. elif message.is_reply:
  505. return (await message.get_reply_message()).sender_id
  506. elif hasattr(message.peer_id, "user_id"):
  507. user = message.peer_id.user_id
  508. else:
  509. return None
  510. try:
  511. entity = await message.client.get_entity(user)
  512. except ValueError:
  513. return None
  514. else:
  515. if isinstance(entity, User):
  516. return entity.id
  517. def merge(a: dict, b: dict, /) -> dict:
  518. """
  519. Merge with replace dictionary a to dictionary b
  520. :param a: Dictionary to merge
  521. :param b: Dictionary to merge to
  522. :return: Merged dictionary
  523. """
  524. for key in a:
  525. if key in b:
  526. if isinstance(a[key], dict) and isinstance(b[key], dict):
  527. b[key] = merge(a[key], b[key])
  528. elif isinstance(a[key], list) and isinstance(b[key], list):
  529. b[key] = list(set(b[key] + a[key]))
  530. else:
  531. b[key] = a[key]
  532. b[key] = a[key]
  533. return b
  534. async def set_avatar(
  535. client: CustomTelegramClient,
  536. peer: hints.Entity,
  537. avatar: str,
  538. ) -> bool:
  539. """
  540. Sets an entity avatar
  541. :param client: Client to use
  542. :param peer: Peer to set avatar to
  543. :param avatar: Avatar to set
  544. :return: True if avatar was set, False otherwise
  545. """
  546. if isinstance(avatar, str) and check_url(avatar):
  547. f = (
  548. await run_sync(
  549. requests.get,
  550. avatar,
  551. )
  552. ).content
  553. elif isinstance(avatar, bytes):
  554. f = avatar
  555. else:
  556. return False
  557. await fw_protect()
  558. res = await client(
  559. EditPhotoRequest(
  560. channel=peer,
  561. photo=await client.upload_file(f, file_name="photo.png"),
  562. )
  563. )
  564. await fw_protect()
  565. try:
  566. await client.delete_messages(
  567. peer,
  568. message_ids=[
  569. next(
  570. update
  571. for update in res.updates
  572. if isinstance(update, UpdateNewChannelMessage)
  573. ).message.id
  574. ],
  575. )
  576. except Exception:
  577. pass
  578. return True
  579. async def invite_inline_bot(
  580. client: CustomTelegramClient,
  581. peer: hints.EntityLike,
  582. ) -> None:
  583. """
  584. Invites inline bot to a chat
  585. :param client: Client to use
  586. :param peer: Peer to invite bot to
  587. :return: None
  588. :raise RuntimeError: If error occurred while inviting bot
  589. """
  590. try:
  591. await client(InviteToChannelRequest(peer, [client.loader.inline.bot_username]))
  592. except Exception as e:
  593. raise RuntimeError(
  594. "Can't invite inline bot to old asset chat, which is required by module"
  595. ) from e
  596. with contextlib.suppress(Exception):
  597. await client(
  598. EditAdminRequest(
  599. channel=peer,
  600. user_id=client.loader.inline.bot_username,
  601. admin_rights=ChatAdminRights(ban_users=True),
  602. rank="Hikka",
  603. )
  604. )
  605. async def asset_channel(
  606. client: CustomTelegramClient,
  607. title: str,
  608. description: str,
  609. *,
  610. channel: bool = False,
  611. silent: bool = False,
  612. archive: bool = False,
  613. invite_bot: bool = False,
  614. avatar: typing.Optional[str] = None,
  615. ttl: typing.Optional[int] = None,
  616. _folder: typing.Optional[str] = None,
  617. ) -> typing.Tuple[Channel, bool]:
  618. """
  619. Create new channel (if needed) and return its entity
  620. :param client: Telegram client to create channel by
  621. :param title: Channel title
  622. :param description: Description
  623. :param channel: Whether to create a channel or supergroup
  624. :param silent: Automatically mute channel
  625. :param archive: Automatically archive channel
  626. :param invite_bot: Add inline bot and assure it's in chat
  627. :param avatar: Url to an avatar to set as pfp of created peer
  628. :param ttl: Time to live for messages in channel
  629. :return: Peer and bool: is channel new or pre-existent
  630. """
  631. if not hasattr(client, "_channels_cache"):
  632. client._channels_cache = {}
  633. if (
  634. title in client._channels_cache
  635. and client._channels_cache[title]["exp"] > time.time()
  636. ):
  637. return client._channels_cache[title]["peer"], False
  638. async for d in client.iter_dialogs():
  639. if d.title == title:
  640. client._channels_cache[title] = {"peer": d.entity, "exp": int(time.time())}
  641. if invite_bot:
  642. if all(
  643. participant.id != client.loader.inline.bot_id
  644. for participant in (
  645. await client.get_participants(d.entity, limit=100)
  646. )
  647. ):
  648. await fw_protect()
  649. await invite_inline_bot(client, d.entity)
  650. return d.entity, False
  651. await fw_protect()
  652. peer = (
  653. await client(
  654. CreateChannelRequest(
  655. title,
  656. description,
  657. megagroup=not channel,
  658. )
  659. )
  660. ).chats[0]
  661. if invite_bot:
  662. await fw_protect()
  663. await invite_inline_bot(client, peer)
  664. if silent:
  665. await fw_protect()
  666. await dnd(client, peer, archive)
  667. elif archive:
  668. await fw_protect()
  669. await client.edit_folder(peer, 1)
  670. if avatar:
  671. await fw_protect()
  672. await set_avatar(client, peer, avatar)
  673. if ttl:
  674. await fw_protect()
  675. await client(SetHistoryTTLRequest(peer=peer, period=ttl))
  676. if _folder:
  677. if _folder != "hikka":
  678. raise NotImplementedError
  679. folders = await client(GetDialogFiltersRequest())
  680. try:
  681. folder = next(folder for folder in folders if folder.title == "hikka")
  682. except Exception:
  683. folder = None
  684. if folder is not None and not any(
  685. peer.id == getattr(folder_peer, "channel_id", None)
  686. for folder_peer in folder.include_peers
  687. ):
  688. folder.include_peers += [await client.get_input_entity(peer)]
  689. await client(
  690. UpdateDialogFilterRequest(
  691. folder.id,
  692. folder,
  693. )
  694. )
  695. client._channels_cache[title] = {"peer": peer, "exp": int(time.time())}
  696. return peer, True
  697. async def dnd(
  698. client: CustomTelegramClient,
  699. peer: hints.Entity,
  700. archive: bool = True,
  701. ) -> bool:
  702. """
  703. Mutes and optionally archives peer
  704. :param peer: Anything entity-link
  705. :param archive: Archive peer, or just mute?
  706. :return: `True` on success, otherwise `False`
  707. """
  708. try:
  709. await client(
  710. UpdateNotifySettingsRequest(
  711. peer=peer,
  712. settings=InputPeerNotifySettings(
  713. show_previews=False,
  714. silent=True,
  715. mute_until=2**31 - 1,
  716. ),
  717. )
  718. )
  719. if archive:
  720. await fw_protect()
  721. await client.edit_folder(peer, 1)
  722. except Exception:
  723. logger.exception("utils.dnd error")
  724. return False
  725. return True
  726. def get_link(user: typing.Union[User, Channel], /) -> str:
  727. """
  728. Get telegram permalink to entity
  729. :param user: User or channel
  730. :return: Link to entity
  731. """
  732. return (
  733. f"tg://user?id={user.id}"
  734. if isinstance(user, User)
  735. else (
  736. f"tg://resolve?domain={user.username}"
  737. if getattr(user, "username", None)
  738. else ""
  739. )
  740. )
  741. def chunks(_list: ListLike, n: int, /) -> typing.List[typing.List[typing.Any]]:
  742. """
  743. Split provided `_list` into chunks of `n`
  744. :param _list: List to split
  745. :param n: Chunk size
  746. :return: List of chunks
  747. """
  748. return [_list[i : i + n] for i in range(0, len(_list), n)]
  749. def get_named_platform() -> str:
  750. """
  751. Returns formatted platform name
  752. :return: Platform name
  753. """
  754. from . import main
  755. with contextlib.suppress(Exception):
  756. if os.path.isfile("/proc/device-tree/model"):
  757. with open("/proc/device-tree/model") as f:
  758. model = f.read()
  759. if "Orange" in model:
  760. return f"🍊 {model}"
  761. return f"🍇 {model}" if "Raspberry" in model else f"❓ {model}"
  762. if main.IS_WSL:
  763. return "🍀 WSL"
  764. if main.IS_GOORM:
  765. return "🦾 GoormIDE"
  766. if main.IS_RAILWAY:
  767. return "🚂 Railway"
  768. if main.IS_DOCKER:
  769. return "🐳 Docker"
  770. if main.IS_TERMUX:
  771. return "🕶 Termux"
  772. if main.IS_CODESPACES:
  773. return "🐈‍⬛ Codespaces"
  774. return f"✌️ lavHost {os.environ['LAVHOST']}" if main.IS_LAVHOST else "📻 VDS"
  775. def get_platform_emoji() -> str:
  776. """
  777. Returns custom emoji for current platform
  778. :return: Emoji entity in string
  779. """
  780. from . import main
  781. BASE = "".join(
  782. (
  783. "<emoji document_id={}>🌘</emoji>",
  784. "<emoji document_id=5195311729663286630>🌘</emoji>",
  785. "<emoji document_id=5195045669324201904>🌘</emoji>",
  786. )
  787. )
  788. if main.IS_DOCKER:
  789. return BASE.format(5298554256603752468)
  790. if main.IS_LAVHOST:
  791. return BASE.format(5301078610747074753)
  792. if main.IS_GOORM:
  793. return BASE.format(5298947740032573902)
  794. if main.IS_CODESPACES:
  795. return BASE.format(5194976881127989720)
  796. if main.IS_TERMUX:
  797. return BASE.format(5193051778001673828)
  798. if main.IS_RAILWAY:
  799. return BASE.format(5199607521593007466)
  800. return BASE.format(5192765204898783881)
  801. def uptime() -> int:
  802. """
  803. Returns userbot uptime in seconds
  804. :return: Uptime in seconds
  805. """
  806. return round(time.perf_counter() - init_ts)
  807. def formatted_uptime() -> str:
  808. """
  809. Returnes formmated uptime
  810. :return: Formatted uptime
  811. """
  812. return str(timedelta(seconds=uptime()))
  813. def ascii_face() -> str:
  814. """
  815. Returnes cute ASCII-art face
  816. :return: ASCII-art face
  817. """
  818. return escape_html(
  819. random.choice(
  820. [
  821. "ヽ(๑◠ܫ◠๑)ノ",
  822. "(◕ᴥ◕ʋ)",
  823. "ᕙ(`▽´)ᕗ",
  824. "(✿◠‿◠)",
  825. "(▰˘◡˘▰)",
  826. "(˵ ͡° ͜ʖ ͡°˵)",
  827. "ʕっ•ᴥ•ʔっ",
  828. "( ͡° ᴥ ͡°)",
  829. "(๑•́ ヮ •̀๑)",
  830. "٩(^‿^)۶",
  831. "(っˆڡˆς)",
  832. "ψ(`∇´)ψ",
  833. "⊙ω⊙",
  834. "٩(^ᴗ^)۶",
  835. "(´・ω・)っ由",
  836. "( ͡~ ͜ʖ ͡°)",
  837. "✧♡(◕‿◕✿)",
  838. "โ๏௰๏ใ ื",
  839. "∩。• ᵕ •。∩ ♡",
  840. "(♡´౪`♡)",
  841. "(◍>◡<◍)⋈。✧♡",
  842. "╰(✿´⌣`✿)╯♡",
  843. "ʕ•ᴥ•ʔ",
  844. "ᶘ ◕ᴥ◕ᶅ",
  845. "▼・ᴥ・▼",
  846. "ฅ^•ﻌ•^ฅ",
  847. "(΄◞ิ౪◟ิ‵)",
  848. "٩(^ᴗ^)۶",
  849. "ᕴーᴥーᕵ",
  850. "ʕ→ᴥ←ʔ",
  851. "ʕᵕᴥᵕʔ",
  852. "ʕᵒᴥᵒʔ",
  853. "ᵔᴥᵔ",
  854. "(✿╹◡╹)",
  855. "(๑→ܫ←)",
  856. "ʕ·ᴥ· ʔ",
  857. "(ノ≧ڡ≦)",
  858. "(≖ᴗ≖✿)",
  859. "(〜^∇^ )〜",
  860. "( ノ・ェ・ )ノ",
  861. "~( ˘▾˘~)",
  862. "(〜^∇^)〜",
  863. "ヽ(^ᴗ^ヽ)",
  864. "(´・ω・`)",
  865. "₍ᐢ•ﻌ•ᐢ₎*・゚。",
  866. "(。・・)_且",
  867. "(=`ω´=)",
  868. "(*•‿•*)",
  869. "(*゚∀゚*)",
  870. "(☉⋆‿⋆☉)",
  871. "ɷ◡ɷ",
  872. "ʘ‿ʘ",
  873. "(。-ω-)ノ",
  874. "( ・ω・)ノ",
  875. "(=゚ω゚)ノ",
  876. "(・ε・`*) …",
  877. "ʕっ•ᴥ•ʔっ",
  878. "(*˘︶˘*)",
  879. ]
  880. )
  881. )
  882. def array_sum(
  883. array: typing.List[typing.List[typing.Any]], /
  884. ) -> typing.List[typing.Any]:
  885. """
  886. Performs basic sum operation on array
  887. :param array: Array to sum
  888. :return: Sum of array
  889. """
  890. result = []
  891. for item in array:
  892. result += item
  893. return result
  894. def rand(size: int, /) -> str:
  895. """
  896. Return random string of len `size`
  897. :param size: Length of string
  898. :return: Random string
  899. """
  900. return "".join(
  901. [random.choice("abcdefghijklmnopqrstuvwxyz1234567890") for _ in range(size)]
  902. )
def smart_split(
    text: str,
    entities: typing.List[FormattingEntity],
    length: int = 4096,
    split_on: ListLike = ("\n", " "),
    min_length: int = 1,
) -> typing.Iterator[str]:
    """
    Split the message into smaller messages.
    A grapheme will never be broken. Entities will be displaced to match the right location. No inputs will be mutated.
    The end of each message except the last one is stripped of characters from [split_on]

    This is a generator: every yielded item is the result of `parser.unparse`
    applied to one chunk of text and the entities that fall into it.
    :param text: the plain text input
    :param entities: the entities
    :param length: the maximum length of a single message
    :param split_on: characters (or strings) which are preferred for a message break
    :param min_length: ignore any matches on [split_on] strings before this number of characters into each message
    :return: iterator, which returns strings
    :example:
        >>> utils.smart_split(
            *hikkatl.extensions.html.parse(
                "<b>Hello, world!</b>"
            )
        )
        <<< ["<b>Hello, world!</b>"]
    """

    # Authored by @bsolute
    # https://t.me/LonamiWebs/27777

    # Entity offsets/lengths are compared against UTF-16 code-unit counts
    # below, hence the UTF-16 encoded copy (2 bytes per code unit)
    encoded = text.encode("utf-16le")
    pending_entities = entities
    text_offset = 0  # position in `text` (code points) of the next chunk
    bytes_offset = 0  # position in `encoded` (bytes) of the next chunk
    text_length = len(text)
    bytes_length = len(encoded)

    while text_offset < text_length:
        # Everything that is left fits into one message: emit it and stop
        if bytes_offset + length * 2 >= bytes_length:
            yield parser.unparse(
                text[text_offset:],
                list(sorted(pending_entities, key=lambda x: x.offset)),
            )
            break

        # Number of code points covered by the next `length` UTF-16 units
        codepoint_count = len(
            encoded[bytes_offset : bytes_offset + length * 2].decode(
                "utf-16le",
                errors="ignore",
            )
        )

        # Take the last occurrence of the first separator (in `split_on`
        # order) found inside the window
        for search in split_on:
            search_index = text.rfind(
                search,
                text_offset + min_length,
                text_offset + codepoint_count,
            )
            if search_index != -1:
                break
        else:
            # No separator in the window: cut at the window end
            search_index = text_offset + codepoint_count

        split_index = grapheme.safe_split_index(text, search_index)

        # Length of the current chunk in UTF-16 code units
        split_offset_utf16 = (
            len(text[text_offset:split_index].encode("utf-16le"))
        ) // 2
        exclude = 0

        # Count the separator characters right after the cut; they are
        # dropped from the output instead of starting the next chunk
        while (
            split_index + exclude < text_length
            and text[split_index + exclude] in split_on
        ):
            exclude += 1

        current_entities = []
        entities = pending_entities.copy()
        pending_entities = []

        for entity in entities:
            if (
                entity.offset < split_offset_utf16
                and entity.offset + entity.length > split_offset_utf16 + exclude
            ):
                # spans boundary: head stays in this chunk, tail opens the next
                current_entities.append(
                    _copy_tl(
                        entity,
                        length=split_offset_utf16 - entity.offset,
                    )
                )
                pending_entities.append(
                    _copy_tl(
                        entity,
                        offset=0,
                        length=entity.offset
                        + entity.length
                        - split_offset_utf16
                        - exclude,
                    )
                )
            elif entity.offset < split_offset_utf16 < entity.offset + entity.length:
                # overlaps boundary: ends inside the excluded run, so it is
                # only truncated to the end of this chunk
                current_entities.append(
                    _copy_tl(
                        entity,
                        length=split_offset_utf16 - entity.offset,
                    )
                )
            elif entity.offset < split_offset_utf16:
                # wholly left
                current_entities.append(entity)
            elif (
                entity.offset + entity.length
                > split_offset_utf16 + exclude
                > entity.offset
            ):
                # overlaps right boundary: starts inside the excluded run,
                # only the part after it is carried over
                pending_entities.append(
                    _copy_tl(
                        entity,
                        offset=0,
                        length=entity.offset
                        + entity.length
                        - split_offset_utf16
                        - exclude,
                    )
                )
            elif entity.offset + entity.length > split_offset_utf16 + exclude:
                # wholly right: re-base the offset onto the next chunk
                pending_entities.append(
                    _copy_tl(
                        entity,
                        offset=entity.offset - split_offset_utf16 - exclude,
                    )
                )
            # Entities lying entirely inside the excluded run match no branch
            # and are dropped

        current_text = text[text_offset:split_index]
        yield parser.unparse(
            current_text,
            list(sorted(current_entities, key=lambda x: x.offset)),
        )

        text_offset = split_index + exclude
        # NOTE(review): `text_offset` skips the `exclude` characters, but
        # `bytes_offset` advances only by the emitted text - the two offsets
        # appear to drift apart after a split; confirm whether this is intended
        bytes_offset += len(current_text.encode("utf-16le"))
  1036. def _copy_tl(o, **kwargs):
  1037. d = o.to_dict()
  1038. del d["_"]
  1039. d.update(kwargs)
  1040. return o.__class__(**d)
  1041. def check_url(url: str) -> bool:
  1042. """
  1043. Statically checks url for validity
  1044. :param url: URL to check
  1045. :return: True if valid, False otherwise
  1046. """
  1047. try:
  1048. return bool(urlparse(url).netloc)
  1049. except Exception:
  1050. return False
  1051. def get_git_hash() -> typing.Union[str, bool]:
  1052. """
  1053. Get current Hikka git hash
  1054. :return: Git commit hash
  1055. """
  1056. try:
  1057. return git.Repo().head.commit.hexsha
  1058. except Exception:
  1059. return False
  1060. def get_commit_url() -> str:
  1061. """
  1062. Get current Hikka git commit url
  1063. :return: Git commit url
  1064. """
  1065. try:
  1066. hash_ = get_git_hash()
  1067. return (
  1068. f'<a href="https://github.com/hikariatama/Hikka/commit/{hash_}">#{hash_[:7]}</a>'
  1069. )
  1070. except Exception:
  1071. return "Unknown"
  1072. def is_serializable(x: typing.Any, /) -> bool:
  1073. """
  1074. Checks if object is JSON-serializable
  1075. :param x: Object to check
  1076. :return: True if object is JSON-serializable, False otherwise
  1077. """
  1078. try:
  1079. json.dumps(x)
  1080. return True
  1081. except Exception:
  1082. return False
  1083. def get_lang_flag(countrycode: str) -> str:
  1084. """
  1085. Gets an emoji of specified countrycode
  1086. :param countrycode: 2-letter countrycode
  1087. :return: Emoji flag
  1088. """
  1089. if (
  1090. len(
  1091. code := [
  1092. c
  1093. for c in countrycode.lower()
  1094. if c in string.ascii_letters + string.digits
  1095. ]
  1096. )
  1097. == 2
  1098. ):
  1099. return "".join([chr(ord(c.upper()) + (ord("🇦") - ord("A"))) for c in code])
  1100. return countrycode
  1101. def get_entity_url(
  1102. entity: typing.Union[User, Channel],
  1103. openmessage: bool = False,
  1104. ) -> str:
  1105. """
  1106. Get link to object, if available
  1107. :param entity: Entity to get url of
  1108. :param openmessage: Use tg://openmessage link for users
  1109. :return: Link to object or empty string
  1110. """
  1111. return (
  1112. (
  1113. f"tg://openmessage?id={entity.id}"
  1114. if openmessage
  1115. else f"tg://user?id={entity.id}"
  1116. )
  1117. if isinstance(entity, User)
  1118. else (
  1119. f"tg://resolve?domain={entity.username}"
  1120. if getattr(entity, "username", None)
  1121. else ""
  1122. )
  1123. )
  1124. async def get_message_link(
  1125. message: Message,
  1126. chat: typing.Optional[typing.Union[Chat, Channel]] = None,
  1127. ) -> str:
  1128. """
  1129. Get link to message
  1130. :param message: Message to get link of
  1131. :param chat: Chat, where message was sent
  1132. :return: Link to message
  1133. """
  1134. if message.is_private:
  1135. return (
  1136. f"tg://openmessage?user_id={get_chat_id(message)}&message_id={message.id}"
  1137. )
  1138. if not chat and not (chat := message.chat):
  1139. chat = await message.get_chat()
  1140. topic_affix = (
  1141. f"?topic={message.reply_to.reply_to_msg_id}"
  1142. if getattr(message.reply_to, "forum_topic", False)
  1143. else ""
  1144. )
  1145. return (
  1146. f"https://t.me/{chat.username}/{message.id}{topic_affix}"
  1147. if getattr(chat, "username", False)
  1148. else f"https://t.me/c/{chat.id}/{message.id}{topic_affix}"
  1149. )
  1150. def remove_html(text: str, escape: bool = False, keep_emojis: bool = False) -> str:
  1151. """
  1152. Removes HTML tags from text
  1153. :param text: Text to remove HTML from
  1154. :param escape: Escape HTML
  1155. :param keep_emojis: Keep custom emojis
  1156. :return: Text without HTML
  1157. """
  1158. return (escape_html if escape else str)(
  1159. re.sub(
  1160. (
  1161. r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>)"
  1162. if keep_emojis
  1163. else r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>|<\/?emoji.*?>)"
  1164. ),
  1165. "",
  1166. text,
  1167. )
  1168. )
  1169. def get_kwargs() -> typing.Dict[str, typing.Any]:
  1170. """
  1171. Get kwargs of function, in which is called
  1172. :return: kwargs
  1173. """
  1174. # https://stackoverflow.com/a/65927265/19170642
  1175. keys, _, _, values = inspect.getargvalues(inspect.currentframe().f_back)
  1176. return {key: values[key] for key in keys if key != "self"}
  1177. def mime_type(message: Message) -> str:
  1178. """
  1179. Get mime type of document in message
  1180. :param message: Message with document
  1181. :return: Mime type or empty string if not present
  1182. """
  1183. return (
  1184. ""
  1185. if not isinstance(message, Message) or not getattr(message, "media", False)
  1186. else getattr(getattr(message, "media", False), "mime_type", False) or ""
  1187. )
  1188. def find_caller(
  1189. stack: typing.Optional[typing.List[inspect.FrameInfo]] = None,
  1190. ) -> typing.Any:
  1191. """
  1192. Attempts to find command in stack
  1193. :param stack: Stack to search in
  1194. :return: Command-caller or None
  1195. """
  1196. caller = next(
  1197. (
  1198. frame_info
  1199. for frame_info in stack or inspect.stack()
  1200. if hasattr(frame_info, "function")
  1201. and any(
  1202. inspect.isclass(cls_)
  1203. and issubclass(cls_, Module)
  1204. and cls_ is not Module
  1205. for cls_ in frame_info.frame.f_globals.values()
  1206. )
  1207. ),
  1208. None,
  1209. )
  1210. if not caller:
  1211. return next(
  1212. (
  1213. frame_info.frame.f_locals["func"]
  1214. for frame_info in stack or inspect.stack()
  1215. if hasattr(frame_info, "function")
  1216. and frame_info.function == "future_dispatcher"
  1217. and (
  1218. "CommandDispatcher"
  1219. in getattr(getattr(frame_info, "frame", None), "f_globals", {})
  1220. )
  1221. ),
  1222. None,
  1223. )
  1224. return next(
  1225. (
  1226. getattr(cls_, caller.function, None)
  1227. for cls_ in caller.frame.f_globals.values()
  1228. if inspect.isclass(cls_) and issubclass(cls_, Module)
  1229. ),
  1230. None,
  1231. )
  1232. def validate_html(html: str) -> str:
  1233. """
  1234. Removes broken tags from html
  1235. :param html: HTML to validate
  1236. :return: Valid HTML
  1237. """
  1238. text, entities = hikkatl.extensions.html.parse(html)
  1239. return hikkatl.extensions.html.unparse(escape_html(text), entities)
  1240. def iter_attrs(obj: typing.Any, /) -> typing.List[typing.Tuple[str, typing.Any]]:
  1241. """
  1242. Returns list of attributes of object
  1243. :param obj: Object to iterate over
  1244. :return: List of attributes and their values
  1245. """
  1246. return ((attr, getattr(obj, attr)) for attr in dir(obj))
  1247. def atexit(
  1248. func: typing.Callable,
  1249. use_signal: typing.Optional[int] = None,
  1250. *args,
  1251. **kwargs,
  1252. ) -> None:
  1253. """
  1254. Calls function on exit
  1255. :param func: Function to call
  1256. :param use_signal: If passed, `signal` will be used instead of `atexit`
  1257. :param args: Arguments to pass to function
  1258. :param kwargs: Keyword arguments to pass to function
  1259. :return: None
  1260. """
  1261. if use_signal:
  1262. signal.signal(use_signal, lambda *_: func(*args, **kwargs))
  1263. return
  1264. _atexit.register(functools.partial(func, *args, **kwargs))
  1265. def get_topic(message: Message) -> typing.Optional[int]:
  1266. """
  1267. Get topic id of message
  1268. :param message: Message to get topic of
  1269. :return: int or None if not present
  1270. """
  1271. return (
  1272. (message.reply_to.reply_to_top_id or message.reply_to.reply_to_msg_id)
  1273. if (
  1274. isinstance(message, Message)
  1275. and message.reply_to
  1276. and message.reply_to.forum_topic
  1277. )
  1278. else (
  1279. message.form["top_msg_id"]
  1280. if isinstance(message, (InlineCall, InlineMessage))
  1281. else None
  1282. )
  1283. )
  1284. def get_ram_usage() -> float:
  1285. """Returns current process tree memory usage in MB"""
  1286. try:
  1287. import psutil
  1288. current_process = psutil.Process(os.getpid())
  1289. mem = current_process.memory_info()[0] / 2.0**20
  1290. for child in current_process.children(recursive=True):
  1291. mem += child.memory_info()[0] / 2.0**20
  1292. return round(mem, 1)
  1293. except Exception:
  1294. return 0
  1295. def get_cpu_usage() -> float:
  1296. """Returns current process tree CPU usage in %"""
  1297. try:
  1298. import psutil
  1299. current_process = psutil.Process(os.getpid())
  1300. cpu = current_process.cpu_percent()
  1301. for child in current_process.children(recursive=True):
  1302. cpu += child.cpu_percent()
  1303. return round(cpu, 1)
  1304. except Exception:
  1305. return 0
# Reference point for uptime(): perf_counter value taken when this module-level
# statement is executed
init_ts = time.perf_counter()
  1307. # GeekTG Compatibility
  1308. def get_git_info() -> typing.Tuple[str, str]:
  1309. """
  1310. Get git info
  1311. :return: Git info
  1312. """
  1313. hash_ = get_git_hash()
  1314. return (
  1315. hash_,
  1316. f"https://github.com/hikariatama/Hikka/commit/{hash_}" if hash_ else "",
  1317. )
  1318. def get_version_raw() -> str:
  1319. """
  1320. Get the version of the userbot
  1321. :return: Version in format %s.%s.%s
  1322. """
  1323. from . import version
  1324. return ".".join(map(str, list(version.__version__)))
# Alias: exposes get_named_platform under a second name
get_platform_name = get_named_platform