1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162 |
- """Utilities"""
- # Friendly Telegram (telegram userbot)
- # Copyright (C) 2018-2021 The Authors
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU Affero General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU Affero General Public License for more details.
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
- # █▀█ █ █ █ █▀█ █▀▄ █
- # © Copyright 2022
- # https://t.me/hikariatama
- #
- # 🔒 Licensed under the GNU AGPLv3
- # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
- import asyncio
- import functools
- import io
- import json
- import logging
- import os
- import random
- import re
- import shlex
- import string
- import time
- import inspect
- from datetime import timedelta
- from typing import Any, List, Optional, Tuple, Union
- from urllib.parse import urlparse
- import git
- import grapheme
- import requests
- import telethon
- from telethon.hints import Entity
- from telethon.tl.custom.message import Message
- from telethon.tl.functions.account import UpdateNotifySettingsRequest
- from telethon.tl.functions.channels import CreateChannelRequest, EditPhotoRequest
- from telethon.tl.functions.messages import (
- GetDialogFiltersRequest,
- UpdateDialogFilterRequest,
- )
- from telethon.tl.types import (
- Channel,
- InputPeerNotifySettings,
- MessageEntityBankCard,
- MessageEntityBlockquote,
- MessageEntityBold,
- MessageEntityBotCommand,
- MessageEntityCashtag,
- MessageEntityCode,
- MessageEntityEmail,
- MessageEntityHashtag,
- MessageEntityItalic,
- MessageEntityMention,
- MessageEntityMentionName,
- MessageEntityPhone,
- MessageEntityPre,
- MessageEntitySpoiler,
- MessageEntityStrike,
- MessageEntityTextUrl,
- MessageEntityUnderline,
- MessageEntityUnknown,
- MessageEntityUrl,
- MessageMediaWebPage,
- PeerChannel,
- PeerChat,
- PeerUser,
- User,
- Chat,
- UpdateNewChannelMessage,
- )
- from aiogram.types import Message as AiogramMessage
- from .inline.types import InlineCall, InlineMessage
- from .types import Module
- FormattingEntity = Union[
- MessageEntityUnknown,
- MessageEntityMention,
- MessageEntityHashtag,
- MessageEntityBotCommand,
- MessageEntityUrl,
- MessageEntityEmail,
- MessageEntityBold,
- MessageEntityItalic,
- MessageEntityCode,
- MessageEntityPre,
- MessageEntityTextUrl,
- MessageEntityMentionName,
- MessageEntityPhone,
- MessageEntityCashtag,
- MessageEntityUnderline,
- MessageEntityStrike,
- MessageEntityBlockquote,
- MessageEntityBankCard,
- MessageEntitySpoiler,
- ]
- ListLike = Union[list, set, tuple]
- emoji_pattern = re.compile(
- "["
- "\U0001F600-\U0001F64F" # emoticons
- "\U0001F300-\U0001F5FF" # symbols & pictographs
- "\U0001F680-\U0001F6FF" # transport & map symbols
- "\U0001F1E0-\U0001F1FF" # flags (iOS)
- "]+",
- flags=re.UNICODE,
- )
- parser = telethon.utils.sanitize_parse_mode("html")
- def get_args(message: Message) -> List[str]:
- """Get arguments from message (str or Message), return list of arguments"""
- if not (message := getattr(message, "message", message)):
- return False
- if len(message := message.split(maxsplit=1)) <= 1:
- return []
- message = message[1]
- try:
- split = shlex.split(message)
- except ValueError:
- return message # Cannot split, let's assume that it's just one long message
- return list(filter(lambda x: len(x) > 0, split))
- def get_args_raw(message: Message) -> str:
- """Get the parameters to the command as a raw string (not split)"""
- if not (message := getattr(message, "message", message)):
- return False
- return args[1] if len(args := message.split(maxsplit=1)) > 1 else ""
- def get_args_split_by(message: Message, separator: str) -> List[str]:
- """Split args with a specific separator"""
- return [
- section.strip() for section in get_args_raw(message).split(separator) if section
- ]
- def get_chat_id(message: Union[Message, AiogramMessage]) -> int:
- """Get the chat ID, but without -100 if its a channel"""
- return telethon.utils.resolve_id(
- getattr(message, "chat_id", None)
- or getattr(getattr(message, "chat", None), "id", None)
- )[0]
- def get_entity_id(entity: Entity) -> int:
- """Get entity ID"""
- return telethon.utils.get_peer_id(entity)
- def escape_html(text: str, /) -> str: # sourcery skip
- """Pass all untrusted/potentially corrupt input here"""
- return str(text).replace("&", "&").replace("<", "<").replace(">", ">")
- def escape_quotes(text: str, /) -> str:
- """Escape quotes to html quotes"""
- return escape_html(text).replace('"', """)
- def get_base_dir() -> str:
- """Get directory of this file"""
- from . import __main__
- return get_dir(__main__.__file__)
- def get_dir(mod: str) -> str:
- """Get directory of given module"""
- return os.path.abspath(os.path.dirname(os.path.abspath(mod)))
- async def get_user(message: Message) -> Union[None, User]:
- """Get user who sent message, searching if not found easily"""
- try:
- return await message.client.get_entity(message.sender_id)
- except ValueError: # Not in database. Lets go looking for them.
- logging.debug("User not in session cache. Searching...")
- if isinstance(message.peer_id, PeerUser):
- await message.client.get_dialogs()
- return await message.client.get_entity(message.sender_id)
- if isinstance(message.peer_id, (PeerChannel, PeerChat)):
- try:
- return await message.client.get_entity(message.sender_id)
- except Exception:
- pass
- async for user in message.client.iter_participants(
- message.peer_id,
- aggressive=True,
- ):
- if user.id == message.sender_id:
- return user
- logging.error("User isn't in the group where they sent the message")
- return None
- logging.error("`peer_id` is not a user, chat or channel")
- return None
- def run_sync(func, *args, **kwargs):
- """
- Run a non-async function in a new thread and return an awaitable
- :param func: Sync-only function to execute
- :returns: Awaitable coroutine
- """
- return asyncio.get_event_loop().run_in_executor(
- None,
- functools.partial(func, *args, **kwargs),
- )
- def run_async(loop, coro):
- """Run an async function as a non-async function, blocking till it's done"""
- # When we bump minimum support to 3.7, use run()
- return asyncio.run_coroutine_threadsafe(coro, loop).result()
- def censor(
- obj,
- to_censor: Optional[List[str]] = None,
- replace_with: Optional[str] = "redacted_{count}_chars",
- ):
- """May modify the original object, but don't rely on it"""
- if to_censor is None:
- to_censor = ["phone"]
- for k, v in vars(obj).items():
- if k in to_censor:
- setattr(obj, k, replace_with.format(count=len(v)))
- elif k[0] != "_" and hasattr(v, "__dict__"):
- setattr(obj, k, censor(v, to_censor, replace_with))
- return obj
- def relocate_entities(
- entities: list,
- offset: int,
- text: Optional[str] = None,
- ) -> list:
- """Move all entities by offset (truncating at text)"""
- length = len(text) if text is not None else 0
- for ent in entities.copy() if entities else ():
- ent.offset += offset
- if ent.offset < 0:
- ent.length += ent.offset
- ent.offset = 0
- if text is not None and ent.offset + ent.length > length:
- ent.length = length - ent.offset
- if ent.length <= 0:
- entities.remove(ent)
- return entities
- async def answer(
- message: Union[Message, InlineCall, InlineMessage],
- response: str,
- *,
- reply_markup: Optional[Union[List[List[dict]], List[dict], dict]] = None,
- **kwargs,
- ) -> Union[InlineCall, InlineMessage, Message]:
- """Use this to give the response to a command"""
- # Compatibility with FTG\GeekTG
- if isinstance(message, list) and message:
- message = message[0]
- if reply_markup is not None:
- if not isinstance(reply_markup, (list, dict)):
- raise ValueError("reply_markup must be a list or dict")
- if reply_markup:
- if isinstance(message, (InlineMessage, InlineCall)):
- await message.edit(response, reply_markup)
- return
- reply_markup = message.client.loader.inline._normalize_markup(reply_markup)
- result = await message.client.loader.inline.form(
- response,
- message=message if message.out else get_chat_id(message),
- reply_markup=reply_markup,
- **kwargs,
- )
- return result
- if isinstance(message, (InlineMessage, InlineCall)):
- await message.edit(response)
- return message
- kwargs.setdefault("link_preview", False)
- if not (edit := (message.out and not message.via_bot_id and not message.fwd_from)):
- kwargs.setdefault(
- "reply_to",
- getattr(message, "reply_to_msg_id", None),
- )
- parse_mode = telethon.utils.sanitize_parse_mode(
- kwargs.pop(
- "parse_mode",
- message.client.parse_mode,
- )
- )
- if isinstance(response, str) and not kwargs.pop("asfile", False):
- text, entities = parse_mode.parse(response)
- if len(text) >= 4096 and not hasattr(message, "hikka_grepped"):
- try:
- if not message.client.loader.inline.init_complete:
- raise
- strings = list(smart_split(text, entities, 4096))
- if len(strings) > 10:
- raise
- list_ = await message.client.loader.inline.list(
- message=message,
- strings=strings,
- )
- if not list_:
- raise
- return list_
- except Exception:
- file = io.BytesIO(text.encode("utf-8"))
- file.name = "command_result.txt"
- result = await message.client.send_file(
- message.peer_id,
- file,
- caption=(
- "<b>📤 Command output seems to be too long, so it's sent in"
- " file.</b>"
- ),
- )
- if message.out:
- await message.delete()
- return result
- result = await (message.edit if edit else message.respond)(
- text,
- parse_mode=lambda t: (t, entities),
- **kwargs,
- )
- elif isinstance(response, Message):
- if message.media is None and (
- response.media is None or isinstance(response.media, MessageMediaWebPage)
- ):
- result = await message.edit(
- response.message,
- parse_mode=lambda t: (t, response.entities or []),
- link_preview=isinstance(response.media, MessageMediaWebPage),
- )
- else:
- result = await message.respond(response, **kwargs)
- else:
- if isinstance(response, bytes):
- response = io.BytesIO(response)
- elif isinstance(response, str):
- response = io.BytesIO(response.encode("utf-8"))
- if name := kwargs.pop("filename", None):
- response.name = name
- if message.media is not None and edit:
- await message.edit(file=response, **kwargs)
- else:
- kwargs.setdefault(
- "reply_to",
- getattr(message, "reply_to_msg_id", None),
- )
- result = await message.client.send_file(message.peer_id, response, **kwargs)
- if message.out:
- await message.delete()
- return result
- async def get_target(message: Message, arg_no: Optional[int] = 0) -> Union[int, None]:
- if any(
- isinstance(entity, MessageEntityMentionName)
- for entity in (message.entities or [])
- ):
- e = sorted(
- filter(lambda x: isinstance(x, MessageEntityMentionName), message.entities),
- key=lambda x: x.offset,
- )[0]
- return e.user_id
- if len(get_args(message)) > arg_no:
- user = get_args(message)[arg_no]
- elif message.is_reply:
- return (await message.get_reply_message()).sender_id
- elif hasattr(message.peer_id, "user_id"):
- user = message.peer_id.user_id
- else:
- return None
- try:
- entity = await message.client.get_entity(user)
- except ValueError:
- return None
- else:
- if isinstance(entity, User):
- return entity.id
- def merge(a: dict, b: dict) -> dict:
- """Merge with replace dictionary a to dictionary b"""
- for key in a:
- if key in b:
- if isinstance(a[key], dict) and isinstance(b[key], dict):
- b[key] = merge(a[key], b[key])
- elif isinstance(a[key], list) and isinstance(b[key], list):
- b[key] = list(set(b[key] + a[key]))
- else:
- b[key] = a[key]
- b[key] = a[key]
- return b
- async def set_avatar(
- client: "TelegramClient", # type: ignore
- peer: Entity,
- avatar: str,
- ) -> bool:
- """Sets an entity avatar"""
- if isinstance(avatar, str) and check_url(avatar):
- f = (
- await run_sync(
- requests.get,
- avatar,
- )
- ).content
- elif isinstance(avatar, bytes):
- f = avatar
- else:
- return False
- res = await client(
- EditPhotoRequest(
- channel=peer,
- photo=await client.upload_file(f, file_name="photo.png"),
- )
- )
- try:
- await client.delete_messages(
- peer,
- message_ids=[
- next(
- update
- for update in res.updates
- if isinstance(update, UpdateNewChannelMessage)
- ).message.id
- ],
- )
- except Exception:
- pass
- return True
- async def asset_channel(
- client: "TelegramClient", # type: ignore
- title: str,
- description: str,
- *,
- channel: Optional[bool] = False,
- silent: Optional[bool] = False,
- archive: Optional[bool] = False,
- avatar: Optional[str] = "",
- _folder: Optional[str] = "",
- ) -> Tuple[Channel, bool]:
- """
- Create new channel (if needed) and return its entity
- :param client: Telegram client to create channel by
- :param title: Channel title
- :param description: Description
- :param channel: Whether to create a channel or supergroup
- :param silent: Automatically mute channel
- :param archive: Automatically archive channel
- :param avatar: Url to an avatar to set as pfp of created peer
- :param _folder: Do not use it, or things will go wrong
- :returns: Peer and bool: is channel new or pre-existent
- """
- if not hasattr(client, "_channels_cache"):
- client._channels_cache = {}
- if (
- title in client._channels_cache
- and client._channels_cache[title]["exp"] > time.time()
- ):
- return client._channels_cache[title]["peer"], False
- async for d in client.iter_dialogs():
- if d.title == title:
- client._channels_cache[title] = {"peer": d.entity, "exp": int(time.time())}
- return d.entity, False
- peer = (
- await client(
- CreateChannelRequest(
- title,
- description,
- megagroup=not channel,
- )
- )
- ).chats[0]
- if silent:
- await dnd(client, peer, archive)
- elif archive:
- await client.edit_folder(peer, 1)
- if avatar:
- await set_avatar(client, peer, avatar)
- if _folder:
- if _folder != "hikka":
- raise NotImplementedError
- folders = await client(GetDialogFiltersRequest())
- try:
- folder = next(folder for folder in folders if folder.title == "hikka")
- except Exception:
- folder = None
- if folder is not None and not any(
- peer.id == getattr(folder_peer, "channel_id", None)
- for folder_peer in folder.include_peers
- ):
- folder.include_peers += [await client.get_input_entity(peer)]
- await client(
- UpdateDialogFilterRequest(
- folder.id,
- folder,
- )
- )
- client._channels_cache[title] = {"peer": peer, "exp": int(time.time())}
- return peer, True
- async def dnd(
- client: "TelegramClient", # type: ignore
- peer: Entity,
- archive: Optional[bool] = True,
- ) -> bool:
- """
- Mutes and optionally archives peer
- :param peer: Anything entity-link
- :param archive: Archive peer, or just mute?
- :returns: `True` on success, otherwise `False`
- """
- try:
- await client(
- UpdateNotifySettingsRequest(
- peer=peer,
- settings=InputPeerNotifySettings(
- show_previews=False,
- silent=True,
- mute_until=2**31 - 1,
- ),
- )
- )
- if archive:
- await client.edit_folder(peer, 1)
- except Exception:
- logging.exception("utils.dnd error")
- return False
- return True
- def get_link(user: Union[User, Channel], /) -> str:
- """Get telegram permalink to entity"""
- return (
- f"tg://user?id={user.id}"
- if isinstance(user, User)
- else (
- f"tg://resolve?domain={user.username}"
- if getattr(user, "username", None)
- else ""
- )
- )
- def chunks(_list: Union[list, tuple, set], n: int, /) -> list:
- """Split provided `_list` into chunks of `n`"""
- return [_list[i : i + n] for i in range(0, len(_list), n)]
- def get_named_platform() -> str:
- """Returns formatted platform name"""
- try:
- if os.path.isfile("/proc/device-tree/model"):
- with open("/proc/device-tree/model") as f:
- model = f.read()
- if "Orange" in model:
- return f"🍊 {model}"
- return f"🍇 {model}" if "Raspberry" in model else f"❓ {model}"
- except Exception:
- # In case of weird fs, aka Termux
- pass
- try:
- from platform import uname
- if "microsoft-standard" in uname().release:
- return "🍁 WSL"
- except Exception:
- pass
- is_termux = "com.termux" in os.environ.get("PREFIX", "")
- is_okteto = "OKTETO" in os.environ
- is_docker = "DOCKER" in os.environ
- is_heroku = "DYNO" in os.environ
- is_codespaces = "CODESPACES" in os.environ
- if is_heroku:
- return "♓️ Heroku"
- if is_docker:
- return "🐳 Docker"
- if is_termux:
- return "🕶 Termux"
- if is_okteto:
- return "☁️ Okteto"
- if is_codespaces:
- return "🐈⬛ Codespaces"
- is_lavhost = "LAVHOST" in os.environ
- return f"✌️ lavHost {os.environ['LAVHOST']}" if is_lavhost else "📻 VDS"
- def get_platform_emoji() -> str:
- BASE = (
- '<emoji document_id="{}">🌘</emoji><emoji'
- ' document_id="5195311729663286630">🌘</emoji><emoji'
- ' document_id="5195045669324201904">🌘</emoji>'
- )
- if "OKTETO" in os.environ:
- return BASE.format(5192767786174128165)
- if "CODESPACES" in os.environ:
- return BASE.format(5194976881127989720)
- if "DYNO" in os.environ:
- return BASE.format(5192845434887873156)
- if "com.termux" in os.environ.get("PREFIX", ""):
- return BASE.format(5193051778001673828)
- return BASE.format(5192765204898783881)
- def uptime() -> int:
- """Returns userbot uptime in seconds"""
- return round(time.perf_counter() - init_ts)
- def formatted_uptime() -> str:
- """Returnes formmated uptime"""
- return f"{str(timedelta(seconds=uptime()))}"
- def ascii_face() -> str:
- """Returnes cute ASCII-art face"""
- return escape_html(
- random.choice(
- [
- "ヽ(๑◠ܫ◠๑)ノ",
- "(◕ᴥ◕ʋ)",
- "ᕙ(`▽´)ᕗ",
- "(✿◠‿◠)",
- "(▰˘◡˘▰)",
- "(˵ ͡° ͜ʖ ͡°˵)",
- "ʕっ•ᴥ•ʔっ",
- "( ͡° ᴥ ͡°)",
- "(๑•́ ヮ •̀๑)",
- "٩(^‿^)۶",
- "(っˆڡˆς)",
- "ψ(`∇´)ψ",
- "⊙ω⊙",
- "٩(^ᴗ^)۶",
- "(´・ω・)っ由",
- "( ͡~ ͜ʖ ͡°)",
- "✧♡(◕‿◕✿)",
- "โ๏௰๏ใ ื",
- "∩。• ᵕ •。∩ ♡",
- "(♡´౪`♡)",
- "(◍>◡<◍)⋈。✧♡",
- "╰(✿´⌣`✿)╯♡",
- "ʕ•ᴥ•ʔ",
- "ᶘ ◕ᴥ◕ᶅ",
- "▼・ᴥ・▼",
- "ฅ^•ﻌ•^ฅ",
- "(΄◞ิ౪◟ิ‵)",
- "٩(^ᴗ^)۶",
- "ᕴーᴥーᕵ",
- "ʕ→ᴥ←ʔ",
- "ʕᵕᴥᵕʔ",
- "ʕᵒᴥᵒʔ",
- "ᵔᴥᵔ",
- "(✿╹◡╹)",
- "(๑→ܫ←)",
- "ʕ·ᴥ· ʔ",
- "(ノ≧ڡ≦)",
- "(≖ᴗ≖✿)",
- "(〜^∇^ )〜",
- "( ノ・ェ・ )ノ",
- "~( ˘▾˘~)",
- "(〜^∇^)〜",
- "ヽ(^ᴗ^ヽ)",
- "(´・ω・`)",
- "₍ᐢ•ﻌ•ᐢ₎*・゚。",
- "(。・・)_且",
- "(=`ω´=)",
- "(*•‿•*)",
- "(*゚∀゚*)",
- "(☉⋆‿⋆☉)",
- "ɷ◡ɷ",
- "ʘ‿ʘ",
- "(。-ω-)ノ",
- "( ・ω・)ノ",
- "(=゚ω゚)ノ",
- "(・ε・`*) …",
- "ʕっ•ᴥ•ʔっ",
- "(*˘︶˘*)",
- ]
- )
- )
- def array_sum(array: List[Any], /) -> List[Any]:
- """Performs basic sum operation on array"""
- result = []
- for item in array:
- result += item
- return result
- def rand(size: int, /) -> str:
- """Return random string of len `size`"""
- return "".join(
- [random.choice("abcdefghijklmnopqrstuvwxyz1234567890") for _ in range(size)]
- )
- def smart_split(
- text: str,
- entities: List[FormattingEntity],
- length: Optional[int] = 4096,
- split_on: Optional[ListLike] = ("\n", " "),
- min_length: Optional[int] = 1,
- ):
- """
- Split the message into smaller messages.
- A grapheme will never be broken. Entities will be displaced to match the right location. No inputs will be mutated.
- The end of each message except the last one is stripped of characters from [split_on]
- :param text: the plain text input
- :param entities: the entities
- :param length: the maximum length of a single message
- :param split_on: characters (or strings) which are preferred for a message break
- :param min_length: ignore any matches on [split_on] strings before this number of characters into each message
- :return:
- """
- # Authored by @bsolute
- # https://t.me/LonamiWebs/27777
- encoded = text.encode("utf-16le")
- pending_entities = entities
- text_offset = 0
- bytes_offset = 0
- text_length = len(text)
- bytes_length = len(encoded)
- while text_offset < text_length:
- if bytes_offset + length * 2 >= bytes_length:
- yield parser.unparse(
- text[text_offset:],
- list(sorted(pending_entities, key=lambda x: x.offset)),
- )
- break
- codepoint_count = len(
- encoded[bytes_offset : bytes_offset + length * 2].decode(
- "utf-16le",
- errors="ignore",
- )
- )
- for search in split_on:
- search_index = text.rfind(
- search,
- text_offset + min_length,
- text_offset + codepoint_count,
- )
- if search_index != -1:
- break
- else:
- search_index = text_offset + codepoint_count
- split_index = grapheme.safe_split_index(text, search_index)
- split_offset_utf16 = (
- len(text[text_offset:split_index].encode("utf-16le"))
- ) // 2
- exclude = 0
- while (
- split_index + exclude < text_length
- and text[split_index + exclude] in split_on
- ):
- exclude += 1
- current_entities = []
- entities = pending_entities.copy()
- pending_entities = []
- for entity in entities:
- if (
- entity.offset < split_offset_utf16
- and entity.offset + entity.length > split_offset_utf16 + exclude
- ):
- # spans boundary
- current_entities.append(
- _copy_tl(
- entity,
- length=split_offset_utf16 - entity.offset,
- )
- )
- pending_entities.append(
- _copy_tl(
- entity,
- offset=0,
- length=entity.offset
- + entity.length
- - split_offset_utf16
- - exclude,
- )
- )
- elif entity.offset < split_offset_utf16 < entity.offset + entity.length:
- # overlaps boundary
- current_entities.append(
- _copy_tl(
- entity,
- length=split_offset_utf16 - entity.offset,
- )
- )
- elif entity.offset < split_offset_utf16:
- # wholly left
- current_entities.append(entity)
- elif (
- entity.offset + entity.length
- > split_offset_utf16 + exclude
- > entity.offset
- ):
- # overlaps right boundary
- pending_entities.append(
- _copy_tl(
- entity,
- offset=0,
- length=entity.offset
- + entity.length
- - split_offset_utf16
- - exclude,
- )
- )
- elif entity.offset + entity.length > split_offset_utf16 + exclude:
- # wholly right
- pending_entities.append(
- _copy_tl(
- entity,
- offset=entity.offset - split_offset_utf16 - exclude,
- )
- )
- current_text = text[text_offset:split_index]
- yield parser.unparse(
- current_text,
- list(sorted(current_entities, key=lambda x: x.offset)),
- )
- text_offset = split_index + exclude
- bytes_offset += len(current_text.encode("utf-16le"))
- def _copy_tl(o, **kwargs):
- d = o.to_dict()
- del d["_"]
- d.update(kwargs)
- return o.__class__(**d)
- def check_url(url: str) -> bool:
- """Checks url for validity"""
- try:
- return bool(urlparse(url).netloc)
- except Exception:
- return False
- def get_git_hash() -> Union[str, bool]:
- """Get current Hikka git hash"""
- try:
- repo = git.Repo()
- return repo.heads[0].commit.hexsha
- except Exception:
- return False
- def get_commit_url() -> str:
- """Get current Hikka git commit url"""
- try:
- repo = git.Repo()
- hash_ = repo.heads[0].commit.hexsha
- return (
- f'<a href="https://github.com/hikariatama/Hikka/commit/{hash_}">#{hash_[:7]}</a>'
- )
- except Exception:
- return "Unknown"
- def is_serializable(x: Any, /) -> bool:
- """Checks if object is JSON-serializable"""
- try:
- json.dumps(x)
- return True
- except Exception:
- return False
- def get_lang_flag(countrycode: str) -> str:
- """
- Gets an emoji of specified countrycode
- :param countrycode: 2-letter countrycode
- :returns: Emoji flag
- """
- if (
- len(
- code := [
- c
- for c in countrycode.lower()
- if c in string.ascii_letters + string.digits
- ]
- )
- == 2
- ):
- return "".join([chr(ord(c.upper()) + (ord("🇦") - ord("A"))) for c in code])
- return countrycode
- def get_entity_url(
- entity: Union[User, Channel],
- openmessage: Optional[bool] = False,
- ) -> str:
- """
- Get link to object, if available
- :param entity: Entity to get url of
- :param openmessage: Use tg://openmessage link for users
- :return: Link to object or empty string
- """
- return (
- (
- f"tg://openmessage?id={entity.id}"
- if openmessage
- else f"tg://user?id={entity.id}"
- )
- if isinstance(entity, User)
- else (
- f"tg://resolve?domain={entity.username}"
- if getattr(entity, "username", None)
- else ""
- )
- )
- async def get_message_link(
- message: Message,
- chat: Optional[Union[Chat, Channel]] = None,
- ) -> str:
- if message.is_private:
- return (
- f"tg://openmessage?user_id={get_chat_id(message)}&message_id={message.id}"
- )
- if not chat:
- chat = await message.get_chat()
- return (
- f"https://t.me/{chat.username}/{message.id}"
- if getattr(chat, "username", False)
- else f"https://t.me/c/{chat.id}/{message.id}"
- )
- def remove_html(text: str, escape: Optional[bool] = False) -> str:
- """
- Removes HTML tags from text
- :param text: Text to remove HTML from
- :param escape: Escape HTML
- :return: Text without HTML
- """
- return (escape_html if escape else str)(
- re.sub(
- r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>|<\/?emoji.*?>)",
- "",
- text,
- )
- )
- def get_kwargs() -> dict:
- """
- Get kwargs of function, in which is called
- :return: kwargs
- """
- # https://stackoverflow.com/a/65927265/19170642
- frame = inspect.currentframe().f_back
- keys, _, _, values = inspect.getargvalues(frame)
- return {key: values[key] for key in keys if key != "self"}
- def mime_type(message: Message) -> str:
- """
- Get mime type of document in message
- :param message: Message with document
- :return: Mime type or empty string if not present
- """
- return (
- ""
- if not isinstance(message, Message) or not getattr(message, "media", False)
- else getattr(getattr(message, "media", False), "mime_type", False) or ""
- )
- def find_caller(stack: Optional[List[inspect.FrameInfo]] = None) -> Any:
- """Attempts to find command in stack"""
- caller = next(
- (
- frame_info
- for frame_info in stack or inspect.stack()
- if hasattr(frame_info, "function")
- and any(
- inspect.isclass(cls_)
- and issubclass(cls_, Module)
- and cls_ is not Module
- for cls_ in frame_info.frame.f_globals.values()
- )
- ),
- None,
- )
- if not caller:
- return next(
- (
- frame_info.frame.f_locals["func"]
- for frame_info in stack or inspect.stack()
- if hasattr(frame_info, "function")
- and frame_info.function == "future_dispatcher"
- and (
- "CommandDispatcher"
- in getattr(getattr(frame_info, "frame", None), "f_globals", {})
- )
- ),
- None,
- )
- return next(
- (
- getattr(cls_, caller.function, None)
- for cls_ in caller.frame.f_globals.values()
- if inspect.isclass(cls_) and issubclass(cls_, Module)
- ),
- None,
- )
- def validate_html(html: str) -> str:
- """Removes broken tags from html"""
- text, entities = telethon.extensions.html.parse(html)
- return telethon.extensions.html.unparse(escape_html(text), entities)
- init_ts = time.perf_counter()
- # GeekTG Compatibility
- def get_git_info():
- # https://github.com/GeekTG/Friendly-Telegram/blob/master/friendly-telegram/utils.py#L133
- try:
- repo = git.Repo()
- ver = repo.heads[0].commit.hexsha
- except Exception:
- ver = ""
- return [
- ver,
- f"https://github.com/hikariatama/Hikka/commit/{ver}" if ver else "",
- ]
- def get_version_raw():
- """Get the version of the userbot"""
- # https://github.com/GeekTG/Friendly-Telegram/blob/master/friendly-telegram/utils.py#L128
- from . import version
- return ".".join(list(map(str, list(version.__version__))))
- get_platform_name = get_named_platform
|