utils.py 44 KB


  1. """Utilities"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # ©️ Dan Gazizullin, 2021-2023
  15. # This file is a part of Hikka Userbot
  16. # 🌐 https://github.com/hikariatama/Hikka
  17. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  18. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  19. import asyncio
  20. import atexit as _atexit
  21. import contextlib
  22. import functools
  23. import inspect
  24. import io
  25. import json
  26. import logging
  27. import os
  28. import random
  29. import re
  30. import shlex
  31. import signal
  32. import string
  33. import time
  34. import typing
  35. from datetime import timedelta
  36. from urllib.parse import urlparse
  37. import git
  38. import grapheme
  39. import requests
  40. import telethon
  41. from aiogram.types import Message as AiogramMessage
  42. from telethon import hints
  43. from telethon.tl.custom.message import Message
  44. from telethon.tl.functions.account import UpdateNotifySettingsRequest
  45. from telethon.tl.functions.channels import (
  46. CreateChannelRequest,
  47. EditAdminRequest,
  48. EditPhotoRequest,
  49. InviteToChannelRequest,
  50. )
  51. from telethon.tl.functions.messages import (
  52. GetDialogFiltersRequest,
  53. SetHistoryTTLRequest,
  54. UpdateDialogFilterRequest,
  55. )
  56. from telethon.tl.types import (
  57. Channel,
  58. Chat,
  59. ChatAdminRights,
  60. InputDocument,
  61. InputPeerNotifySettings,
  62. MessageEntityBankCard,
  63. MessageEntityBlockquote,
  64. MessageEntityBold,
  65. MessageEntityBotCommand,
  66. MessageEntityCashtag,
  67. MessageEntityCode,
  68. MessageEntityEmail,
  69. MessageEntityHashtag,
  70. MessageEntityItalic,
  71. MessageEntityMention,
  72. MessageEntityMentionName,
  73. MessageEntityPhone,
  74. MessageEntityPre,
  75. MessageEntitySpoiler,
  76. MessageEntityStrike,
  77. MessageEntityTextUrl,
  78. MessageEntityUnderline,
  79. MessageEntityUnknown,
  80. MessageEntityUrl,
  81. MessageMediaWebPage,
  82. PeerChannel,
  83. PeerChat,
  84. PeerUser,
  85. UpdateNewChannelMessage,
  86. User,
  87. )
  88. from ._internal import fw_protect
  89. from .inline.types import InlineCall, InlineMessage
  90. from .tl_cache import CustomTelegramClient
  91. from .types import HikkaReplyMarkup, ListLike, Module
  92. FormattingEntity = typing.Union[
  93. MessageEntityUnknown,
  94. MessageEntityMention,
  95. MessageEntityHashtag,
  96. MessageEntityBotCommand,
  97. MessageEntityUrl,
  98. MessageEntityEmail,
  99. MessageEntityBold,
  100. MessageEntityItalic,
  101. MessageEntityCode,
  102. MessageEntityPre,
  103. MessageEntityTextUrl,
  104. MessageEntityMentionName,
  105. MessageEntityPhone,
  106. MessageEntityCashtag,
  107. MessageEntityUnderline,
  108. MessageEntityStrike,
  109. MessageEntityBlockquote,
  110. MessageEntityBankCard,
  111. MessageEntitySpoiler,
  112. ]
  113. emoji_pattern = re.compile(
  114. "["
  115. "\U0001F600-\U0001F64F" # emoticons
  116. "\U0001F300-\U0001F5FF" # symbols & pictographs
  117. "\U0001F680-\U0001F6FF" # transport & map symbols
  118. "\U0001F1E0-\U0001F1FF" # flags (iOS)
  119. "]+",
  120. flags=re.UNICODE,
  121. )
  122. parser = telethon.utils.sanitize_parse_mode("html")
  123. logger = logging.getLogger(__name__)
  124. def get_args(message: typing.Union[Message, str]) -> typing.List[str]:
  125. """
  126. Get arguments from message
  127. :param message: Message or string to get arguments from
  128. :return: List of arguments
  129. """
  130. if not (message := getattr(message, "message", message)):
  131. return False
  132. if len(message := message.split(maxsplit=1)) <= 1:
  133. return []
  134. message = message[1]
  135. try:
  136. split = shlex.split(message)
  137. except ValueError:
  138. return message # Cannot split, let's assume that it's just one long message
  139. return list(filter(lambda x: len(x) > 0, split))
  140. def get_args_raw(message: typing.Union[Message, str]) -> str:
  141. """
  142. Get the parameters to the command as a raw string (not split)
  143. :param message: Message or string to get arguments from
  144. :return: Raw string of arguments
  145. """
  146. if not (message := getattr(message, "message", message)):
  147. return False
  148. return args[1] if len(args := message.split(maxsplit=1)) > 1 else ""
  149. def get_args_html(message: Message) -> str:
  150. """
  151. Get the parameters to the command as string with HTML (not split)
  152. :param message: Message to get arguments from
  153. :return: String with HTML arguments
  154. """
  155. prefix = message.client.loader.get_prefix()
  156. if not (message := message.text):
  157. return False
  158. if prefix not in message:
  159. return message
  160. raw_text, entities = parser.parse(message)
  161. raw_text = parser._add_surrogate(raw_text)
  162. try:
  163. command = raw_text[
  164. raw_text.index(prefix) : raw_text.index(" ", raw_text.index(prefix) + 1)
  165. ]
  166. except ValueError:
  167. return ""
  168. command_len = len(command) + 1
  169. return parser.unparse(
  170. parser._del_surrogate(raw_text[command_len:]),
  171. relocate_entities(entities, -command_len, raw_text[command_len:]),
  172. )
  173. def get_args_split_by(
  174. message: typing.Union[Message, str],
  175. separator: str,
  176. ) -> typing.List[str]:
  177. """
  178. Split args with a specific separator
  179. :param message: Message or string to get arguments from
  180. :param separator: Separator to split by
  181. :return: List of arguments
  182. """
  183. return [
  184. section.strip() for section in get_args_raw(message).split(separator) if section
  185. ]
  186. def get_chat_id(message: typing.Union[Message, AiogramMessage]) -> int:
  187. """
  188. Get the chat ID, but without -100 if its a channel
  189. :param message: Message to get chat ID from
  190. :return: Chat ID
  191. """
  192. return telethon.utils.resolve_id(
  193. getattr(message, "chat_id", None)
  194. or getattr(getattr(message, "chat", None), "id", None)
  195. )[0]
  196. def get_entity_id(entity: hints.Entity) -> int:
  197. """
  198. Get entity ID
  199. :param entity: Entity to get ID from
  200. :return: Entity ID
  201. """
  202. return telethon.utils.get_peer_id(entity)
  203. def escape_html(text: str, /) -> str: # sourcery skip
  204. """
  205. Pass all untrusted/potentially corrupt input here
  206. :param text: Text to escape
  207. :return: Escaped text
  208. """
  209. return str(text).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
  210. def escape_quotes(text: str, /) -> str:
  211. """
  212. Escape quotes to html quotes
  213. :param text: Text to escape
  214. :return: Escaped text
  215. """
  216. return escape_html(text).replace('"', "&quot;")
  217. def get_base_dir() -> str:
  218. """
  219. Get directory of this file
  220. :return: Directory of this file
  221. """
  222. return get_dir(__file__)
  223. def get_dir(mod: str) -> str:
  224. """
  225. Get directory of given module
  226. :param mod: Module's `__file__` to get directory of
  227. :return: Directory of given module
  228. """
  229. return os.path.abspath(os.path.dirname(os.path.abspath(mod)))
  230. async def get_user(message: Message) -> typing.Optional[User]:
  231. """
  232. Get user who sent message, searching if not found easily
  233. :param message: Message to get user from
  234. :return: User who sent message
  235. """
  236. try:
  237. return await message.client.get_entity(message.sender_id)
  238. except ValueError: # Not in database. Lets go looking for them.
  239. logger.debug("User not in session cache. Searching...")
  240. if isinstance(message.peer_id, PeerUser):
  241. await message.client.get_dialogs()
  242. return await message.client.get_entity(message.sender_id)
  243. if isinstance(message.peer_id, (PeerChannel, PeerChat)):
  244. try:
  245. return await message.client.get_entity(message.sender_id)
  246. except Exception:
  247. pass
  248. async for user in message.client.iter_participants(
  249. message.peer_id,
  250. aggressive=True,
  251. ):
  252. if user.id == message.sender_id:
  253. return user
  254. logger.error("User isn't in the group where they sent the message")
  255. return None
  256. logger.error("`peer_id` is not a user, chat or channel")
  257. return None
  258. def run_sync(func, *args, **kwargs):
  259. """
  260. Run a non-async function in a new thread and return an awaitable
  261. :param func: Sync-only function to execute
  262. :return: Awaitable coroutine
  263. """
  264. return asyncio.get_event_loop().run_in_executor(
  265. None,
  266. functools.partial(func, *args, **kwargs),
  267. )
  268. def run_async(loop: asyncio.AbstractEventLoop, coro: typing.Awaitable) -> typing.Any:
  269. """
  270. Run an async function as a non-async function, blocking till it's done
  271. :param loop: Event loop to run the coroutine in
  272. :param coro: Coroutine to run
  273. :return: Result of the coroutine
  274. """
  275. return asyncio.run_coroutine_threadsafe(coro, loop).result()
  276. def censor(
  277. obj: typing.Any,
  278. to_censor: typing.Optional[typing.Iterable[str]] = None,
  279. replace_with: str = "redacted_{count}_chars",
  280. ):
  281. """
  282. May modify the original object, but don't rely on it
  283. :param obj: Object to censor, preferrably telethon
  284. :param to_censor: Iterable of strings to censor
  285. :param replace_with: String to replace with, {count} will be replaced with the number of characters
  286. :return: Censored object
  287. """
  288. if to_censor is None:
  289. to_censor = ["phone"]
  290. for k, v in vars(obj).items():
  291. if k in to_censor:
  292. setattr(obj, k, replace_with.format(count=len(v)))
  293. elif k[0] != "_" and hasattr(v, "__dict__"):
  294. setattr(obj, k, censor(v, to_censor, replace_with))
  295. return obj
  296. def relocate_entities(
  297. entities: typing.List[FormattingEntity],
  298. offset: int,
  299. text: typing.Optional[str] = None,
  300. ) -> typing.List[FormattingEntity]:
  301. """
  302. Move all entities by offset (truncating at text)
  303. :param entities: List of entities
  304. :param offset: Offset to move by
  305. :param text: Text to truncate at
  306. :return: List of entities
  307. """
  308. length = len(text) if text is not None else 0
  309. for ent in entities.copy() if entities else ():
  310. ent.offset += offset
  311. if ent.offset < 0:
  312. ent.length += ent.offset
  313. ent.offset = 0
  314. if text is not None and ent.offset + ent.length > length:
  315. ent.length = length - ent.offset
  316. if ent.length <= 0:
  317. entities.remove(ent)
  318. return entities
  319. async def answer_file(
  320. message: typing.Union[Message, InlineCall, InlineMessage],
  321. file: typing.Union[str, bytes, io.IOBase, InputDocument],
  322. caption: typing.Optional[str] = None,
  323. **kwargs,
  324. ):
  325. """
  326. Use this to answer a message with a document
  327. :param message: Message to answer
  328. :param file: File to send - url, path or bytes
  329. :param caption: Caption to send
  330. :param kwargs: Extra kwargs to pass to `send_file`
  331. :return: Sent message
  332. :example:
  333. >>> await utils.answer_file(message, "test.txt")
  334. >>> await utils.answer_file(
  335. message,
  336. "https://mods.hikariatama.ru/badges/artai.jpg",
  337. "This is the cool module, check it out!",
  338. )
  339. """
  340. if isinstance(message, (InlineCall, InlineMessage)):
  341. message = message.form["caller"]
  342. if topic := get_topic(message):
  343. kwargs.setdefault("reply_to", topic)
  344. try:
  345. response = await message.client.send_file(
  346. message.peer_id,
  347. file,
  348. caption=caption,
  349. **kwargs,
  350. )
  351. except Exception:
  352. if caption:
  353. logger.warning(
  354. "Failed to send file, sending plain text instead", exc_info=True
  355. )
  356. return await answer(message, caption, **kwargs)
  357. raise
  358. with contextlib.suppress(Exception):
  359. await message.delete()
  360. return response
  361. async def answer(
  362. message: typing.Union[Message, InlineCall, InlineMessage],
  363. response: str,
  364. *,
  365. reply_markup: typing.Optional[HikkaReplyMarkup] = None,
  366. **kwargs,
  367. ) -> typing.Union[InlineCall, InlineMessage, Message]:
  368. """
  369. Use this to give the response to a command
  370. :param message: Message to answer to. Can be a tl message or hikka inline object
  371. :param response: Response to send
  372. :param reply_markup: Reply markup to send. If specified, inline form will be used
  373. :return: Message or inline object
  374. :example:
  375. >>> await utils.answer(message, "Hello world!")
  376. >>> await utils.answer(
  377. message,
  378. "https://some-url.com/photo.jpg",
  379. caption="Hello, this is your photo!",
  380. asfile=True,
  381. )
  382. >>> await utils.answer(
  383. message,
  384. "Hello world!",
  385. reply_markup={"text": "Hello!", "data": "world"},
  386. silent=True,
  387. disable_security=True,
  388. )
  389. """
  390. # Compatibility with FTG\GeekTG
  391. if isinstance(message, list) and message:
  392. message = message[0]
  393. if reply_markup is not None:
  394. if not isinstance(reply_markup, (list, dict)):
  395. raise ValueError("reply_markup must be a list or dict")
  396. if reply_markup:
  397. kwargs.pop("message", None)
  398. if isinstance(message, (InlineMessage, InlineCall)):
  399. await message.edit(response, reply_markup, **kwargs)
  400. return
  401. reply_markup = message.client.loader.inline._normalize_markup(reply_markup)
  402. result = await message.client.loader.inline.form(
  403. response,
  404. message=message if message.out else get_chat_id(message),
  405. reply_markup=reply_markup,
  406. **kwargs,
  407. )
  408. return result
  409. if isinstance(message, (InlineMessage, InlineCall)):
  410. await message.edit(response)
  411. return message
  412. kwargs.setdefault("link_preview", False)
  413. if not (edit := (message.out and not message.via_bot_id and not message.fwd_from)):
  414. kwargs.setdefault(
  415. "reply_to",
  416. getattr(message, "reply_to_msg_id", None),
  417. )
  418. parse_mode = telethon.utils.sanitize_parse_mode(
  419. kwargs.pop(
  420. "parse_mode",
  421. message.client.parse_mode,
  422. )
  423. )
  424. if isinstance(response, str) and not kwargs.pop("asfile", False):
  425. text, entities = parse_mode.parse(response)
  426. if len(text) >= 4096 and not hasattr(message, "hikka_grepped"):
  427. try:
  428. if not message.client.loader.inline.init_complete:
  429. raise
  430. strings = list(smart_split(text, entities, 4096))
  431. if len(strings) > 10:
  432. raise
  433. list_ = await message.client.loader.inline.list(
  434. message=message,
  435. strings=strings,
  436. )
  437. if not list_:
  438. raise
  439. return list_
  440. except Exception:
  441. file = io.BytesIO(text.encode("utf-8"))
  442. file.name = "command_result.txt"
  443. result = await message.client.send_file(
  444. message.peer_id,
  445. file,
  446. caption=message.client.loader.lookup("translations").strings(
  447. "too_long"
  448. ),
  449. reply_to=kwargs.get("reply_to") or get_topic(message),
  450. )
  451. if message.out:
  452. await message.delete()
  453. return result
  454. result = await (message.edit if edit else message.respond)(
  455. text,
  456. parse_mode=lambda t: (t, entities),
  457. **kwargs,
  458. )
  459. elif isinstance(response, Message):
  460. if message.media is None and (
  461. response.media is None or isinstance(response.media, MessageMediaWebPage)
  462. ):
  463. result = await message.edit(
  464. response.message,
  465. parse_mode=lambda t: (t, response.entities or []),
  466. link_preview=isinstance(response.media, MessageMediaWebPage),
  467. )
  468. else:
  469. result = await message.respond(response, **kwargs)
  470. else:
  471. if isinstance(response, bytes):
  472. response = io.BytesIO(response)
  473. elif isinstance(response, str):
  474. response = io.BytesIO(response.encode("utf-8"))
  475. if name := kwargs.pop("filename", None):
  476. response.name = name
  477. if message.media is not None and edit:
  478. await message.edit(file=response, **kwargs)
  479. else:
  480. kwargs.setdefault(
  481. "reply_to",
  482. getattr(message, "reply_to_msg_id", get_topic(message)),
  483. )
  484. result = await message.client.send_file(message.peer_id, response, **kwargs)
  485. if message.out:
  486. await message.delete()
  487. return result
  488. async def get_target(message: Message, arg_no: int = 0) -> typing.Optional[int]:
  489. """
  490. Get target from message
  491. :param message: Message to get target from
  492. :param arg_no: Argument number to get target from
  493. :return: Target
  494. """
  495. if any(
  496. isinstance(entity, MessageEntityMentionName)
  497. for entity in (message.entities or [])
  498. ):
  499. e = sorted(
  500. filter(lambda x: isinstance(x, MessageEntityMentionName), message.entities),
  501. key=lambda x: x.offset,
  502. )[0]
  503. return e.user_id
  504. if len(get_args(message)) > arg_no:
  505. user = get_args(message)[arg_no]
  506. elif message.is_reply:
  507. return (await message.get_reply_message()).sender_id
  508. elif hasattr(message.peer_id, "user_id"):
  509. user = message.peer_id.user_id
  510. else:
  511. return None
  512. try:
  513. entity = await message.client.get_entity(user)
  514. except ValueError:
  515. return None
  516. else:
  517. if isinstance(entity, User):
  518. return entity.id
  519. def merge(a: dict, b: dict, /) -> dict:
  520. """
  521. Merge with replace dictionary a to dictionary b
  522. :param a: Dictionary to merge
  523. :param b: Dictionary to merge to
  524. :return: Merged dictionary
  525. """
  526. for key in a:
  527. if key in b:
  528. if isinstance(a[key], dict) and isinstance(b[key], dict):
  529. b[key] = merge(a[key], b[key])
  530. elif isinstance(a[key], list) and isinstance(b[key], list):
  531. b[key] = list(set(b[key] + a[key]))
  532. else:
  533. b[key] = a[key]
  534. b[key] = a[key]
  535. return b
  536. async def set_avatar(
  537. client: CustomTelegramClient,
  538. peer: hints.Entity,
  539. avatar: str,
  540. ) -> bool:
  541. """
  542. Sets an entity avatar
  543. :param client: Client to use
  544. :param peer: Peer to set avatar to
  545. :param avatar: Avatar to set
  546. :return: True if avatar was set, False otherwise
  547. """
  548. if isinstance(avatar, str) and check_url(avatar):
  549. f = (
  550. await run_sync(
  551. requests.get,
  552. avatar,
  553. )
  554. ).content
  555. elif isinstance(avatar, bytes):
  556. f = avatar
  557. else:
  558. return False
  559. await fw_protect()
  560. res = await client(
  561. EditPhotoRequest(
  562. channel=peer,
  563. photo=await client.upload_file(f, file_name="photo.png"),
  564. )
  565. )
  566. await fw_protect()
  567. try:
  568. await client.delete_messages(
  569. peer,
  570. message_ids=[
  571. next(
  572. update
  573. for update in res.updates
  574. if isinstance(update, UpdateNewChannelMessage)
  575. ).message.id
  576. ],
  577. )
  578. except Exception:
  579. pass
  580. return True
  581. async def invite_inline_bot(
  582. client: CustomTelegramClient,
  583. peer: hints.EntityLike,
  584. ) -> None:
  585. """
  586. Invites inline bot to a chat
  587. :param client: Client to use
  588. :param peer: Peer to invite bot to
  589. :return: None
  590. :raise RuntimeError: If error occurred while inviting bot
  591. """
  592. try:
  593. await client(InviteToChannelRequest(peer, [client.loader.inline.bot_username]))
  594. except Exception as e:
  595. raise RuntimeError(
  596. "Can't invite inline bot to old asset chat, which is required by module"
  597. ) from e
  598. with contextlib.suppress(Exception):
  599. await client(
  600. EditAdminRequest(
  601. channel=peer,
  602. user_id=client.loader.inline.bot_username,
  603. admin_rights=ChatAdminRights(ban_users=True),
  604. rank="Hikka",
  605. )
  606. )
  607. async def asset_channel(
  608. client: CustomTelegramClient,
  609. title: str,
  610. description: str,
  611. *,
  612. channel: bool = False,
  613. silent: bool = False,
  614. archive: bool = False,
  615. invite_bot: bool = False,
  616. avatar: typing.Optional[str] = None,
  617. ttl: typing.Optional[int] = None,
  618. _folder: typing.Optional[str] = None,
  619. ) -> typing.Tuple[Channel, bool]:
  620. """
  621. Create new channel (if needed) and return its entity
  622. :param client: Telegram client to create channel by
  623. :param title: Channel title
  624. :param description: Description
  625. :param channel: Whether to create a channel or supergroup
  626. :param silent: Automatically mute channel
  627. :param archive: Automatically archive channel
  628. :param invite_bot: Add inline bot and assure it's in chat
  629. :param avatar: Url to an avatar to set as pfp of created peer
  630. :param ttl: Time to live for messages in channel
  631. :return: Peer and bool: is channel new or pre-existent
  632. """
  633. if not hasattr(client, "_channels_cache"):
  634. client._channels_cache = {}
  635. if (
  636. title in client._channels_cache
  637. and client._channels_cache[title]["exp"] > time.time()
  638. ):
  639. return client._channels_cache[title]["peer"], False
  640. async for d in client.iter_dialogs():
  641. if d.title == title:
  642. client._channels_cache[title] = {"peer": d.entity, "exp": int(time.time())}
  643. if invite_bot:
  644. if all(
  645. participant.id != client.loader.inline.bot_id
  646. for participant in (
  647. await client.get_participants(d.entity, limit=100)
  648. )
  649. ):
  650. await fw_protect()
  651. await invite_inline_bot(client, d.entity)
  652. return d.entity, False
  653. await fw_protect()
  654. peer = (
  655. await client(
  656. CreateChannelRequest(
  657. title,
  658. description,
  659. megagroup=not channel,
  660. )
  661. )
  662. ).chats[0]
  663. if invite_bot:
  664. await fw_protect()
  665. await invite_inline_bot(client, peer)
  666. if silent:
  667. await fw_protect()
  668. await dnd(client, peer, archive)
  669. elif archive:
  670. await fw_protect()
  671. await client.edit_folder(peer, 1)
  672. if avatar:
  673. await fw_protect()
  674. await set_avatar(client, peer, avatar)
  675. if ttl:
  676. await fw_protect()
  677. await client(SetHistoryTTLRequest(peer=peer, period=ttl))
  678. if _folder:
  679. if _folder != "hikka":
  680. raise NotImplementedError
  681. folders = await client(GetDialogFiltersRequest())
  682. try:
  683. folder = next(folder for folder in folders if folder.title == "hikka")
  684. except Exception:
  685. folder = None
  686. if folder is not None and not any(
  687. peer.id == getattr(folder_peer, "channel_id", None)
  688. for folder_peer in folder.include_peers
  689. ):
  690. folder.include_peers += [await client.get_input_entity(peer)]
  691. await client(
  692. UpdateDialogFilterRequest(
  693. folder.id,
  694. folder,
  695. )
  696. )
  697. client._channels_cache[title] = {"peer": peer, "exp": int(time.time())}
  698. return peer, True
  699. async def dnd(
  700. client: CustomTelegramClient,
  701. peer: hints.Entity,
  702. archive: bool = True,
  703. ) -> bool:
  704. """
  705. Mutes and optionally archives peer
  706. :param peer: Anything entity-link
  707. :param archive: Archive peer, or just mute?
  708. :return: `True` on success, otherwise `False`
  709. """
  710. try:
  711. await client(
  712. UpdateNotifySettingsRequest(
  713. peer=peer,
  714. settings=InputPeerNotifySettings(
  715. show_previews=False,
  716. silent=True,
  717. mute_until=2**31 - 1,
  718. ),
  719. )
  720. )
  721. if archive:
  722. await fw_protect()
  723. await client.edit_folder(peer, 1)
  724. except Exception:
  725. logger.exception("utils.dnd error")
  726. return False
  727. return True
  728. def get_link(user: typing.Union[User, Channel], /) -> str:
  729. """
  730. Get telegram permalink to entity
  731. :param user: User or channel
  732. :return: Link to entity
  733. """
  734. return (
  735. f"tg://user?id={user.id}"
  736. if isinstance(user, User)
  737. else (
  738. f"tg://resolve?domain={user.username}"
  739. if getattr(user, "username", None)
  740. else ""
  741. )
  742. )
  743. def chunks(_list: ListLike, n: int, /) -> typing.List[typing.List[typing.Any]]:
  744. """
  745. Split provided `_list` into chunks of `n`
  746. :param _list: List to split
  747. :param n: Chunk size
  748. :return: List of chunks
  749. """
  750. return [_list[i : i + n] for i in range(0, len(_list), n)]
  751. def get_named_platform() -> str:
  752. """
  753. Returns formatted platform name
  754. :return: Platform name
  755. """
  756. try:
  757. if os.path.isfile("/proc/device-tree/model"):
  758. with open("/proc/device-tree/model") as f:
  759. model = f.read()
  760. if "Orange" in model:
  761. return f"🍊 {model}"
  762. return f"🍇 {model}" if "Raspberry" in model else f"❓ {model}"
  763. except Exception:
  764. # In case of weird fs, aka Termux
  765. pass
  766. try:
  767. from platform import uname
  768. if "microsoft-standard" in uname().release:
  769. return "🍁 WSL"
  770. except Exception:
  771. pass
  772. if "GOORM" in os.environ:
  773. return "🦾 GoormIDE"
  774. if "RAILWAY" in os.environ:
  775. return "🚂 Railway"
  776. if "DOCKER" in os.environ:
  777. return "🐳 Docker"
  778. if "com.termux" in os.environ.get("PREFIX", ""):
  779. return "🕶 Termux"
  780. if "CODESPACES" in os.environ:
  781. return "🐈‍⬛ Codespaces"
  782. return f"✌️ lavHost {os.environ['LAVHOST']}" if "LAVHOST" in os.environ else "📻 VDS"
  783. def get_platform_emoji() -> str:
  784. """
  785. Returns custom emoji for current platform
  786. :return: Emoji entity in string
  787. """
  788. BASE = "".join(
  789. (
  790. "<emoji document_id={}>🌘</emoji>",
  791. "<emoji document_id=5195311729663286630>🌘</emoji>",
  792. "<emoji document_id=5195045669324201904>🌘</emoji>",
  793. )
  794. )
  795. if "DOCKER" in os.environ:
  796. return BASE.format(5298554256603752468)
  797. if "LAVHOST" in os.environ:
  798. return BASE.format(5301078610747074753)
  799. if "GOORM" in os.environ:
  800. return BASE.format(5298947740032573902)
  801. if "CODESPACES" in os.environ:
  802. return BASE.format(5194976881127989720)
  803. if "com.termux" in os.environ.get("PREFIX", ""):
  804. return BASE.format(5193051778001673828)
  805. if "RAILWAY" in os.environ:
  806. return BASE.format(5199607521593007466)
  807. return BASE.format(5192765204898783881)
  808. def uptime() -> int:
  809. """
  810. Returns userbot uptime in seconds
  811. :return: Uptime in seconds
  812. """
  813. return round(time.perf_counter() - init_ts)
  814. def formatted_uptime() -> str:
  815. """
  816. Returnes formmated uptime
  817. :return: Formatted uptime
  818. """
  819. return f"{str(timedelta(seconds=uptime()))}"
  820. def ascii_face() -> str:
  821. """
  822. Returnes cute ASCII-art face
  823. :return: ASCII-art face
  824. """
  825. return escape_html(
  826. random.choice(
  827. [
  828. "ヽ(๑◠ܫ◠๑)ノ",
  829. "(◕ᴥ◕ʋ)",
  830. "ᕙ(`▽´)ᕗ",
  831. "(✿◠‿◠)",
  832. "(▰˘◡˘▰)",
  833. "(˵ ͡° ͜ʖ ͡°˵)",
  834. "ʕっ•ᴥ•ʔっ",
  835. "( ͡° ᴥ ͡°)",
  836. "(๑•́ ヮ •̀๑)",
  837. "٩(^‿^)۶",
  838. "(っˆڡˆς)",
  839. "ψ(`∇´)ψ",
  840. "⊙ω⊙",
  841. "٩(^ᴗ^)۶",
  842. "(´・ω・)っ由",
  843. "( ͡~ ͜ʖ ͡°)",
  844. "✧♡(◕‿◕✿)",
  845. "โ๏௰๏ใ ื",
  846. "∩。• ᵕ •。∩ ♡",
  847. "(♡´౪`♡)",
  848. "(◍>◡<◍)⋈。✧♡",
  849. "╰(✿´⌣`✿)╯♡",
  850. "ʕ•ᴥ•ʔ",
  851. "ᶘ ◕ᴥ◕ᶅ",
  852. "▼・ᴥ・▼",
  853. "ฅ^•ﻌ•^ฅ",
  854. "(΄◞ิ౪◟ิ‵)",
  855. "٩(^ᴗ^)۶",
  856. "ᕴーᴥーᕵ",
  857. "ʕ→ᴥ←ʔ",
  858. "ʕᵕᴥᵕʔ",
  859. "ʕᵒᴥᵒʔ",
  860. "ᵔᴥᵔ",
  861. "(✿╹◡╹)",
  862. "(๑→ܫ←)",
  863. "ʕ·ᴥ· ʔ",
  864. "(ノ≧ڡ≦)",
  865. "(≖ᴗ≖✿)",
  866. "(〜^∇^ )〜",
  867. "( ノ・ェ・ )ノ",
  868. "~( ˘▾˘~)",
  869. "(〜^∇^)〜",
  870. "ヽ(^ᴗ^ヽ)",
  871. "(´・ω・`)",
  872. "₍ᐢ•ﻌ•ᐢ₎*・゚。",
  873. "(。・・)_且",
  874. "(=`ω´=)",
  875. "(*•‿•*)",
  876. "(*゚∀゚*)",
  877. "(☉⋆‿⋆☉)",
  878. "ɷ◡ɷ",
  879. "ʘ‿ʘ",
  880. "(。-ω-)ノ",
  881. "( ・ω・)ノ",
  882. "(=゚ω゚)ノ",
  883. "(・ε・`*) …",
  884. "ʕっ•ᴥ•ʔっ",
  885. "(*˘︶˘*)",
  886. ]
  887. )
  888. )
  889. def array_sum(
  890. array: typing.List[typing.List[typing.Any]], /
  891. ) -> typing.List[typing.Any]:
  892. """
  893. Performs basic sum operation on array
  894. :param array: Array to sum
  895. :return: Sum of array
  896. """
  897. result = []
  898. for item in array:
  899. result += item
  900. return result
  901. def rand(size: int, /) -> str:
  902. """
  903. Return random string of len `size`
  904. :param size: Length of string
  905. :return: Random string
  906. """
  907. return "".join(
  908. [random.choice("abcdefghijklmnopqrstuvwxyz1234567890") for _ in range(size)]
  909. )
  910. def smart_split(
  911. text: str,
  912. entities: typing.List[FormattingEntity],
  913. length: int = 4096,
  914. split_on: ListLike = ("\n", " "),
  915. min_length: int = 1,
  916. ) -> typing.Iterator[str]:
  917. """
  918. Split the message into smaller messages.
  919. A grapheme will never be broken. Entities will be displaced to match the right location. No inputs will be mutated.
  920. The end of each message except the last one is stripped of characters from [split_on]
  921. :param text: the plain text input
  922. :param entities: the entities
  923. :param length: the maximum length of a single message
  924. :param split_on: characters (or strings) which are preferred for a message break
  925. :param min_length: ignore any matches on [split_on] strings before this number of characters into each message
  926. :return: iterator, which returns strings
  927. :example:
  928. >>> utils.smart_split(
  929. *telethon.extensions.html.parse(
  930. "<b>Hello, world!</b>"
  931. )
  932. )
  933. <<< ["<b>Hello, world!</b>"]
  934. """
  935. # Authored by @bsolute
  936. # https://t.me/LonamiWebs/27777
  937. encoded = text.encode("utf-16le")
  938. pending_entities = entities
  939. text_offset = 0
  940. bytes_offset = 0
  941. text_length = len(text)
  942. bytes_length = len(encoded)
  943. while text_offset < text_length:
  944. if bytes_offset + length * 2 >= bytes_length:
  945. yield parser.unparse(
  946. text[text_offset:],
  947. list(sorted(pending_entities, key=lambda x: x.offset)),
  948. )
  949. break
  950. codepoint_count = len(
  951. encoded[bytes_offset : bytes_offset + length * 2].decode(
  952. "utf-16le",
  953. errors="ignore",
  954. )
  955. )
  956. for search in split_on:
  957. search_index = text.rfind(
  958. search,
  959. text_offset + min_length,
  960. text_offset + codepoint_count,
  961. )
  962. if search_index != -1:
  963. break
  964. else:
  965. search_index = text_offset + codepoint_count
  966. split_index = grapheme.safe_split_index(text, search_index)
  967. split_offset_utf16 = (
  968. len(text[text_offset:split_index].encode("utf-16le"))
  969. ) // 2
  970. exclude = 0
  971. while (
  972. split_index + exclude < text_length
  973. and text[split_index + exclude] in split_on
  974. ):
  975. exclude += 1
  976. current_entities = []
  977. entities = pending_entities.copy()
  978. pending_entities = []
  979. for entity in entities:
  980. if (
  981. entity.offset < split_offset_utf16
  982. and entity.offset + entity.length > split_offset_utf16 + exclude
  983. ):
  984. # spans boundary
  985. current_entities.append(
  986. _copy_tl(
  987. entity,
  988. length=split_offset_utf16 - entity.offset,
  989. )
  990. )
  991. pending_entities.append(
  992. _copy_tl(
  993. entity,
  994. offset=0,
  995. length=entity.offset
  996. + entity.length
  997. - split_offset_utf16
  998. - exclude,
  999. )
  1000. )
  1001. elif entity.offset < split_offset_utf16 < entity.offset + entity.length:
  1002. # overlaps boundary
  1003. current_entities.append(
  1004. _copy_tl(
  1005. entity,
  1006. length=split_offset_utf16 - entity.offset,
  1007. )
  1008. )
  1009. elif entity.offset < split_offset_utf16:
  1010. # wholly left
  1011. current_entities.append(entity)
  1012. elif (
  1013. entity.offset + entity.length
  1014. > split_offset_utf16 + exclude
  1015. > entity.offset
  1016. ):
  1017. # overlaps right boundary
  1018. pending_entities.append(
  1019. _copy_tl(
  1020. entity,
  1021. offset=0,
  1022. length=entity.offset
  1023. + entity.length
  1024. - split_offset_utf16
  1025. - exclude,
  1026. )
  1027. )
  1028. elif entity.offset + entity.length > split_offset_utf16 + exclude:
  1029. # wholly right
  1030. pending_entities.append(
  1031. _copy_tl(
  1032. entity,
  1033. offset=entity.offset - split_offset_utf16 - exclude,
  1034. )
  1035. )
  1036. current_text = text[text_offset:split_index]
  1037. yield parser.unparse(
  1038. current_text,
  1039. list(sorted(current_entities, key=lambda x: x.offset)),
  1040. )
  1041. text_offset = split_index + exclude
  1042. bytes_offset += len(current_text.encode("utf-16le"))
  1043. def _copy_tl(o, **kwargs):
  1044. d = o.to_dict()
  1045. del d["_"]
  1046. d.update(kwargs)
  1047. return o.__class__(**d)
  1048. def check_url(url: str) -> bool:
  1049. """
  1050. Statically checks url for validity
  1051. :param url: URL to check
  1052. :return: True if valid, False otherwise
  1053. """
  1054. try:
  1055. return bool(urlparse(url).netloc)
  1056. except Exception:
  1057. return False
  1058. def get_git_hash() -> typing.Union[str, bool]:
  1059. """
  1060. Get current Hikka git hash
  1061. :return: Git commit hash
  1062. """
  1063. try:
  1064. return git.Repo().head.commit.hexsha
  1065. except Exception:
  1066. return False
  1067. def get_commit_url() -> str:
  1068. """
  1069. Get current Hikka git commit url
  1070. :return: Git commit url
  1071. """
  1072. try:
  1073. hash_ = get_git_hash()
  1074. return (
  1075. f'<a href="https://github.com/hikariatama/Hikka/commit/{hash_}">#{hash_[:7]}</a>'
  1076. )
  1077. except Exception:
  1078. return "Unknown"
  1079. def is_serializable(x: typing.Any, /) -> bool:
  1080. """
  1081. Checks if object is JSON-serializable
  1082. :param x: Object to check
  1083. :return: True if object is JSON-serializable, False otherwise
  1084. """
  1085. try:
  1086. json.dumps(x)
  1087. return True
  1088. except Exception:
  1089. return False
  1090. def get_lang_flag(countrycode: str) -> str:
  1091. """
  1092. Gets an emoji of specified countrycode
  1093. :param countrycode: 2-letter countrycode
  1094. :return: Emoji flag
  1095. """
  1096. if (
  1097. len(
  1098. code := [
  1099. c
  1100. for c in countrycode.lower()
  1101. if c in string.ascii_letters + string.digits
  1102. ]
  1103. )
  1104. == 2
  1105. ):
  1106. return "".join([chr(ord(c.upper()) + (ord("🇦") - ord("A"))) for c in code])
  1107. return countrycode
  1108. def get_entity_url(
  1109. entity: typing.Union[User, Channel],
  1110. openmessage: bool = False,
  1111. ) -> str:
  1112. """
  1113. Get link to object, if available
  1114. :param entity: Entity to get url of
  1115. :param openmessage: Use tg://openmessage link for users
  1116. :return: Link to object or empty string
  1117. """
  1118. return (
  1119. (
  1120. f"tg://openmessage?id={entity.id}"
  1121. if openmessage
  1122. else f"tg://user?id={entity.id}"
  1123. )
  1124. if isinstance(entity, User)
  1125. else (
  1126. f"tg://resolve?domain={entity.username}"
  1127. if getattr(entity, "username", None)
  1128. else ""
  1129. )
  1130. )
  1131. async def get_message_link(
  1132. message: Message,
  1133. chat: typing.Optional[typing.Union[Chat, Channel]] = None,
  1134. ) -> str:
  1135. """
  1136. Get link to message
  1137. :param message: Message to get link of
  1138. :param chat: Chat, where message was sent
  1139. :return: Link to message
  1140. """
  1141. if message.is_private:
  1142. return (
  1143. f"tg://openmessage?user_id={get_chat_id(message)}&message_id={message.id}"
  1144. )
  1145. if not chat and not (chat := message.chat):
  1146. chat = await message.get_chat()
  1147. topic_affix = (
  1148. f"?topic={message.reply_to.reply_to_msg_id}"
  1149. if getattr(message.reply_to, "forum_topic", False)
  1150. else ""
  1151. )
  1152. return (
  1153. f"https://t.me/{chat.username}/{message.id}{topic_affix}"
  1154. if getattr(chat, "username", False)
  1155. else f"https://t.me/c/{chat.id}/{message.id}{topic_affix}"
  1156. )
  1157. def remove_html(text: str, escape: bool = False, keep_emojis: bool = False) -> str:
  1158. """
  1159. Removes HTML tags from text
  1160. :param text: Text to remove HTML from
  1161. :param escape: Escape HTML
  1162. :param keep_emojis: Keep custom emojis
  1163. :return: Text without HTML
  1164. """
  1165. return (escape_html if escape else str)(
  1166. re.sub(
  1167. r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>)"
  1168. if keep_emojis
  1169. else r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>|<\/?emoji.*?>)",
  1170. "",
  1171. text,
  1172. )
  1173. )
  1174. def get_kwargs() -> typing.Dict[str, typing.Any]:
  1175. """
  1176. Get kwargs of function, in which is called
  1177. :return: kwargs
  1178. """
  1179. # https://stackoverflow.com/a/65927265/19170642
  1180. keys, _, _, values = inspect.getargvalues(inspect.currentframe().f_back)
  1181. return {key: values[key] for key in keys if key != "self"}
  1182. def mime_type(message: Message) -> str:
  1183. """
  1184. Get mime type of document in message
  1185. :param message: Message with document
  1186. :return: Mime type or empty string if not present
  1187. """
  1188. return (
  1189. ""
  1190. if not isinstance(message, Message) or not getattr(message, "media", False)
  1191. else getattr(getattr(message, "media", False), "mime_type", False) or ""
  1192. )
  1193. def find_caller(
  1194. stack: typing.Optional[typing.List[inspect.FrameInfo]] = None,
  1195. ) -> typing.Any:
  1196. """
  1197. Attempts to find command in stack
  1198. :param stack: Stack to search in
  1199. :return: Command-caller or None
  1200. """
  1201. caller = next(
  1202. (
  1203. frame_info
  1204. for frame_info in stack or inspect.stack()
  1205. if hasattr(frame_info, "function")
  1206. and any(
  1207. inspect.isclass(cls_)
  1208. and issubclass(cls_, Module)
  1209. and cls_ is not Module
  1210. for cls_ in frame_info.frame.f_globals.values()
  1211. )
  1212. ),
  1213. None,
  1214. )
  1215. if not caller:
  1216. return next(
  1217. (
  1218. frame_info.frame.f_locals["func"]
  1219. for frame_info in stack or inspect.stack()
  1220. if hasattr(frame_info, "function")
  1221. and frame_info.function == "future_dispatcher"
  1222. and (
  1223. "CommandDispatcher"
  1224. in getattr(getattr(frame_info, "frame", None), "f_globals", {})
  1225. )
  1226. ),
  1227. None,
  1228. )
  1229. return next(
  1230. (
  1231. getattr(cls_, caller.function, None)
  1232. for cls_ in caller.frame.f_globals.values()
  1233. if inspect.isclass(cls_) and issubclass(cls_, Module)
  1234. ),
  1235. None,
  1236. )
  1237. def validate_html(html: str) -> str:
  1238. """
  1239. Removes broken tags from html
  1240. :param html: HTML to validate
  1241. :return: Valid HTML
  1242. """
  1243. text, entities = telethon.extensions.html.parse(html)
  1244. return telethon.extensions.html.unparse(escape_html(text), entities)
  1245. def iter_attrs(obj: typing.Any, /) -> typing.List[typing.Tuple[str, typing.Any]]:
  1246. """
  1247. Returns list of attributes of object
  1248. :param obj: Object to iterate over
  1249. :return: List of attributes and their values
  1250. """
  1251. return ((attr, getattr(obj, attr)) for attr in dir(obj))
  1252. def atexit(
  1253. func: typing.Callable,
  1254. use_signal: typing.Optional[int] = None,
  1255. *args,
  1256. **kwargs,
  1257. ) -> None:
  1258. """
  1259. Calls function on exit
  1260. :param func: Function to call
  1261. :param use_signal: If passed, `signal` will be used instead of `atexit`
  1262. :param args: Arguments to pass to function
  1263. :param kwargs: Keyword arguments to pass to function
  1264. :return: None
  1265. """
  1266. if use_signal:
  1267. signal.signal(use_signal, lambda *_: func(*args, **kwargs))
  1268. return
  1269. _atexit.register(functools.partial(func, *args, **kwargs))
  1270. def get_topic(message: Message) -> typing.Optional[int]:
  1271. """
  1272. Get topic id of message
  1273. :param message: Message to get topic of
  1274. :return: int or None if not present
  1275. """
  1276. return (
  1277. (message.reply_to.reply_to_top_id or message.reply_to.reply_to_msg_id)
  1278. if (
  1279. isinstance(message, Message)
  1280. and message.reply_to
  1281. and message.reply_to.forum_topic
  1282. )
  1283. else message.form["top_msg_id"]
  1284. if isinstance(message, (InlineCall, InlineMessage))
  1285. else None
  1286. )
  1287. def get_ram_usage() -> float:
  1288. """Returns current process tree memory usage in MB"""
  1289. try:
  1290. import psutil
  1291. current_process = psutil.Process(os.getpid())
  1292. mem = current_process.memory_info()[0] / 2.0**20
  1293. for child in current_process.children(recursive=True):
  1294. mem += child.memory_info()[0] / 2.0**20
  1295. return round(mem, 1)
  1296. except Exception:
  1297. return 0
  1298. def get_cpu_usage() -> float:
  1299. """Returns current process tree CPU usage in %"""
  1300. try:
  1301. import psutil
  1302. current_process = psutil.Process(os.getpid())
  1303. cpu = current_process.cpu_percent()
  1304. for child in current_process.children(recursive=True):
  1305. cpu += child.cpu_percent()
  1306. return round(cpu, 1)
  1307. except Exception:
  1308. return 0
  1309. init_ts = time.perf_counter()
  1310. # GeekTG Compatibility
  1311. def get_git_info() -> typing.Tuple[str, str]:
  1312. """
  1313. Get git info
  1314. :return: Git info
  1315. """
  1316. hash_ = get_git_hash()
  1317. return (
  1318. hash_,
  1319. f"https://github.com/hikariatama/Hikka/commit/{hash_}" if hash_ else "",
  1320. )
  1321. def get_version_raw() -> str:
  1322. """
  1323. Get the version of the userbot
  1324. :return: Version in format %s.%s.%s
  1325. """
  1326. from . import version
  1327. return ".".join(map(str, list(version.__version__)))
  1328. get_platform_name = get_named_platform