123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563 |
- """Utilities"""
- # Friendly Telegram (telegram userbot)
- # Copyright (C) 2018-2021 The Authors
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- # ©️ Dan Gazizullin, 2021-2022
- # This file is a part of Hikka Userbot
- # 🌐 https://github.com/hikariatama/Hikka
- # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
- # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
- import asyncio
- import atexit as _atexit
- import contextlib
- import functools
- import inspect
- import io
- import json
- import logging
- import os
- import random
- import re
- import shlex
- import signal
- import string
- import time
- import typing
- from datetime import timedelta
- from urllib.parse import urlparse
- import git
- import grapheme
- import requests
- import telethon
- from aiogram.types import Message as AiogramMessage
- from telethon import hints
- from telethon.tl.custom.message import Message
- from telethon.tl.functions.account import UpdateNotifySettingsRequest
- from telethon.tl.functions.channels import (
- CreateChannelRequest,
- EditAdminRequest,
- EditPhotoRequest,
- InviteToChannelRequest,
- )
- from telethon.tl.functions.messages import (
- GetDialogFiltersRequest,
- SetHistoryTTLRequest,
- UpdateDialogFilterRequest,
- )
- from telethon.tl.types import (
- Channel,
- Chat,
- ChatAdminRights,
- InputDocument,
- InputPeerNotifySettings,
- MessageEntityBankCard,
- MessageEntityBlockquote,
- MessageEntityBold,
- MessageEntityBotCommand,
- MessageEntityCashtag,
- MessageEntityCode,
- MessageEntityEmail,
- MessageEntityHashtag,
- MessageEntityItalic,
- MessageEntityMention,
- MessageEntityMentionName,
- MessageEntityPhone,
- MessageEntityPre,
- MessageEntitySpoiler,
- MessageEntityStrike,
- MessageEntityTextUrl,
- MessageEntityUnderline,
- MessageEntityUnknown,
- MessageEntityUrl,
- MessageMediaWebPage,
- PeerChannel,
- PeerChat,
- PeerUser,
- UpdateNewChannelMessage,
- User,
- )
- from .inline.types import InlineCall, InlineMessage
- from .tl_cache import CustomTelegramClient
- from .types import HikkaReplyMarkup, ListLike, Module
- FormattingEntity = typing.Union[
- MessageEntityUnknown,
- MessageEntityMention,
- MessageEntityHashtag,
- MessageEntityBotCommand,
- MessageEntityUrl,
- MessageEntityEmail,
- MessageEntityBold,
- MessageEntityItalic,
- MessageEntityCode,
- MessageEntityPre,
- MessageEntityTextUrl,
- MessageEntityMentionName,
- MessageEntityPhone,
- MessageEntityCashtag,
- MessageEntityUnderline,
- MessageEntityStrike,
- MessageEntityBlockquote,
- MessageEntityBankCard,
- MessageEntitySpoiler,
- ]
- emoji_pattern = re.compile(
- "["
- "\U0001F600-\U0001F64F" # emoticons
- "\U0001F300-\U0001F5FF" # symbols & pictographs
- "\U0001F680-\U0001F6FF" # transport & map symbols
- "\U0001F1E0-\U0001F1FF" # flags (iOS)
- "]+",
- flags=re.UNICODE,
- )
- parser = telethon.utils.sanitize_parse_mode("html")
- logger = logging.getLogger(__name__)
- def get_args(message: typing.Union[Message, str]) -> typing.List[str]:
- """
- Get arguments from message
- :param message: Message or string to get arguments from
- :return: List of arguments
- """
- if not (message := getattr(message, "message", message)):
- return False
- if len(message := message.split(maxsplit=1)) <= 1:
- return []
- message = message[1]
- try:
- split = shlex.split(message)
- except ValueError:
- return message # Cannot split, let's assume that it's just one long message
- return list(filter(lambda x: len(x) > 0, split))
- def get_args_raw(message: typing.Union[Message, str]) -> str:
- """
- Get the parameters to the command as a raw string (not split)
- :param message: Message or string to get arguments from
- :return: Raw string of arguments
- """
- if not (message := getattr(message, "message", message)):
- return False
- return args[1] if len(args := message.split(maxsplit=1)) > 1 else ""
- def get_args_html(message: Message) -> str:
- """
- Get the parameters to the command as string with HTML (not split)
- :param message: Message to get arguments from
- :return: String with HTML arguments
- """
- prefix = message.client.loader.get_prefix()
- if not (message := message.text):
- return False
- if prefix not in message:
- return message
- raw_text, entities = parser.parse(message)
- raw_text = parser._add_surrogate(raw_text)
- try:
- command = raw_text[
- raw_text.index(prefix) : raw_text.index(" ", raw_text.index(prefix) + 1)
- ]
- except ValueError:
- return ""
- command_len = len(command) + 1
- return parser.unparse(
- parser._del_surrogate(raw_text[command_len:]),
- relocate_entities(entities, -command_len, raw_text[command_len:]),
- )
- def get_args_split_by(
- message: typing.Union[Message, str],
- separator: str,
- ) -> typing.List[str]:
- """
- Split args with a specific separator
- :param message: Message or string to get arguments from
- :param separator: Separator to split by
- :return: List of arguments
- """
- return [
- section.strip() for section in get_args_raw(message).split(separator) if section
- ]
- def get_chat_id(message: typing.Union[Message, AiogramMessage]) -> int:
- """
- Get the chat ID, but without -100 if its a channel
- :param message: Message to get chat ID from
- :return: Chat ID
- """
- return telethon.utils.resolve_id(
- getattr(message, "chat_id", None)
- or getattr(getattr(message, "chat", None), "id", None)
- )[0]
- def get_entity_id(entity: hints.Entity) -> int:
- """
- Get entity ID
- :param entity: Entity to get ID from
- :return: Entity ID
- """
- return telethon.utils.get_peer_id(entity)
- def escape_html(text: str, /) -> str: # sourcery skip
- """
- Pass all untrusted/potentially corrupt input here
- :param text: Text to escape
- :return: Escaped text
- """
- return str(text).replace("&", "&").replace("<", "<").replace(">", ">")
- def escape_quotes(text: str, /) -> str:
- """
- Escape quotes to html quotes
- :param text: Text to escape
- :return: Escaped text
- """
- return escape_html(text).replace('"', """)
- def get_base_dir() -> str:
- """
- Get directory of this file
- :return: Directory of this file
- """
- return get_dir(__file__)
- def get_dir(mod: str) -> str:
- """
- Get directory of given module
- :param mod: Module's `__file__` to get directory of
- :return: Directory of given module
- """
- return os.path.abspath(os.path.dirname(os.path.abspath(mod)))
- async def get_user(message: Message) -> typing.Optional[User]:
- """
- Get user who sent message, searching if not found easily
- :param message: Message to get user from
- :return: User who sent message
- """
- try:
- return await message.client.get_entity(message.sender_id)
- except ValueError: # Not in database. Lets go looking for them.
- logger.debug("User not in session cache. Searching...")
- if isinstance(message.peer_id, PeerUser):
- await message.client.get_dialogs()
- return await message.client.get_entity(message.sender_id)
- if isinstance(message.peer_id, (PeerChannel, PeerChat)):
- try:
- return await message.client.get_entity(message.sender_id)
- except Exception:
- pass
- async for user in message.client.iter_participants(
- message.peer_id,
- aggressive=True,
- ):
- if user.id == message.sender_id:
- return user
- logger.error("User isn't in the group where they sent the message")
- return None
- logger.error("`peer_id` is not a user, chat or channel")
- return None
- def run_sync(func, *args, **kwargs):
- """
- Run a non-async function in a new thread and return an awaitable
- :param func: Sync-only function to execute
- :return: Awaitable coroutine
- """
- return asyncio.get_event_loop().run_in_executor(
- None,
- functools.partial(func, *args, **kwargs),
- )
- def run_async(loop: asyncio.AbstractEventLoop, coro: typing.Awaitable) -> typing.Any:
- """
- Run an async function as a non-async function, blocking till it's done
- :param loop: Event loop to run the coroutine in
- :param coro: Coroutine to run
- :return: Result of the coroutine
- """
- return asyncio.run_coroutine_threadsafe(coro, loop).result()
- def censor(
- obj: typing.Any,
- to_censor: typing.Optional[typing.Iterable[str]] = None,
- replace_with: str = "redacted_{count}_chars",
- ):
- """
- May modify the original object, but don't rely on it
- :param obj: Object to censor, preferrably telethon
- :param to_censor: Iterable of strings to censor
- :param replace_with: String to replace with, {count} will be replaced with the number of characters
- :return: Censored object
- """
- if to_censor is None:
- to_censor = ["phone"]
- for k, v in vars(obj).items():
- if k in to_censor:
- setattr(obj, k, replace_with.format(count=len(v)))
- elif k[0] != "_" and hasattr(v, "__dict__"):
- setattr(obj, k, censor(v, to_censor, replace_with))
- return obj
- def relocate_entities(
- entities: typing.List[FormattingEntity],
- offset: int,
- text: typing.Optional[str] = None,
- ) -> typing.List[FormattingEntity]:
- """
- Move all entities by offset (truncating at text)
- :param entities: List of entities
- :param offset: Offset to move by
- :param text: Text to truncate at
- :return: List of entities
- """
- length = len(text) if text is not None else 0
- for ent in entities.copy() if entities else ():
- ent.offset += offset
- if ent.offset < 0:
- ent.length += ent.offset
- ent.offset = 0
- if text is not None and ent.offset + ent.length > length:
- ent.length = length - ent.offset
- if ent.length <= 0:
- entities.remove(ent)
- return entities
- async def answer_file(
- message: typing.Union[Message, InlineCall, InlineMessage],
- file: typing.Union[str, bytes, io.IOBase, InputDocument],
- caption: typing.Optional[str] = None,
- **kwargs,
- ):
- """
- Use this to answer a message with a document
- :param message: Message to answer
- :param file: File to send - url, path or bytes
- :param caption: Caption to send
- :param kwargs: Extra kwargs to pass to `send_file`
- :return: Sent message
- :example:
- >>> await utils.answer_file(message, "test.txt")
- >>> await utils.answer_file(
- message,
- "https://mods.hikariatama.ru/badges/artai.jpg",
- "This is the cool module, check it out!",
- )
- """
- if isinstance(message, (InlineCall, InlineMessage)):
- message = message.form["caller"]
- if topic := get_topic(message):
- kwargs.setdefault("reply_to", topic)
- try:
- response = await message.client.send_file(
- message.peer_id,
- file,
- caption=caption,
- **kwargs,
- )
- except Exception:
- if caption:
- logger.warning(
- "Failed to send file, sending plain text instead", exc_info=True
- )
- return await answer(message, caption, **kwargs)
- raise
- with contextlib.suppress(Exception):
- await message.delete()
- return response
- async def answer(
- message: typing.Union[Message, InlineCall, InlineMessage],
- response: str,
- *,
- reply_markup: typing.Optional[HikkaReplyMarkup] = None,
- **kwargs,
- ) -> typing.Union[InlineCall, InlineMessage, Message]:
- """
- Use this to give the response to a command
- :param message: Message to answer to. Can be a tl message or hikka inline object
- :param response: Response to send
- :param reply_markup: Reply markup to send. If specified, inline form will be used
- :return: Message or inline object
- :example:
- >>> await utils.answer(message, "Hello world!")
- >>> await utils.answer(
- message,
- "https://some-url.com/photo.jpg",
- caption="Hello, this is your photo!",
- asfile=True,
- )
- >>> await utils.answer(
- message,
- "Hello world!",
- reply_markup={"text": "Hello!", "data": "world"},
- silent=True,
- disable_security=True,
- )
- """
- # Compatibility with FTG\GeekTG
- if isinstance(message, list) and message:
- message = message[0]
- if reply_markup is not None:
- if not isinstance(reply_markup, (list, dict)):
- raise ValueError("reply_markup must be a list or dict")
- if reply_markup:
- kwargs.pop("message", None)
- if isinstance(message, (InlineMessage, InlineCall)):
- await message.edit(response, reply_markup, **kwargs)
- return
- reply_markup = message.client.loader.inline._normalize_markup(reply_markup)
- result = await message.client.loader.inline.form(
- response,
- message=message if message.out else get_chat_id(message),
- reply_markup=reply_markup,
- **kwargs,
- )
- return result
- if isinstance(message, (InlineMessage, InlineCall)):
- await message.edit(response)
- return message
- kwargs.setdefault("link_preview", False)
- if not (edit := (message.out and not message.via_bot_id and not message.fwd_from)):
- kwargs.setdefault(
- "reply_to",
- getattr(message, "reply_to_msg_id", None),
- )
- parse_mode = telethon.utils.sanitize_parse_mode(
- kwargs.pop(
- "parse_mode",
- message.client.parse_mode,
- )
- )
- if isinstance(response, str) and not kwargs.pop("asfile", False):
- text, entities = parse_mode.parse(response)
- if len(text) >= 4096 and not hasattr(message, "hikka_grepped"):
- try:
- if not message.client.loader.inline.init_complete:
- raise
- strings = list(smart_split(text, entities, 4096))
- if len(strings) > 10:
- raise
- list_ = await message.client.loader.inline.list(
- message=message,
- strings=strings,
- )
- if not list_:
- raise
- return list_
- except Exception:
- file = io.BytesIO(text.encode("utf-8"))
- file.name = "command_result.txt"
- result = await message.client.send_file(
- message.peer_id,
- file,
- caption=message.client.loader.lookup("translations").strings(
- "too_long"
- ),
- reply_to=kwargs.get("reply_to") or get_topic(message),
- )
- if message.out:
- await message.delete()
- return result
- result = await (message.edit if edit else message.respond)(
- text,
- parse_mode=lambda t: (t, entities),
- **kwargs,
- )
- elif isinstance(response, Message):
- if message.media is None and (
- response.media is None or isinstance(response.media, MessageMediaWebPage)
- ):
- result = await message.edit(
- response.message,
- parse_mode=lambda t: (t, response.entities or []),
- link_preview=isinstance(response.media, MessageMediaWebPage),
- )
- else:
- result = await message.respond(response, **kwargs)
- else:
- if isinstance(response, bytes):
- response = io.BytesIO(response)
- elif isinstance(response, str):
- response = io.BytesIO(response.encode("utf-8"))
- if name := kwargs.pop("filename", None):
- response.name = name
- if message.media is not None and edit:
- await message.edit(file=response, **kwargs)
- else:
- kwargs.setdefault(
- "reply_to",
- getattr(message, "reply_to_msg_id", get_topic(message)),
- )
- result = await message.client.send_file(message.peer_id, response, **kwargs)
- if message.out:
- await message.delete()
- return result
- async def get_target(message: Message, arg_no: int = 0) -> typing.Optional[int]:
- """
- Get target from message
- :param message: Message to get target from
- :param arg_no: Argument number to get target from
- :return: Target
- """
- if any(
- isinstance(entity, MessageEntityMentionName)
- for entity in (message.entities or [])
- ):
- e = sorted(
- filter(lambda x: isinstance(x, MessageEntityMentionName), message.entities),
- key=lambda x: x.offset,
- )[0]
- return e.user_id
- if len(get_args(message)) > arg_no:
- user = get_args(message)[arg_no]
- elif message.is_reply:
- return (await message.get_reply_message()).sender_id
- elif hasattr(message.peer_id, "user_id"):
- user = message.peer_id.user_id
- else:
- return None
- try:
- entity = await message.client.get_entity(user)
- except ValueError:
- return None
- else:
- if isinstance(entity, User):
- return entity.id
- def merge(a: dict, b: dict, /) -> dict:
- """
- Merge with replace dictionary a to dictionary b
- :param a: Dictionary to merge
- :param b: Dictionary to merge to
- :return: Merged dictionary
- """
- for key in a:
- if key in b:
- if isinstance(a[key], dict) and isinstance(b[key], dict):
- b[key] = merge(a[key], b[key])
- elif isinstance(a[key], list) and isinstance(b[key], list):
- b[key] = list(set(b[key] + a[key]))
- else:
- b[key] = a[key]
- b[key] = a[key]
- return b
- async def set_avatar(
- client: CustomTelegramClient,
- peer: hints.Entity,
- avatar: str,
- ) -> bool:
- """
- Sets an entity avatar
- :param client: Client to use
- :param peer: Peer to set avatar to
- :param avatar: Avatar to set
- :return: True if avatar was set, False otherwise
- """
- if isinstance(avatar, str) and check_url(avatar):
- f = (
- await run_sync(
- requests.get,
- avatar,
- )
- ).content
- elif isinstance(avatar, bytes):
- f = avatar
- else:
- return False
- res = await client(
- EditPhotoRequest(
- channel=peer,
- photo=await client.upload_file(f, file_name="photo.png"),
- )
- )
- try:
- await client.delete_messages(
- peer,
- message_ids=[
- next(
- update
- for update in res.updates
- if isinstance(update, UpdateNewChannelMessage)
- ).message.id
- ],
- )
- except Exception:
- pass
- return True
- async def invite_inline_bot(
- client: CustomTelegramClient,
- peer: hints.EntityLike,
- ) -> None:
- """
- Invites inline bot to a chat
- :param client: Client to use
- :param peer: Peer to invite bot to
- :return: None
- :raise RuntimeError: If error occurred while inviting bot
- """
- try:
- await client(InviteToChannelRequest(peer, [client.loader.inline.bot_username]))
- except Exception as e:
- raise RuntimeError(
- "Can't invite inline bot to old asset chat, which is required by module"
- ) from e
- with contextlib.suppress(Exception):
- await client(
- EditAdminRequest(
- channel=peer,
- user_id=client.loader.inline.bot_username,
- admin_rights=ChatAdminRights(ban_users=True),
- rank="Hikka",
- )
- )
- async def asset_channel(
- client: CustomTelegramClient,
- title: str,
- description: str,
- *,
- channel: bool = False,
- silent: bool = False,
- archive: bool = False,
- invite_bot: bool = False,
- avatar: typing.Optional[str] = None,
- ttl: typing.Optional[int] = None,
- _folder: typing.Optional[str] = None,
- ) -> typing.Tuple[Channel, bool]:
- """
- Create new channel (if needed) and return its entity
- :param client: Telegram client to create channel by
- :param title: Channel title
- :param description: Description
- :param channel: Whether to create a channel or supergroup
- :param silent: Automatically mute channel
- :param archive: Automatically archive channel
- :param invite_bot: Add inline bot and assure it's in chat
- :param avatar: Url to an avatar to set as pfp of created peer
- :param ttl: Time to live for messages in channel
- :return: Peer and bool: is channel new or pre-existent
- """
- if not hasattr(client, "_channels_cache"):
- client._channels_cache = {}
- if (
- title in client._channels_cache
- and client._channels_cache[title]["exp"] > time.time()
- ):
- return client._channels_cache[title]["peer"], False
- async for d in client.iter_dialogs():
- if d.title == title:
- client._channels_cache[title] = {"peer": d.entity, "exp": int(time.time())}
- if invite_bot:
- if all(
- participant.id != client.loader.inline.bot_id
- for participant in (
- await client.get_participants(d.entity, limit=100)
- )
- ):
- await invite_inline_bot(client, d.entity)
- return d.entity, False
- peer = (
- await client(
- CreateChannelRequest(
- title,
- description,
- megagroup=not channel,
- )
- )
- ).chats[0]
- if invite_bot:
- await invite_inline_bot(client, peer)
- if silent:
- await dnd(client, peer, archive)
- elif archive:
- await client.edit_folder(peer, 1)
- if avatar:
- await set_avatar(client, peer, avatar)
- if ttl:
- await client(SetHistoryTTLRequest(peer=peer, period=ttl))
- if _folder:
- if _folder != "hikka":
- raise NotImplementedError
- folders = await client(GetDialogFiltersRequest())
- try:
- folder = next(folder for folder in folders if folder.title == "hikka")
- except Exception:
- folder = None
- if folder is not None and not any(
- peer.id == getattr(folder_peer, "channel_id", None)
- for folder_peer in folder.include_peers
- ):
- folder.include_peers += [await client.get_input_entity(peer)]
- await client(
- UpdateDialogFilterRequest(
- folder.id,
- folder,
- )
- )
- client._channels_cache[title] = {"peer": peer, "exp": int(time.time())}
- return peer, True
- async def dnd(
- client: CustomTelegramClient,
- peer: hints.Entity,
- archive: bool = True,
- ) -> bool:
- """
- Mutes and optionally archives peer
- :param peer: Anything entity-link
- :param archive: Archive peer, or just mute?
- :return: `True` on success, otherwise `False`
- """
- try:
- await client(
- UpdateNotifySettingsRequest(
- peer=peer,
- settings=InputPeerNotifySettings(
- show_previews=False,
- silent=True,
- mute_until=2**31 - 1,
- ),
- )
- )
- if archive:
- await client.edit_folder(peer, 1)
- except Exception:
- logger.exception("utils.dnd error")
- return False
- return True
- def get_link(user: typing.Union[User, Channel], /) -> str:
- """
- Get telegram permalink to entity
- :param user: User or channel
- :return: Link to entity
- """
- return (
- f"tg://user?id={user.id}"
- if isinstance(user, User)
- else (
- f"tg://resolve?domain={user.username}"
- if getattr(user, "username", None)
- else ""
- )
- )
- def chunks(_list: ListLike, n: int, /) -> typing.List[typing.List[typing.Any]]:
- """
- Split provided `_list` into chunks of `n`
- :param _list: List to split
- :param n: Chunk size
- :return: List of chunks
- """
- return [_list[i : i + n] for i in range(0, len(_list), n)]
- def get_named_platform() -> str:
- """
- Returns formatted platform name
- :return: Platform name
- """
- try:
- if os.path.isfile("/proc/device-tree/model"):
- with open("/proc/device-tree/model") as f:
- model = f.read()
- if "Orange" in model:
- return f"🍊 {model}"
- return f"🍇 {model}" if "Raspberry" in model else f"❓ {model}"
- except Exception:
- # In case of weird fs, aka Termux
- pass
- try:
- from platform import uname
- if "microsoft-standard" in uname().release:
- return "🍁 WSL"
- except Exception:
- pass
- if "MIYAHOST" in os.environ:
- return "🎃 MiyaHost"
- if "GOORM" in os.environ:
- return "🦾 GoormIDE"
- if "RAILWAY" in os.environ:
- return "🚂 Railway"
- if "DOCKER" in os.environ:
- return "🐳 Docker"
- if "com.termux" in os.environ.get("PREFIX", ""):
- return "🕶 Termux"
- if "CODESPACES" in os.environ:
- return "🐈⬛ Codespaces"
- return f"✌️ lavHost {os.environ['LAVHOST']}" if "LAVHOST" in os.environ else "📻 VDS"
- def get_platform_emoji() -> str:
- """
- Returns custom emoji for current platform
- :return: Emoji entity in string
- """
- BASE = "".join(
- (
- "<emoji document_id={}>🌘</emoji>",
- "<emoji document_id=5195311729663286630>🌘</emoji>",
- "<emoji document_id=5195045669324201904>🌘</emoji>",
- )
- )
- if "DOCKER" in os.environ:
- return BASE.format(5298554256603752468)
- if "LAVHOST" in os.environ:
- return BASE.format(5301078610747074753)
- if "MIYAHOST" in os.environ:
- return BASE.format(5280938237785808014)
- if "GOORM" in os.environ:
- return BASE.format(5298947740032573902)
- if "CODESPACES" in os.environ:
- return BASE.format(5194976881127989720)
- if "com.termux" in os.environ.get("PREFIX", ""):
- return BASE.format(5193051778001673828)
- if "RAILWAY" in os.environ:
- return BASE.format(5199607521593007466)
- return BASE.format(5192765204898783881)
- def uptime() -> int:
- """
- Returns userbot uptime in seconds
- :return: Uptime in seconds
- """
- return round(time.perf_counter() - init_ts)
- def formatted_uptime() -> str:
- """
- Returnes formmated uptime
- :return: Formatted uptime
- """
- return f"{str(timedelta(seconds=uptime()))}"
- def ascii_face() -> str:
- """
- Returnes cute ASCII-art face
- :return: ASCII-art face
- """
- return escape_html(
- random.choice(
- [
- "ヽ(๑◠ܫ◠๑)ノ",
- "(◕ᴥ◕ʋ)",
- "ᕙ(`▽´)ᕗ",
- "(✿◠‿◠)",
- "(▰˘◡˘▰)",
- "(˵ ͡° ͜ʖ ͡°˵)",
- "ʕっ•ᴥ•ʔっ",
- "( ͡° ᴥ ͡°)",
- "(๑•́ ヮ •̀๑)",
- "٩(^‿^)۶",
- "(っˆڡˆς)",
- "ψ(`∇´)ψ",
- "⊙ω⊙",
- "٩(^ᴗ^)۶",
- "(´・ω・)っ由",
- "( ͡~ ͜ʖ ͡°)",
- "✧♡(◕‿◕✿)",
- "โ๏௰๏ใ ื",
- "∩。• ᵕ •。∩ ♡",
- "(♡´౪`♡)",
- "(◍>◡<◍)⋈。✧♡",
- "╰(✿´⌣`✿)╯♡",
- "ʕ•ᴥ•ʔ",
- "ᶘ ◕ᴥ◕ᶅ",
- "▼・ᴥ・▼",
- "ฅ^•ﻌ•^ฅ",
- "(΄◞ิ౪◟ิ‵)",
- "٩(^ᴗ^)۶",
- "ᕴーᴥーᕵ",
- "ʕ→ᴥ←ʔ",
- "ʕᵕᴥᵕʔ",
- "ʕᵒᴥᵒʔ",
- "ᵔᴥᵔ",
- "(✿╹◡╹)",
- "(๑→ܫ←)",
- "ʕ·ᴥ· ʔ",
- "(ノ≧ڡ≦)",
- "(≖ᴗ≖✿)",
- "(〜^∇^ )〜",
- "( ノ・ェ・ )ノ",
- "~( ˘▾˘~)",
- "(〜^∇^)〜",
- "ヽ(^ᴗ^ヽ)",
- "(´・ω・`)",
- "₍ᐢ•ﻌ•ᐢ₎*・゚。",
- "(。・・)_且",
- "(=`ω´=)",
- "(*•‿•*)",
- "(*゚∀゚*)",
- "(☉⋆‿⋆☉)",
- "ɷ◡ɷ",
- "ʘ‿ʘ",
- "(。-ω-)ノ",
- "( ・ω・)ノ",
- "(=゚ω゚)ノ",
- "(・ε・`*) …",
- "ʕっ•ᴥ•ʔっ",
- "(*˘︶˘*)",
- ]
- )
- )
- def array_sum(
- array: typing.List[typing.List[typing.Any]], /
- ) -> typing.List[typing.Any]:
- """
- Performs basic sum operation on array
- :param array: Array to sum
- :return: Sum of array
- """
- result = []
- for item in array:
- result += item
- return result
- def rand(size: int, /) -> str:
- """
- Return random string of len `size`
- :param size: Length of string
- :return: Random string
- """
- return "".join(
- [random.choice("abcdefghijklmnopqrstuvwxyz1234567890") for _ in range(size)]
- )
- def smart_split(
- text: str,
- entities: typing.List[FormattingEntity],
- length: int = 4096,
- split_on: ListLike = ("\n", " "),
- min_length: int = 1,
- ) -> typing.Iterator[str]:
- """
- Split the message into smaller messages.
- A grapheme will never be broken. Entities will be displaced to match the right location. No inputs will be mutated.
- The end of each message except the last one is stripped of characters from [split_on]
- :param text: the plain text input
- :param entities: the entities
- :param length: the maximum length of a single message
- :param split_on: characters (or strings) which are preferred for a message break
- :param min_length: ignore any matches on [split_on] strings before this number of characters into each message
- :return: iterator, which returns strings
- :example:
- >>> utils.smart_split(
- *telethon.extensions.html.parse(
- "<b>Hello, world!</b>"
- )
- )
- <<< ["<b>Hello, world!</b>"]
- """
- # Authored by @bsolute
- # https://t.me/LonamiWebs/27777
- encoded = text.encode("utf-16le")
- pending_entities = entities
- text_offset = 0
- bytes_offset = 0
- text_length = len(text)
- bytes_length = len(encoded)
- while text_offset < text_length:
- if bytes_offset + length * 2 >= bytes_length:
- yield parser.unparse(
- text[text_offset:],
- list(sorted(pending_entities, key=lambda x: x.offset)),
- )
- break
- codepoint_count = len(
- encoded[bytes_offset : bytes_offset + length * 2].decode(
- "utf-16le",
- errors="ignore",
- )
- )
- for search in split_on:
- search_index = text.rfind(
- search,
- text_offset + min_length,
- text_offset + codepoint_count,
- )
- if search_index != -1:
- break
- else:
- search_index = text_offset + codepoint_count
- split_index = grapheme.safe_split_index(text, search_index)
- split_offset_utf16 = (
- len(text[text_offset:split_index].encode("utf-16le"))
- ) // 2
- exclude = 0
- while (
- split_index + exclude < text_length
- and text[split_index + exclude] in split_on
- ):
- exclude += 1
- current_entities = []
- entities = pending_entities.copy()
- pending_entities = []
- for entity in entities:
- if (
- entity.offset < split_offset_utf16
- and entity.offset + entity.length > split_offset_utf16 + exclude
- ):
- # spans boundary
- current_entities.append(
- _copy_tl(
- entity,
- length=split_offset_utf16 - entity.offset,
- )
- )
- pending_entities.append(
- _copy_tl(
- entity,
- offset=0,
- length=entity.offset
- + entity.length
- - split_offset_utf16
- - exclude,
- )
- )
- elif entity.offset < split_offset_utf16 < entity.offset + entity.length:
- # overlaps boundary
- current_entities.append(
- _copy_tl(
- entity,
- length=split_offset_utf16 - entity.offset,
- )
- )
- elif entity.offset < split_offset_utf16:
- # wholly left
- current_entities.append(entity)
- elif (
- entity.offset + entity.length
- > split_offset_utf16 + exclude
- > entity.offset
- ):
- # overlaps right boundary
- pending_entities.append(
- _copy_tl(
- entity,
- offset=0,
- length=entity.offset
- + entity.length
- - split_offset_utf16
- - exclude,
- )
- )
- elif entity.offset + entity.length > split_offset_utf16 + exclude:
- # wholly right
- pending_entities.append(
- _copy_tl(
- entity,
- offset=entity.offset - split_offset_utf16 - exclude,
- )
- )
- current_text = text[text_offset:split_index]
- yield parser.unparse(
- current_text,
- list(sorted(current_entities, key=lambda x: x.offset)),
- )
- text_offset = split_index + exclude
- bytes_offset += len(current_text.encode("utf-16le"))
- def _copy_tl(o, **kwargs):
- d = o.to_dict()
- del d["_"]
- d.update(kwargs)
- return o.__class__(**d)
- def check_url(url: str) -> bool:
- """
- Statically checks url for validity
- :param url: URL to check
- :return: True if valid, False otherwise
- """
- try:
- return bool(urlparse(url).netloc)
- except Exception:
- return False
- def get_git_hash() -> typing.Union[str, bool]:
- """
- Get current Hikka git hash
- :return: Git commit hash
- """
- try:
- return git.Repo().head.commit.hexsha
- except Exception:
- return False
- def get_commit_url() -> str:
- """
- Get current Hikka git commit url
- :return: Git commit url
- """
- try:
- hash_ = get_git_hash()
- return (
- f'<a href="https://github.com/hikariatama/Hikka/commit/{hash_}">#{hash_[:7]}</a>'
- )
- except Exception:
- return "Unknown"
- def is_serializable(x: typing.Any, /) -> bool:
- """
- Checks if object is JSON-serializable
- :param x: Object to check
- :return: True if object is JSON-serializable, False otherwise
- """
- try:
- json.dumps(x)
- return True
- except Exception:
- return False
- def get_lang_flag(countrycode: str) -> str:
- """
- Gets an emoji of specified countrycode
- :param countrycode: 2-letter countrycode
- :return: Emoji flag
- """
- if (
- len(
- code := [
- c
- for c in countrycode.lower()
- if c in string.ascii_letters + string.digits
- ]
- )
- == 2
- ):
- return "".join([chr(ord(c.upper()) + (ord("🇦") - ord("A"))) for c in code])
- return countrycode
- def get_entity_url(
- entity: typing.Union[User, Channel],
- openmessage: bool = False,
- ) -> str:
- """
- Get link to object, if available
- :param entity: Entity to get url of
- :param openmessage: Use tg://openmessage link for users
- :return: Link to object or empty string
- """
- return (
- (
- f"tg://openmessage?id={entity.id}"
- if openmessage
- else f"tg://user?id={entity.id}"
- )
- if isinstance(entity, User)
- else (
- f"tg://resolve?domain={entity.username}"
- if getattr(entity, "username", None)
- else ""
- )
- )
- async def get_message_link(
- message: Message,
- chat: typing.Optional[typing.Union[Chat, Channel]] = None,
- ) -> str:
- """
- Get link to message
- :param message: Message to get link of
- :param chat: Chat, where message was sent
- :return: Link to message
- """
- if message.is_private:
- return (
- f"tg://openmessage?user_id={get_chat_id(message)}&message_id={message.id}"
- )
- if not chat and not (chat := message.chat):
- chat = await message.get_chat()
- topic_affix = (
- f"?topic={message.reply_to.reply_to_msg_id}"
- if getattr(message.reply_to, "forum_topic", False)
- else ""
- )
- return (
- f"https://t.me/{chat.username}/{message.id}{topic_affix}"
- if getattr(chat, "username", False)
- else f"https://t.me/c/{chat.id}/{message.id}{topic_affix}"
- )
- def remove_html(text: str, escape: bool = False, keep_emojis: bool = False) -> str:
- """
- Removes HTML tags from text
- :param text: Text to remove HTML from
- :param escape: Escape HTML
- :param keep_emojis: Keep custom emojis
- :return: Text without HTML
- """
- return (escape_html if escape else str)(
- re.sub(
- r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>)"
- if keep_emojis
- else r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>|<\/?emoji.*?>)",
- "",
- text,
- )
- )
- def get_kwargs() -> typing.Dict[str, typing.Any]:
- """
- Get kwargs of function, in which is called
- :return: kwargs
- """
- # https://stackoverflow.com/a/65927265/19170642
- keys, _, _, values = inspect.getargvalues(inspect.currentframe().f_back)
- return {key: values[key] for key in keys if key != "self"}
- def mime_type(message: Message) -> str:
- """
- Get mime type of document in message
- :param message: Message with document
- :return: Mime type or empty string if not present
- """
- return (
- ""
- if not isinstance(message, Message) or not getattr(message, "media", False)
- else getattr(getattr(message, "media", False), "mime_type", False) or ""
- )
- def find_caller(
- stack: typing.Optional[typing.List[inspect.FrameInfo]] = None,
- ) -> typing.Any:
- """
- Attempts to find command in stack
- :param stack: Stack to search in
- :return: Command-caller or None
- """
- caller = next(
- (
- frame_info
- for frame_info in stack or inspect.stack()
- if hasattr(frame_info, "function")
- and any(
- inspect.isclass(cls_)
- and issubclass(cls_, Module)
- and cls_ is not Module
- for cls_ in frame_info.frame.f_globals.values()
- )
- ),
- None,
- )
- if not caller:
- return next(
- (
- frame_info.frame.f_locals["func"]
- for frame_info in stack or inspect.stack()
- if hasattr(frame_info, "function")
- and frame_info.function == "future_dispatcher"
- and (
- "CommandDispatcher"
- in getattr(getattr(frame_info, "frame", None), "f_globals", {})
- )
- ),
- None,
- )
- return next(
- (
- getattr(cls_, caller.function, None)
- for cls_ in caller.frame.f_globals.values()
- if inspect.isclass(cls_) and issubclass(cls_, Module)
- ),
- None,
- )
- def validate_html(html: str) -> str:
- """
- Removes broken tags from html
- :param html: HTML to validate
- :return: Valid HTML
- """
- text, entities = telethon.extensions.html.parse(html)
- return telethon.extensions.html.unparse(escape_html(text), entities)
- def iter_attrs(obj: typing.Any, /) -> typing.List[typing.Tuple[str, typing.Any]]:
- """
- Returns list of attributes of object
- :param obj: Object to iterate over
- :return: List of attributes and their values
- """
- return ((attr, getattr(obj, attr)) for attr in dir(obj))
- def atexit(
- func: typing.Callable,
- use_signal: typing.Optional[int] = None,
- *args,
- **kwargs,
- ) -> None:
- """
- Calls function on exit
- :param func: Function to call
- :param use_signal: If passed, `signal` will be used instead of `atexit`
- :param args: Arguments to pass to function
- :param kwargs: Keyword arguments to pass to function
- :return: None
- """
- if use_signal:
- signal.signal(use_signal, lambda *_: func(*args, **kwargs))
- return
- _atexit.register(functools.partial(func, *args, **kwargs))
- def get_topic(message: Message) -> typing.Optional[int]:
- """
- Get topic id of message
- :param message: Message to get topic of
- :return: int or None if not present
- """
- return (
- (message.reply_to.reply_to_top_id or message.reply_to.reply_to_msg_id)
- if (
- isinstance(message, Message)
- and message.reply_to
- and message.reply_to.forum_topic
- )
- else message.form["top_msg_id"]
- if isinstance(message, (InlineCall, InlineMessage))
- else None
- )
- def get_ram_usage() -> float:
- """Returns current process tree memory usage in MB"""
- try:
- import psutil
- current_process = psutil.Process(os.getpid())
- mem = current_process.memory_info()[0] / 2.0**20
- for child in current_process.children(recursive=True):
- mem += child.memory_info()[0] / 2.0**20
- return round(mem, 1)
- except Exception:
- return 0
- def get_cpu_usage() -> float:
- """Returns current process tree CPU usage in %"""
- try:
- import psutil
- current_process = psutil.Process(os.getpid())
- cpu = current_process.cpu_percent()
- for child in current_process.children(recursive=True):
- cpu += child.cpu_percent()
- return round(cpu, 1)
- except Exception:
- return 0
- init_ts = time.perf_counter()
- # GeekTG Compatibility
- def get_git_info() -> typing.Tuple[str, str]:
- """
- Get git info
- :return: Git info
- """
- hash_ = get_git_hash()
- return (
- hash_,
- f"https://github.com/hikariatama/Hikka/commit/{hash_}" if hash_ else "",
- )
- def get_version_raw() -> str:
- """
- Get the version of the userbot
- :return: Version in format %s.%s.%s
- """
- from . import version
- return ".".join(map(str, list(version.__version__)))
- get_platform_name = get_named_platform
|