utils.py 43 KB


  1. """Utilities"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # ©️ Dan Gazizullin, 2021-2023
  15. # This file is a part of Hikka Userbot
  16. # 🌐 https://github.com/hikariatama/Hikka
  17. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  18. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  19. import asyncio
  20. import atexit as _atexit
  21. import contextlib
  22. import functools
  23. import inspect
  24. import io
  25. import json
  26. import logging
  27. import os
  28. import random
  29. import re
  30. import shlex
  31. import signal
  32. import string
  33. import time
  34. import typing
  35. from datetime import timedelta
  36. from urllib.parse import urlparse
  37. import git
  38. import grapheme
  39. import hikkatl
  40. import requests
  41. from aiogram.types import Message as AiogramMessage
  42. from hikkatl import hints
  43. from hikkatl.tl.custom.message import Message
  44. from hikkatl.tl.functions.account import UpdateNotifySettingsRequest
  45. from hikkatl.tl.functions.channels import (
  46. CreateChannelRequest,
  47. EditAdminRequest,
  48. EditPhotoRequest,
  49. InviteToChannelRequest,
  50. )
  51. from hikkatl.tl.functions.messages import (
  52. GetDialogFiltersRequest,
  53. SetHistoryTTLRequest,
  54. UpdateDialogFilterRequest,
  55. )
  56. from hikkatl.tl.types import (
  57. Channel,
  58. Chat,
  59. ChatAdminRights,
  60. InputDocument,
  61. InputPeerNotifySettings,
  62. MessageEntityBankCard,
  63. MessageEntityBlockquote,
  64. MessageEntityBold,
  65. MessageEntityBotCommand,
  66. MessageEntityCashtag,
  67. MessageEntityCode,
  68. MessageEntityEmail,
  69. MessageEntityHashtag,
  70. MessageEntityItalic,
  71. MessageEntityMention,
  72. MessageEntityMentionName,
  73. MessageEntityPhone,
  74. MessageEntityPre,
  75. MessageEntitySpoiler,
  76. MessageEntityStrike,
  77. MessageEntityTextUrl,
  78. MessageEntityUnderline,
  79. MessageEntityUnknown,
  80. MessageEntityUrl,
  81. MessageMediaWebPage,
  82. PeerChannel,
  83. PeerChat,
  84. PeerUser,
  85. UpdateNewChannelMessage,
  86. User,
  87. )
  88. from ._internal import fw_protect
  89. from .inline.types import BotInlineCall, InlineCall, InlineMessage
  90. from .tl_cache import CustomTelegramClient
  91. from .types import HikkaReplyMarkup, ListLike, Module
# Union of the Telegram formatting entity types; used by the annotations of
# the entity helpers in this module
FormattingEntity = typing.Union[
    MessageEntityUnknown,
    MessageEntityMention,
    MessageEntityHashtag,
    MessageEntityBotCommand,
    MessageEntityUrl,
    MessageEntityEmail,
    MessageEntityBold,
    MessageEntityItalic,
    MessageEntityCode,
    MessageEntityPre,
    MessageEntityTextUrl,
    MessageEntityMentionName,
    MessageEntityPhone,
    MessageEntityCashtag,
    MessageEntityUnderline,
    MessageEntityStrike,
    MessageEntityBlockquote,
    MessageEntityBankCard,
    MessageEntitySpoiler,
]

# Matches a run of one or more characters from the emoji ranges listed below
emoji_pattern = re.compile(
    "["
    "\U0001f600-\U0001f64f"  # emoticons
    "\U0001f300-\U0001f5ff"  # symbols & pictographs
    "\U0001f680-\U0001f6ff"  # transport & map symbols
    "\U0001f1e0-\U0001f1ff"  # flags (iOS)
    "]+",
    flags=re.UNICODE,
)

# Parse mode object that hikkatl resolves for "html"; the helpers below use
# its `parse` / `unparse` methods
parser = hikkatl.utils.sanitize_parse_mode("html")

# Module-level logger
logger = logging.getLogger(__name__)
  124. def get_args(message: typing.Union[Message, str]) -> typing.List[str]:
  125. """
  126. Get arguments from message
  127. :param message: Message or string to get arguments from
  128. :return: List of arguments
  129. """
  130. if not (message := getattr(message, "message", message)):
  131. return False
  132. if len(message := message.split(maxsplit=1)) <= 1:
  133. return []
  134. message = message[1]
  135. try:
  136. split = shlex.split(message)
  137. except ValueError:
  138. return message # Cannot split, let's assume that it's just one long message
  139. return list(filter(lambda x: len(x) > 0, split))
  140. def get_args_raw(message: typing.Union[Message, str]) -> str:
  141. """
  142. Get the parameters to the command as a raw string (not split)
  143. :param message: Message or string to get arguments from
  144. :return: Raw string of arguments
  145. """
  146. if not (message := getattr(message, "message", message)):
  147. return False
  148. return args[1] if len(args := message.split(maxsplit=1)) > 1 else ""
  149. def get_args_html(message: Message) -> str:
  150. """
  151. Get the parameters to the command as string with HTML (not split)
  152. :param message: Message to get arguments from
  153. :return: String with HTML arguments
  154. """
  155. prefix = message.client.loader.get_prefix()
  156. if not (message := message.text):
  157. return False
  158. if prefix not in message:
  159. return message
  160. raw_text, entities = parser.parse(message)
  161. raw_text = parser._add_surrogate(raw_text)
  162. try:
  163. command = raw_text[
  164. raw_text.index(prefix) : raw_text.index(" ", raw_text.index(prefix) + 1)
  165. ]
  166. except ValueError:
  167. return ""
  168. command_len = len(command) + 1
  169. return parser.unparse(
  170. parser._del_surrogate(raw_text[command_len:]),
  171. relocate_entities(entities, -command_len, raw_text[command_len:]),
  172. )
  173. def get_args_split_by(
  174. message: typing.Union[Message, str],
  175. separator: str,
  176. ) -> typing.List[str]:
  177. """
  178. Split args with a specific separator
  179. :param message: Message or string to get arguments from
  180. :param separator: Separator to split by
  181. :return: List of arguments
  182. """
  183. return [
  184. section.strip() for section in get_args_raw(message).split(separator) if section
  185. ]
  186. def get_chat_id(message: typing.Union[Message, AiogramMessage]) -> int:
  187. """
  188. Get the chat ID, but without -100 if its a channel
  189. :param message: Message to get chat ID from
  190. :return: Chat ID
  191. """
  192. return hikkatl.utils.resolve_id(
  193. getattr(message, "chat_id", None)
  194. or getattr(getattr(message, "chat", None), "id", None)
  195. )[0]
  196. def get_entity_id(entity: hints.Entity) -> int:
  197. """
  198. Get entity ID
  199. :param entity: Entity to get ID from
  200. :return: Entity ID
  201. """
  202. return hikkatl.utils.get_peer_id(entity)
  203. def escape_html(text: str, /) -> str: # sourcery skip
  204. """
  205. Pass all untrusted/potentially corrupt input here
  206. :param text: Text to escape
  207. :return: Escaped text
  208. """
  209. return str(text).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
  210. def escape_quotes(text: str, /) -> str:
  211. """
  212. Escape quotes to html quotes
  213. :param text: Text to escape
  214. :return: Escaped text
  215. """
  216. return escape_html(text).replace('"', "&quot;")
  217. def get_base_dir() -> str:
  218. """
  219. Get directory of this file
  220. :return: Directory of this file
  221. """
  222. return get_dir(__file__)
  223. def get_dir(mod: str) -> str:
  224. """
  225. Get directory of given module
  226. :param mod: Module's `__file__` to get directory of
  227. :return: Directory of given module
  228. """
  229. return os.path.abspath(os.path.dirname(os.path.abspath(mod)))
  230. async def get_user(message: Message) -> typing.Optional[User]:
  231. """
  232. Get user who sent message, searching if not found easily
  233. :param message: Message to get user from
  234. :return: User who sent message
  235. """
  236. try:
  237. return await message.get_sender()
  238. except ValueError: # Not in database. Lets go looking for them.
  239. logger.debug("User not in session cache. Searching...")
  240. if isinstance(message.peer_id, PeerUser):
  241. await message.client.get_dialogs()
  242. return await message.get_sender()
  243. if isinstance(message.peer_id, (PeerChannel, PeerChat)):
  244. async for user in message.client.iter_participants(
  245. message.peer_id,
  246. aggressive=True,
  247. ):
  248. if user.id == message.sender_id:
  249. return user
  250. logger.error("User isn't in the group where they sent the message")
  251. return None
  252. logger.error("`peer_id` is not a user, chat or channel")
  253. return None
  254. def run_sync(func, *args, **kwargs):
  255. """
  256. Run a non-async function in a new thread and return an awaitable
  257. :param func: Sync-only function to execute
  258. :return: Awaitable coroutine
  259. """
  260. return asyncio.get_event_loop().run_in_executor(
  261. None,
  262. functools.partial(func, *args, **kwargs),
  263. )
  264. def run_async(loop: asyncio.AbstractEventLoop, coro: typing.Awaitable) -> typing.Any:
  265. """
  266. Run an async function as a non-async function, blocking till it's done
  267. :param loop: Event loop to run the coroutine in
  268. :param coro: Coroutine to run
  269. :return: Result of the coroutine
  270. """
  271. return asyncio.run_coroutine_threadsafe(coro, loop).result()
  272. def censor(
  273. obj: typing.Any,
  274. to_censor: typing.Optional[typing.Iterable[str]] = None,
  275. replace_with: str = "redacted_{count}_chars",
  276. ):
  277. """
  278. May modify the original object, but don't rely on it
  279. :param obj: Object to censor, preferrably telethon
  280. :param to_censor: Iterable of strings to censor
  281. :param replace_with: String to replace with, {count} will be replaced with the number of characters
  282. :return: Censored object
  283. """
  284. if to_censor is None:
  285. to_censor = ["phone"]
  286. for k, v in vars(obj).items():
  287. if k in to_censor:
  288. setattr(obj, k, replace_with.format(count=len(v)))
  289. elif k[0] != "_" and hasattr(v, "__dict__"):
  290. setattr(obj, k, censor(v, to_censor, replace_with))
  291. return obj
  292. def relocate_entities(
  293. entities: typing.List[FormattingEntity],
  294. offset: int,
  295. text: typing.Optional[str] = None,
  296. ) -> typing.List[FormattingEntity]:
  297. """
  298. Move all entities by offset (truncating at text)
  299. :param entities: List of entities
  300. :param offset: Offset to move by
  301. :param text: Text to truncate at
  302. :return: List of entities
  303. """
  304. length = len(text) if text is not None else 0
  305. for ent in entities.copy() if entities else ():
  306. ent.offset += offset
  307. if ent.offset < 0:
  308. ent.length += ent.offset
  309. ent.offset = 0
  310. if text is not None and ent.offset + ent.length > length:
  311. ent.length = length - ent.offset
  312. if ent.length <= 0:
  313. entities.remove(ent)
  314. return entities
  315. async def answer_file(
  316. message: typing.Union[Message, InlineCall, InlineMessage],
  317. file: typing.Union[str, bytes, io.IOBase, InputDocument],
  318. caption: typing.Optional[str] = None,
  319. **kwargs,
  320. ):
  321. """
  322. Use this to answer a message with a document
  323. :param message: Message to answer
  324. :param file: File to send - url, path or bytes
  325. :param caption: Caption to send
  326. :param kwargs: Extra kwargs to pass to `send_file`
  327. :return: Sent message
  328. :example:
  329. >>> await utils.answer_file(message, "test.txt")
  330. >>> await utils.answer_file(
  331. message,
  332. "https://mods.hikariatama.ru/badges/artai.jpg",
  333. "This is the cool module, check it out!",
  334. )
  335. """
  336. if isinstance(message, (InlineCall, InlineMessage)):
  337. message = message.form["caller"]
  338. if topic := get_topic(message):
  339. kwargs.setdefault("reply_to", topic)
  340. try:
  341. response = await message.client.send_file(
  342. message.peer_id,
  343. file,
  344. caption=caption,
  345. **kwargs,
  346. )
  347. except Exception:
  348. if caption:
  349. logger.warning(
  350. "Failed to send file, sending plain text instead", exc_info=True
  351. )
  352. return await answer(message, caption, **kwargs)
  353. raise
  354. with contextlib.suppress(Exception):
  355. await message.delete()
  356. return response
  357. async def answer(
  358. message: typing.Union[Message, InlineCall, InlineMessage],
  359. response: str,
  360. *,
  361. reply_markup: typing.Optional[HikkaReplyMarkup] = None,
  362. **kwargs,
  363. ) -> typing.Union[InlineCall, InlineMessage, Message]:
  364. """
  365. Use this to give the response to a command
  366. :param message: Message to answer to. Can be a tl message or hikka inline object
  367. :param response: Response to send
  368. :param reply_markup: Reply markup to send. If specified, inline form will be used
  369. :return: Message or inline object
  370. :example:
  371. >>> await utils.answer(message, "Hello world!")
  372. >>> await utils.answer(
  373. message,
  374. "https://some-url.com/photo.jpg",
  375. caption="Hello, this is your photo!",
  376. asfile=True,
  377. )
  378. >>> await utils.answer(
  379. message,
  380. "Hello world!",
  381. reply_markup={"text": "Hello!", "data": "world"},
  382. silent=True,
  383. disable_security=True,
  384. )
  385. """
  386. # Compatibility with FTG\GeekTG
  387. if isinstance(message, list) and message:
  388. message = message[0]
  389. if reply_markup is not None:
  390. if not isinstance(reply_markup, (list, dict)):
  391. raise ValueError("reply_markup must be a list or dict")
  392. if reply_markup:
  393. kwargs.pop("message", None)
  394. if isinstance(message, (InlineMessage, InlineCall, BotInlineCall)):
  395. await message.edit(response, reply_markup, **kwargs)
  396. return
  397. reply_markup = message.client.loader.inline._normalize_markup(reply_markup)
  398. result = await message.client.loader.inline.form(
  399. response,
  400. message=message if message.out else get_chat_id(message),
  401. reply_markup=reply_markup,
  402. **kwargs,
  403. )
  404. return result
  405. if isinstance(message, (InlineMessage, InlineCall, BotInlineCall)):
  406. await message.edit(response)
  407. return message
  408. kwargs.setdefault("link_preview", False)
  409. if not (edit := (message.out and not message.via_bot_id and not message.fwd_from)):
  410. kwargs.setdefault(
  411. "reply_to",
  412. getattr(message, "reply_to_msg_id", None),
  413. )
  414. elif "reply_to" in kwargs:
  415. kwargs.pop("reply_to")
  416. parse_mode = hikkatl.utils.sanitize_parse_mode(
  417. kwargs.pop(
  418. "parse_mode",
  419. message.client.parse_mode,
  420. )
  421. )
  422. if isinstance(response, str) and not kwargs.pop("asfile", False):
  423. text, entities = parse_mode.parse(response)
  424. if len(text) >= 4096 and not hasattr(message, "hikka_grepped"):
  425. try:
  426. if not message.client.loader.inline.init_complete:
  427. raise
  428. strings = list(smart_split(text, entities, 4096))
  429. if len(strings) > 10:
  430. raise
  431. list_ = await message.client.loader.inline.list(
  432. message=message,
  433. strings=strings,
  434. )
  435. if not list_:
  436. raise
  437. return list_
  438. except Exception:
  439. file = io.BytesIO(text.encode("utf-8"))
  440. file.name = "command_result.txt"
  441. result = await message.client.send_file(
  442. message.peer_id,
  443. file,
  444. caption=message.client.loader.lookup("translations").strings(
  445. "too_long"
  446. ),
  447. reply_to=kwargs.get("reply_to") or get_topic(message),
  448. )
  449. if message.out:
  450. await message.delete()
  451. return result
  452. result = await (message.edit if edit else message.respond)(
  453. text,
  454. parse_mode=lambda t: (t, entities),
  455. **kwargs,
  456. )
  457. elif isinstance(response, Message):
  458. if message.media is None and (
  459. response.media is None or isinstance(response.media, MessageMediaWebPage)
  460. ):
  461. result = await message.edit(
  462. response.message,
  463. parse_mode=lambda t: (t, response.entities or []),
  464. link_preview=isinstance(response.media, MessageMediaWebPage),
  465. )
  466. else:
  467. result = await message.respond(response, **kwargs)
  468. else:
  469. if isinstance(response, bytes):
  470. response = io.BytesIO(response)
  471. elif isinstance(response, str):
  472. response = io.BytesIO(response.encode("utf-8"))
  473. if name := kwargs.pop("filename", None):
  474. response.name = name
  475. if message.media is not None and edit:
  476. await message.edit(file=response, **kwargs)
  477. else:
  478. kwargs.setdefault(
  479. "reply_to",
  480. getattr(message, "reply_to_msg_id", get_topic(message)),
  481. )
  482. result = await message.client.send_file(message.peer_id, response, **kwargs)
  483. if message.out:
  484. await message.delete()
  485. return result
  486. async def get_target(message: Message, arg_no: int = 0) -> typing.Optional[int]:
  487. """
  488. Get target from message
  489. :param message: Message to get target from
  490. :param arg_no: Argument number to get target from
  491. :return: Target
  492. """
  493. if any(
  494. isinstance(entity, MessageEntityMentionName)
  495. for entity in (message.entities or [])
  496. ):
  497. e = sorted(
  498. filter(lambda x: isinstance(x, MessageEntityMentionName), message.entities),
  499. key=lambda x: x.offset,
  500. )[0]
  501. return e.user_id
  502. if len(get_args(message)) > arg_no:
  503. user = get_args(message)[arg_no]
  504. elif message.is_reply:
  505. return (await message.get_reply_message()).sender_id
  506. elif hasattr(message.peer_id, "user_id"):
  507. user = message.peer_id.user_id
  508. else:
  509. return None
  510. try:
  511. entity = await message.client.get_entity(user)
  512. except ValueError:
  513. return None
  514. else:
  515. if isinstance(entity, User):
  516. return entity.id
  517. def merge(a: dict, b: dict, /) -> dict:
  518. """
  519. Merge with replace dictionary a to dictionary b
  520. :param a: Dictionary to merge
  521. :param b: Dictionary to merge to
  522. :return: Merged dictionary
  523. """
  524. for key in a:
  525. if key in b:
  526. if isinstance(a[key], dict) and isinstance(b[key], dict):
  527. b[key] = merge(a[key], b[key])
  528. elif isinstance(a[key], list) and isinstance(b[key], list):
  529. b[key] = list(set(b[key] + a[key]))
  530. else:
  531. b[key] = a[key]
  532. b[key] = a[key]
  533. return b
  534. async def set_avatar(
  535. client: CustomTelegramClient,
  536. peer: hints.Entity,
  537. avatar: str,
  538. ) -> bool:
  539. """
  540. Sets an entity avatar
  541. :param client: Client to use
  542. :param peer: Peer to set avatar to
  543. :param avatar: Avatar to set
  544. :return: True if avatar was set, False otherwise
  545. """
  546. if isinstance(avatar, str) and check_url(avatar):
  547. f = (
  548. await run_sync(
  549. requests.get,
  550. avatar,
  551. )
  552. ).content
  553. elif isinstance(avatar, bytes):
  554. f = avatar
  555. else:
  556. return False
  557. await fw_protect()
  558. res = await client(
  559. EditPhotoRequest(
  560. channel=peer,
  561. photo=await client.upload_file(f, file_name="photo.png"),
  562. )
  563. )
  564. await fw_protect()
  565. try:
  566. await client.delete_messages(
  567. peer,
  568. message_ids=[
  569. next(
  570. update
  571. for update in res.updates
  572. if isinstance(update, UpdateNewChannelMessage)
  573. ).message.id
  574. ],
  575. )
  576. except Exception:
  577. pass
  578. return True
  579. async def invite_inline_bot(
  580. client: CustomTelegramClient,
  581. peer: hints.EntityLike,
  582. ) -> None:
  583. """
  584. Invites inline bot to a chat
  585. :param client: Client to use
  586. :param peer: Peer to invite bot to
  587. :return: None
  588. :raise RuntimeError: If error occurred while inviting bot
  589. """
  590. try:
  591. await client(InviteToChannelRequest(peer, [client.loader.inline.bot_username]))
  592. except Exception as e:
  593. raise RuntimeError(
  594. "Can't invite inline bot to old asset chat, which is required by module"
  595. ) from e
  596. with contextlib.suppress(Exception):
  597. await client(
  598. EditAdminRequest(
  599. channel=peer,
  600. user_id=client.loader.inline.bot_username,
  601. admin_rights=ChatAdminRights(ban_users=True),
  602. rank="Hikka",
  603. )
  604. )
  605. async def asset_channel(
  606. client: CustomTelegramClient,
  607. title: str,
  608. description: str,
  609. *,
  610. channel: bool = False,
  611. silent: bool = False,
  612. archive: bool = False,
  613. invite_bot: bool = False,
  614. avatar: typing.Optional[str] = None,
  615. ttl: typing.Optional[int] = None,
  616. forum: bool = False,
  617. _folder: typing.Optional[str] = None,
  618. ) -> typing.Tuple[Channel, bool]:
  619. """
  620. Create new channel (if needed) and return its entity
  621. :param client: Telegram client to create channel by
  622. :param title: Channel title
  623. :param description: Description
  624. :param channel: Whether to create a channel or supergroup
  625. :param silent: Automatically mute channel
  626. :param archive: Automatically archive channel
  627. :param invite_bot: Add inline bot and assure it's in chat
  628. :param avatar: Url to an avatar to set as pfp of created peer
  629. :param ttl: Time to live for messages in channel
  630. :param forum: Whether to create a forum channel
  631. :return: Peer and bool: is channel new or pre-existent
  632. """
  633. if not hasattr(client, "_channels_cache"):
  634. client._channels_cache = {}
  635. if (
  636. title in client._channels_cache
  637. and client._channels_cache[title]["exp"] > time.time()
  638. ):
  639. return client._channels_cache[title]["peer"], False
  640. async for d in client.iter_dialogs():
  641. if d.title == title:
  642. client._channels_cache[title] = {"peer": d.entity, "exp": int(time.time())}
  643. if invite_bot:
  644. if all(
  645. participant.id != client.loader.inline.bot_id
  646. for participant in (
  647. await client.get_participants(d.entity, limit=100)
  648. )
  649. ):
  650. await fw_protect()
  651. await invite_inline_bot(client, d.entity)
  652. return d.entity, False
  653. await fw_protect()
  654. peer = (
  655. await client(
  656. CreateChannelRequest(
  657. title,
  658. description,
  659. megagroup=not channel,
  660. forum=forum,
  661. )
  662. )
  663. ).chats[0]
  664. if invite_bot:
  665. await fw_protect()
  666. await invite_inline_bot(client, peer)
  667. if silent:
  668. await fw_protect()
  669. await dnd(client, peer, archive)
  670. elif archive:
  671. await fw_protect()
  672. await client.edit_folder(peer, 1)
  673. if avatar:
  674. await fw_protect()
  675. await set_avatar(client, peer, avatar)
  676. if ttl:
  677. await fw_protect()
  678. await client(SetHistoryTTLRequest(peer=peer, period=ttl))
  679. if _folder:
  680. if _folder != "hikka":
  681. raise NotImplementedError
  682. folders = await client(GetDialogFiltersRequest())
  683. try:
  684. folder = next(folder for folder in folders if folder.title == "hikka")
  685. except Exception:
  686. folder = None
  687. if folder is not None and not any(
  688. peer.id == getattr(folder_peer, "channel_id", None)
  689. for folder_peer in folder.include_peers
  690. ):
  691. folder.include_peers += [await client.get_input_entity(peer)]
  692. await client(
  693. UpdateDialogFilterRequest(
  694. folder.id,
  695. folder,
  696. )
  697. )
  698. client._channels_cache[title] = {"peer": peer, "exp": int(time.time())}
  699. return peer, True
  700. async def dnd(
  701. client: CustomTelegramClient,
  702. peer: hints.Entity,
  703. archive: bool = True,
  704. ) -> bool:
  705. """
  706. Mutes and optionally archives peer
  707. :param peer: Anything entity-link
  708. :param archive: Archive peer, or just mute?
  709. :return: `True` on success, otherwise `False`
  710. """
  711. try:
  712. await client(
  713. UpdateNotifySettingsRequest(
  714. peer=peer,
  715. settings=InputPeerNotifySettings(
  716. show_previews=False,
  717. silent=True,
  718. mute_until=2**31 - 1,
  719. ),
  720. )
  721. )
  722. if archive:
  723. await fw_protect()
  724. await client.edit_folder(peer, 1)
  725. except Exception:
  726. logger.exception("utils.dnd error")
  727. return False
  728. return True
  729. def get_link(user: typing.Union[User, Channel], /) -> str:
  730. """
  731. Get telegram permalink to entity
  732. :param user: User or channel
  733. :return: Link to entity
  734. """
  735. return (
  736. f"tg://user?id={user.id}"
  737. if isinstance(user, User)
  738. else (
  739. f"tg://resolve?domain={user.username}"
  740. if getattr(user, "username", None)
  741. else ""
  742. )
  743. )
  744. def chunks(_list: ListLike, n: int, /) -> typing.List[typing.List[typing.Any]]:
  745. """
  746. Split provided `_list` into chunks of `n`
  747. :param _list: List to split
  748. :param n: Chunk size
  749. :return: List of chunks
  750. """
  751. return [_list[i : i + n] for i in range(0, len(_list), n)]
  752. def get_named_platform() -> str:
  753. """
  754. Returns formatted platform name
  755. :return: Platform name
  756. """
  757. from . import main
  758. with contextlib.suppress(Exception):
  759. if os.path.isfile("/proc/device-tree/model"):
  760. with open("/proc/device-tree/model") as f:
  761. model = f.read()
  762. if "Orange" in model:
  763. return f"🍊 {model}"
  764. return f"🍇 {model}" if "Raspberry" in model else f"❓ {model}"
  765. if main.IS_WSL:
  766. return "🍀 WSL"
  767. if main.IS_GOORM:
  768. return "🦾 GoormIDE"
  769. if main.IS_RAILWAY:
  770. return "🚂 Railway"
  771. if main.IS_HIKKAHOST:
  772. return "🌼 HikkaHost"
  773. if main.IS_DOCKER:
  774. return "🐳 Docker"
  775. if main.IS_TERMUX:
  776. return "🕶 Termux"
  777. if main.IS_CODESPACES:
  778. return "🐈‍⬛ Codespaces"
  779. return f"✌️ lavHost {os.environ['LAVHOST']}" if main.IS_LAVHOST else "📻 VDS"
  780. def get_platform_emoji() -> str:
  781. """
  782. Returns custom emoji for current platform
  783. :return: Emoji entity in string
  784. """
  785. from . import main
  786. BASE = "".join((
  787. "<emoji document_id={}>🌘</emoji>",
  788. "<emoji document_id=5195311729663286630>🌘</emoji>",
  789. "<emoji document_id=5195045669324201904>🌘</emoji>",
  790. ))
  791. if main.IS_DOCKER:
  792. return BASE.format(5298554256603752468)
  793. if main.IS_LAVHOST:
  794. return BASE.format(5301078610747074753)
  795. if main.IS_GOORM:
  796. return BASE.format(5298947740032573902)
  797. if main.IS_CODESPACES:
  798. return BASE.format(5194976881127989720)
  799. if main.IS_TERMUX:
  800. return BASE.format(5193051778001673828)
  801. if main.IS_RAILWAY:
  802. return BASE.format(5199607521593007466)
  803. if main.IS_HIKKAHOST:
  804. return BASE.format(5370731117588523522)
  805. return BASE.format(5192765204898783881)
  806. def uptime() -> int:
  807. """
  808. Returns userbot uptime in seconds
  809. :return: Uptime in seconds
  810. """
  811. return round(time.perf_counter() - init_ts)
  812. def formatted_uptime() -> str:
  813. """
  814. Returnes formmated uptime
  815. :return: Formatted uptime
  816. """
  817. return str(timedelta(seconds=uptime()))
  818. def ascii_face() -> str:
  819. """
  820. Returnes cute ASCII-art face
  821. :return: ASCII-art face
  822. """
  823. return escape_html(
  824. random.choice([
  825. "ヽ(๑◠ܫ◠๑)ノ",
  826. "(◕ᴥ◕ʋ)",
  827. "ᕙ(`▽´)ᕗ",
  828. "(✿◠‿◠)",
  829. "(▰˘◡˘▰)",
  830. "(˵ ͡° ͜ʖ ͡°˵)",
  831. "ʕっ•ᴥ•ʔっ",
  832. "( ͡° ᴥ ͡°)",
  833. "(๑•́ ヮ •̀๑)",
  834. "٩(^‿^)۶",
  835. "(っˆڡˆς)",
  836. "ψ(`∇´)ψ",
  837. "⊙ω⊙",
  838. "٩(^ᴗ^)۶",
  839. "(´・ω・)っ由",
  840. "( ͡~ ͜ʖ ͡°)",
  841. "✧♡(◕‿◕✿)",
  842. "โ๏௰๏ใ ื",
  843. "∩。• ᵕ •。∩ ♡",
  844. "(♡´౪`♡)",
  845. "(◍>◡<◍)⋈。✧♡",
  846. "╰(✿´⌣`✿)╯♡",
  847. "ʕ•ᴥ•ʔ",
  848. "ᶘ ◕ᴥ◕ᶅ",
  849. "▼・ᴥ・▼",
  850. "ฅ^•ﻌ•^ฅ",
  851. "(΄◞ิ౪◟ิ‵)",
  852. "٩(^ᴗ^)۶",
  853. "ᕴーᴥーᕵ",
  854. "ʕ→ᴥ←ʔ",
  855. "ʕᵕᴥᵕʔ",
  856. "ʕᵒᴥᵒʔ",
  857. "ᵔᴥᵔ",
  858. "(✿╹◡╹)",
  859. "(๑→ܫ←)",
  860. "ʕ·ᴥ· ʔ",
  861. "(ノ≧ڡ≦)",
  862. "(≖ᴗ≖✿)",
  863. "(〜^∇^ )〜",
  864. "( ノ・ェ・ )ノ",
  865. "~( ˘▾˘~)",
  866. "(〜^∇^)〜",
  867. "ヽ(^ᴗ^ヽ)",
  868. "(´・ω・`)",
  869. "₍ᐢ•ﻌ•ᐢ₎*・゚。",
  870. "(。・・)_且",
  871. "(=`ω´=)",
  872. "(*•‿•*)",
  873. "(*゚∀゚*)",
  874. "(☉⋆‿⋆☉)",
  875. "ɷ◡ɷ",
  876. "ʘ‿ʘ",
  877. "(。-ω-)ノ",
  878. "( ・ω・)ノ",
  879. "(=゚ω゚)ノ",
  880. "(・ε・`*) …",
  881. "ʕっ•ᴥ•ʔっ",
  882. "(*˘︶˘*)",
  883. ])
  884. )
  885. def array_sum(
  886. array: typing.List[typing.List[typing.Any]], /
  887. ) -> typing.List[typing.Any]:
  888. """
  889. Performs basic sum operation on array
  890. :param array: Array to sum
  891. :return: Sum of array
  892. """
  893. result = []
  894. for item in array:
  895. result += item
  896. return result
  897. def rand(size: int, /) -> str:
  898. """
  899. Return random string of len `size`
  900. :param size: Length of string
  901. :return: Random string
  902. """
  903. return "".join(
  904. [random.choice("abcdefghijklmnopqrstuvwxyz1234567890") for _ in range(size)]
  905. )
def smart_split(
    text: str,
    entities: typing.List[FormattingEntity],
    length: int = 4096,
    split_on: ListLike = ("\n", " "),
    min_length: int = 1,
) -> typing.Iterator[str]:
    """
    Split the message into smaller messages.
    A grapheme will never be broken. Entities will be displaced to match the right location. No inputs will be mutated.
    The end of each message except the last one is stripped of characters from [split_on]
    Every yielded chunk is produced by the `unparse` method of the module-level `parser`
    :param text: the plain text input
    :param entities: the entities
    :param length: the maximum length of a single message
    :param split_on: characters (or strings) which are preferred for a message break
    :param min_length: ignore any matches on [split_on] strings before this number of characters into each message
    :return: iterator, which returns strings
    :example:
        >>> utils.smart_split(
            *hikkatl.extensions.html.parse(
                "<b>Hello, world!</b>"
            )
        )
        <<< ["<b>Hello, world!</b>"]
    """
    # Authored by @bsolute
    # https://t.me/LonamiWebs/27777

    # UTF-16LE takes 2 bytes per code unit, hence `length * 2` and `// 2`
    # in the conversions below
    encoded = text.encode("utf-16le")
    pending_entities = entities
    text_offset = 0
    bytes_offset = 0
    text_length = len(text)
    bytes_length = len(encoded)

    while text_offset < text_length:
        if bytes_offset + length * 2 >= bytes_length:
            # The rest fits into a single message: emit it and stop
            yield parser.unparse(
                text[text_offset:],
                list(sorted(pending_entities, key=lambda x: x.offset)),
            )
            break

        # Number of characters decoded from the next `length` UTF-16 code
        # units; undecodable bytes at the edge of the window are ignored
        codepoint_count = len(
            encoded[bytes_offset : bytes_offset + length * 2].decode(
                "utf-16le",
                errors="ignore",
            )
        )

        # Take the last occurrence of the first separator from `split_on`,
        # which is present inside of the window
        for search in split_on:
            search_index = text.rfind(
                search,
                text_offset + min_length,
                text_offset + codepoint_count,
            )
            if search_index != -1:
                break
        else:
            # No separator found: cut at the end of the window
            search_index = text_offset + codepoint_count

        # Presumably adjusts the index so that no grapheme cluster gets
        # split - see the documentation of the `grapheme` package
        split_index = grapheme.safe_split_index(text, search_index)

        # Length of the current chunk in UTF-16 code units; this is the
        # value the entity offsets are compared against below
        split_offset_utf16 = (
            len(text[text_offset:split_index].encode("utf-16le"))
        ) // 2
        exclude = 0

        # Count the characters right after the split point, which are
        # contained in `split_on` - they are left out of both chunks
        while (
            split_index + exclude < text_length
            and text[split_index + exclude] in split_on
        ):
            exclude += 1

        current_entities = []
        # Work on a copy, so that the list passed by the caller stays intact
        entities = pending_entities.copy()
        pending_entities = []

        for entity in entities:
            if (
                entity.offset < split_offset_utf16
                and entity.offset + entity.length > split_offset_utf16 + exclude
            ):
                # spans boundary
                current_entities.append(
                    _copy_tl(
                        entity,
                        length=split_offset_utf16 - entity.offset,
                    )
                )
                pending_entities.append(
                    _copy_tl(
                        entity,
                        offset=0,
                        length=entity.offset
                        + entity.length
                        - split_offset_utf16
                        - exclude,
                    )
                )
            elif entity.offset < split_offset_utf16 < entity.offset + entity.length:
                # overlaps boundary
                current_entities.append(
                    _copy_tl(
                        entity,
                        length=split_offset_utf16 - entity.offset,
                    )
                )
            elif entity.offset < split_offset_utf16:
                # wholly left
                current_entities.append(entity)
            elif (
                entity.offset + entity.length
                > split_offset_utf16 + exclude
                > entity.offset
            ):
                # overlaps right boundary
                pending_entities.append(
                    _copy_tl(
                        entity,
                        offset=0,
                        length=entity.offset
                        + entity.length
                        - split_offset_utf16
                        - exclude,
                    )
                )
            elif entity.offset + entity.length > split_offset_utf16 + exclude:
                # wholly right
                pending_entities.append(
                    _copy_tl(
                        entity,
                        offset=entity.offset - split_offset_utf16 - exclude,
                    )
                )

        current_text = text[text_offset:split_index]
        yield parser.unparse(
            current_text,
            list(sorted(current_entities, key=lambda x: x.offset)),
        )

        text_offset = split_index + exclude
        # NOTE(review): `bytes_offset` advances by the emitted text only,
        # while `text_offset` also skips the `exclude` separators - confirm
        # that this drift between the two offsets is intended
        bytes_offset += len(current_text.encode("utf-16le"))
  1039. def _copy_tl(o, **kwargs):
  1040. d = o.to_dict()
  1041. del d["_"]
  1042. d.update(kwargs)
  1043. return o.__class__(**d)
  1044. def check_url(url: str) -> bool:
  1045. """
  1046. Statically checks url for validity
  1047. :param url: URL to check
  1048. :return: True if valid, False otherwise
  1049. """
  1050. try:
  1051. return bool(urlparse(url).netloc)
  1052. except Exception:
  1053. return False
  1054. def get_git_hash() -> typing.Union[str, bool]:
  1055. """
  1056. Get current Hikka git hash
  1057. :return: Git commit hash
  1058. """
  1059. try:
  1060. return git.Repo().head.commit.hexsha
  1061. except Exception:
  1062. return False
  1063. def get_commit_url() -> str:
  1064. """
  1065. Get current Hikka git commit url
  1066. :return: Git commit url
  1067. """
  1068. try:
  1069. hash_ = get_git_hash()
  1070. return f'<a href="https://github.com/hikariatama/Hikka/commit/{hash_}">#{hash_[:7]}</a>'
  1071. except Exception:
  1072. return "Unknown"
  1073. def is_serializable(x: typing.Any, /) -> bool:
  1074. """
  1075. Checks if object is JSON-serializable
  1076. :param x: Object to check
  1077. :return: True if object is JSON-serializable, False otherwise
  1078. """
  1079. try:
  1080. json.dumps(x)
  1081. return True
  1082. except Exception:
  1083. return False
  1084. def get_lang_flag(countrycode: str) -> str:
  1085. """
  1086. Gets an emoji of specified countrycode
  1087. :param countrycode: 2-letter countrycode
  1088. :return: Emoji flag
  1089. """
  1090. if (
  1091. len(
  1092. code := [
  1093. c
  1094. for c in countrycode.lower()
  1095. if c in string.ascii_letters + string.digits
  1096. ]
  1097. )
  1098. == 2
  1099. ):
  1100. return "".join([chr(ord(c.upper()) + (ord("🇦") - ord("A"))) for c in code])
  1101. return countrycode
  1102. def get_entity_url(
  1103. entity: typing.Union[User, Channel],
  1104. openmessage: bool = False,
  1105. ) -> str:
  1106. """
  1107. Get link to object, if available
  1108. :param entity: Entity to get url of
  1109. :param openmessage: Use tg://openmessage link for users
  1110. :return: Link to object or empty string
  1111. """
  1112. return (
  1113. (
  1114. f"tg://openmessage?id={entity.id}"
  1115. if openmessage
  1116. else f"tg://user?id={entity.id}"
  1117. )
  1118. if isinstance(entity, User)
  1119. else (
  1120. f"tg://resolve?domain={entity.username}"
  1121. if getattr(entity, "username", None)
  1122. else ""
  1123. )
  1124. )
  1125. async def get_message_link(
  1126. message: Message,
  1127. chat: typing.Optional[typing.Union[Chat, Channel]] = None,
  1128. ) -> str:
  1129. """
  1130. Get link to message
  1131. :param message: Message to get link of
  1132. :param chat: Chat, where message was sent
  1133. :return: Link to message
  1134. """
  1135. if message.is_private:
  1136. return (
  1137. f"tg://openmessage?user_id={get_chat_id(message)}&message_id={message.id}"
  1138. )
  1139. if not chat and not (chat := message.chat):
  1140. chat = await message.get_chat()
  1141. topic_affix = (
  1142. f"?topic={message.reply_to.reply_to_msg_id}"
  1143. if getattr(message.reply_to, "forum_topic", False)
  1144. else ""
  1145. )
  1146. return (
  1147. f"https://t.me/{chat.username}/{message.id}{topic_affix}"
  1148. if getattr(chat, "username", False)
  1149. else f"https://t.me/c/{chat.id}/{message.id}{topic_affix}"
  1150. )
  1151. def remove_html(text: str, escape: bool = False, keep_emojis: bool = False) -> str:
  1152. """
  1153. Removes HTML tags from text
  1154. :param text: Text to remove HTML from
  1155. :param escape: Escape HTML
  1156. :param keep_emojis: Keep custom emojis
  1157. :return: Text without HTML
  1158. """
  1159. return (escape_html if escape else str)(
  1160. re.sub(
  1161. (
  1162. r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>)"
  1163. if keep_emojis
  1164. else r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>|<\/?emoji.*?>)"
  1165. ),
  1166. "",
  1167. text,
  1168. )
  1169. )
  1170. def get_kwargs() -> typing.Dict[str, typing.Any]:
  1171. """
  1172. Get kwargs of function, in which is called
  1173. :return: kwargs
  1174. """
  1175. # https://stackoverflow.com/a/65927265/19170642
  1176. keys, _, _, values = inspect.getargvalues(inspect.currentframe().f_back)
  1177. return {key: values[key] for key in keys if key != "self"}
  1178. def mime_type(message: Message) -> str:
  1179. """
  1180. Get mime type of document in message
  1181. :param message: Message with document
  1182. :return: Mime type or empty string if not present
  1183. """
  1184. return (
  1185. ""
  1186. if not isinstance(message, Message) or not getattr(message, "media", False)
  1187. else getattr(getattr(message, "media", False), "mime_type", False) or ""
  1188. )
  1189. def find_caller(
  1190. stack: typing.Optional[typing.List[inspect.FrameInfo]] = None,
  1191. ) -> typing.Any:
  1192. """
  1193. Attempts to find command in stack
  1194. :param stack: Stack to search in
  1195. :return: Command-caller or None
  1196. """
  1197. caller = next(
  1198. (
  1199. frame_info
  1200. for frame_info in stack or inspect.stack()
  1201. if hasattr(frame_info, "function")
  1202. and any(
  1203. inspect.isclass(cls_)
  1204. and issubclass(cls_, Module)
  1205. and cls_ is not Module
  1206. for cls_ in frame_info.frame.f_globals.values()
  1207. )
  1208. ),
  1209. None,
  1210. )
  1211. if not caller:
  1212. return next(
  1213. (
  1214. frame_info.frame.f_locals["func"]
  1215. for frame_info in stack or inspect.stack()
  1216. if hasattr(frame_info, "function")
  1217. and frame_info.function == "future_dispatcher"
  1218. and (
  1219. "CommandDispatcher"
  1220. in getattr(getattr(frame_info, "frame", None), "f_globals", {})
  1221. )
  1222. ),
  1223. None,
  1224. )
  1225. return next(
  1226. (
  1227. getattr(cls_, caller.function, None)
  1228. for cls_ in caller.frame.f_globals.values()
  1229. if inspect.isclass(cls_) and issubclass(cls_, Module)
  1230. ),
  1231. None,
  1232. )
  1233. def validate_html(html: str) -> str:
  1234. """
  1235. Removes broken tags from html
  1236. :param html: HTML to validate
  1237. :return: Valid HTML
  1238. """
  1239. text, entities = hikkatl.extensions.html.parse(html)
  1240. return hikkatl.extensions.html.unparse(escape_html(text), entities)
  1241. def iter_attrs(obj: typing.Any, /) -> typing.List[typing.Tuple[str, typing.Any]]:
  1242. """
  1243. Returns list of attributes of object
  1244. :param obj: Object to iterate over
  1245. :return: List of attributes and their values
  1246. """
  1247. return ((attr, getattr(obj, attr)) for attr in dir(obj))
  1248. def atexit(
  1249. func: typing.Callable,
  1250. use_signal: typing.Optional[int] = None,
  1251. *args,
  1252. **kwargs,
  1253. ) -> None:
  1254. """
  1255. Calls function on exit
  1256. :param func: Function to call
  1257. :param use_signal: If passed, `signal` will be used instead of `atexit`
  1258. :param args: Arguments to pass to function
  1259. :param kwargs: Keyword arguments to pass to function
  1260. :return: None
  1261. """
  1262. if use_signal:
  1263. signal.signal(use_signal, lambda *_: func(*args, **kwargs))
  1264. return
  1265. _atexit.register(functools.partial(func, *args, **kwargs))
  1266. def get_topic(message: Message) -> typing.Optional[int]:
  1267. """
  1268. Get topic id of message
  1269. :param message: Message to get topic of
  1270. :return: int or None if not present
  1271. """
  1272. return (
  1273. (message.reply_to.reply_to_top_id or message.reply_to.reply_to_msg_id)
  1274. if (
  1275. isinstance(message, Message)
  1276. and message.reply_to
  1277. and message.reply_to.forum_topic
  1278. )
  1279. else (
  1280. message.form["top_msg_id"]
  1281. if isinstance(message, (InlineCall, InlineMessage))
  1282. else None
  1283. )
  1284. )
  1285. def get_ram_usage() -> float:
  1286. """Returns current process tree memory usage in MB"""
  1287. try:
  1288. import psutil
  1289. current_process = psutil.Process(os.getpid())
  1290. mem = current_process.memory_info()[0] / 2.0**20
  1291. for child in current_process.children(recursive=True):
  1292. mem += child.memory_info()[0] / 2.0**20
  1293. return round(mem, 1)
  1294. except Exception:
  1295. return 0
  1296. def get_cpu_usage() -> float:
  1297. """Returns current process tree CPU usage in %"""
  1298. try:
  1299. import psutil
  1300. current_process = psutil.Process(os.getpid())
  1301. cpu = current_process.cpu_percent()
  1302. for child in current_process.children(recursive=True):
  1303. cpu += child.cpu_percent()
  1304. return round(cpu, 1)
  1305. except Exception:
  1306. return 0
# `time.perf_counter()` reading taken once, when this module is imported.
# NOTE(review): presumably the reference point for uptime calculations — confirm against callers
init_ts = time.perf_counter()
  1308. # GeekTG Compatibility
  1309. def get_git_info() -> typing.Tuple[str, str]:
  1310. """
  1311. Get git info
  1312. :return: Git info
  1313. """
  1314. hash_ = get_git_hash()
  1315. return (
  1316. hash_,
  1317. f"https://github.com/hikariatama/Hikka/commit/{hash_}" if hash_ else "",
  1318. )
  1319. def get_version_raw() -> str:
  1320. """
  1321. Get the version of the userbot
  1322. :return: Version in format %s.%s.%s
  1323. """
  1324. from . import version
  1325. return ".".join(map(str, list(version.__version__)))
# Alias: exposes `get_named_platform` under the name `get_platform_name` as well
get_platform_name = get_named_platform