utils.py 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162
  1. """Utilities"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  15. # █▀█ █ █ █ █▀█ █▀▄ █
  16. # © Copyright 2022
  17. # https://t.me/hikariatama
  18. #
  19. # 🔒 Licensed under the GNU AGPLv3
  20. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  21. import asyncio
  22. import functools
  23. import io
  24. import json
  25. import logging
  26. import os
  27. import random
  28. import re
  29. import shlex
  30. import string
  31. import time
  32. import inspect
  33. from datetime import timedelta
  34. from typing import Any, List, Optional, Tuple, Union
  35. from urllib.parse import urlparse
  36. import git
  37. import grapheme
  38. import requests
  39. import telethon
  40. from telethon.hints import Entity
  41. from telethon.tl.custom.message import Message
  42. from telethon.tl.functions.account import UpdateNotifySettingsRequest
  43. from telethon.tl.functions.channels import CreateChannelRequest, EditPhotoRequest
  44. from telethon.tl.functions.messages import (
  45. GetDialogFiltersRequest,
  46. UpdateDialogFilterRequest,
  47. )
  48. from telethon.tl.types import (
  49. Channel,
  50. InputPeerNotifySettings,
  51. MessageEntityBankCard,
  52. MessageEntityBlockquote,
  53. MessageEntityBold,
  54. MessageEntityBotCommand,
  55. MessageEntityCashtag,
  56. MessageEntityCode,
  57. MessageEntityEmail,
  58. MessageEntityHashtag,
  59. MessageEntityItalic,
  60. MessageEntityMention,
  61. MessageEntityMentionName,
  62. MessageEntityPhone,
  63. MessageEntityPre,
  64. MessageEntitySpoiler,
  65. MessageEntityStrike,
  66. MessageEntityTextUrl,
  67. MessageEntityUnderline,
  68. MessageEntityUnknown,
  69. MessageEntityUrl,
  70. MessageMediaWebPage,
  71. PeerChannel,
  72. PeerChat,
  73. PeerUser,
  74. User,
  75. Chat,
  76. UpdateNewChannelMessage,
  77. )
  78. from aiogram.types import Message as AiogramMessage
  79. from .inline.types import InlineCall, InlineMessage
  80. from .types import Module
  81. FormattingEntity = Union[
  82. MessageEntityUnknown,
  83. MessageEntityMention,
  84. MessageEntityHashtag,
  85. MessageEntityBotCommand,
  86. MessageEntityUrl,
  87. MessageEntityEmail,
  88. MessageEntityBold,
  89. MessageEntityItalic,
  90. MessageEntityCode,
  91. MessageEntityPre,
  92. MessageEntityTextUrl,
  93. MessageEntityMentionName,
  94. MessageEntityPhone,
  95. MessageEntityCashtag,
  96. MessageEntityUnderline,
  97. MessageEntityStrike,
  98. MessageEntityBlockquote,
  99. MessageEntityBankCard,
  100. MessageEntitySpoiler,
  101. ]
  102. ListLike = Union[list, set, tuple]
  103. emoji_pattern = re.compile(
  104. "["
  105. "\U0001F600-\U0001F64F" # emoticons
  106. "\U0001F300-\U0001F5FF" # symbols & pictographs
  107. "\U0001F680-\U0001F6FF" # transport & map symbols
  108. "\U0001F1E0-\U0001F1FF" # flags (iOS)
  109. "]+",
  110. flags=re.UNICODE,
  111. )
  112. parser = telethon.utils.sanitize_parse_mode("html")
  113. def get_args(message: Message) -> List[str]:
  114. """Get arguments from message (str or Message), return list of arguments"""
  115. if not (message := getattr(message, "message", message)):
  116. return False
  117. if len(message := message.split(maxsplit=1)) <= 1:
  118. return []
  119. message = message[1]
  120. try:
  121. split = shlex.split(message)
  122. except ValueError:
  123. return message # Cannot split, let's assume that it's just one long message
  124. return list(filter(lambda x: len(x) > 0, split))
  125. def get_args_raw(message: Message) -> str:
  126. """Get the parameters to the command as a raw string (not split)"""
  127. if not (message := getattr(message, "message", message)):
  128. return False
  129. return args[1] if len(args := message.split(maxsplit=1)) > 1 else ""
  130. def get_args_split_by(message: Message, separator: str) -> List[str]:
  131. """Split args with a specific separator"""
  132. return [
  133. section.strip() for section in get_args_raw(message).split(separator) if section
  134. ]
  135. def get_chat_id(message: Union[Message, AiogramMessage]) -> int:
  136. """Get the chat ID, but without -100 if its a channel"""
  137. return telethon.utils.resolve_id(
  138. getattr(message, "chat_id", None)
  139. or getattr(getattr(message, "chat", None), "id", None)
  140. )[0]
  141. def get_entity_id(entity: Entity) -> int:
  142. """Get entity ID"""
  143. return telethon.utils.get_peer_id(entity)
  144. def escape_html(text: str, /) -> str: # sourcery skip
  145. """Pass all untrusted/potentially corrupt input here"""
  146. return str(text).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
  147. def escape_quotes(text: str, /) -> str:
  148. """Escape quotes to html quotes"""
  149. return escape_html(text).replace('"', "&quot;")
  150. def get_base_dir() -> str:
  151. """Get directory of this file"""
  152. from . import __main__
  153. return get_dir(__main__.__file__)
  154. def get_dir(mod: str) -> str:
  155. """Get directory of given module"""
  156. return os.path.abspath(os.path.dirname(os.path.abspath(mod)))
  157. async def get_user(message: Message) -> Union[None, User]:
  158. """Get user who sent message, searching if not found easily"""
  159. try:
  160. return await message.client.get_entity(message.sender_id)
  161. except ValueError: # Not in database. Lets go looking for them.
  162. logging.debug("User not in session cache. Searching...")
  163. if isinstance(message.peer_id, PeerUser):
  164. await message.client.get_dialogs()
  165. return await message.client.get_entity(message.sender_id)
  166. if isinstance(message.peer_id, (PeerChannel, PeerChat)):
  167. try:
  168. return await message.client.get_entity(message.sender_id)
  169. except Exception:
  170. pass
  171. async for user in message.client.iter_participants(
  172. message.peer_id,
  173. aggressive=True,
  174. ):
  175. if user.id == message.sender_id:
  176. return user
  177. logging.error("User isn't in the group where they sent the message")
  178. return None
  179. logging.error("`peer_id` is not a user, chat or channel")
  180. return None
  181. def run_sync(func, *args, **kwargs):
  182. """
  183. Run a non-async function in a new thread and return an awaitable
  184. :param func: Sync-only function to execute
  185. :returns: Awaitable coroutine
  186. """
  187. return asyncio.get_event_loop().run_in_executor(
  188. None,
  189. functools.partial(func, *args, **kwargs),
  190. )
  191. def run_async(loop, coro):
  192. """Run an async function as a non-async function, blocking till it's done"""
  193. # When we bump minimum support to 3.7, use run()
  194. return asyncio.run_coroutine_threadsafe(coro, loop).result()
  195. def censor(
  196. obj,
  197. to_censor: Optional[List[str]] = None,
  198. replace_with: Optional[str] = "redacted_{count}_chars",
  199. ):
  200. """May modify the original object, but don't rely on it"""
  201. if to_censor is None:
  202. to_censor = ["phone"]
  203. for k, v in vars(obj).items():
  204. if k in to_censor:
  205. setattr(obj, k, replace_with.format(count=len(v)))
  206. elif k[0] != "_" and hasattr(v, "__dict__"):
  207. setattr(obj, k, censor(v, to_censor, replace_with))
  208. return obj
  209. def relocate_entities(
  210. entities: list,
  211. offset: int,
  212. text: Optional[str] = None,
  213. ) -> list:
  214. """Move all entities by offset (truncating at text)"""
  215. length = len(text) if text is not None else 0
  216. for ent in entities.copy() if entities else ():
  217. ent.offset += offset
  218. if ent.offset < 0:
  219. ent.length += ent.offset
  220. ent.offset = 0
  221. if text is not None and ent.offset + ent.length > length:
  222. ent.length = length - ent.offset
  223. if ent.length <= 0:
  224. entities.remove(ent)
  225. return entities
  226. async def answer(
  227. message: Union[Message, InlineCall, InlineMessage],
  228. response: str,
  229. *,
  230. reply_markup: Optional[Union[List[List[dict]], List[dict], dict]] = None,
  231. **kwargs,
  232. ) -> Union[InlineCall, InlineMessage, Message]:
  233. """Use this to give the response to a command"""
  234. # Compatibility with FTG\GeekTG
  235. if isinstance(message, list) and message:
  236. message = message[0]
  237. if reply_markup is not None:
  238. if not isinstance(reply_markup, (list, dict)):
  239. raise ValueError("reply_markup must be a list or dict")
  240. if reply_markup:
  241. if isinstance(message, (InlineMessage, InlineCall)):
  242. await message.edit(response, reply_markup)
  243. return
  244. reply_markup = message.client.loader.inline._normalize_markup(reply_markup)
  245. result = await message.client.loader.inline.form(
  246. response,
  247. message=message if message.out else get_chat_id(message),
  248. reply_markup=reply_markup,
  249. **kwargs,
  250. )
  251. return result
  252. if isinstance(message, (InlineMessage, InlineCall)):
  253. await message.edit(response)
  254. return message
  255. kwargs.setdefault("link_preview", False)
  256. if not (edit := (message.out and not message.via_bot_id and not message.fwd_from)):
  257. kwargs.setdefault(
  258. "reply_to",
  259. getattr(message, "reply_to_msg_id", None),
  260. )
  261. parse_mode = telethon.utils.sanitize_parse_mode(
  262. kwargs.pop(
  263. "parse_mode",
  264. message.client.parse_mode,
  265. )
  266. )
  267. if isinstance(response, str) and not kwargs.pop("asfile", False):
  268. text, entities = parse_mode.parse(response)
  269. if len(text) >= 4096 and not hasattr(message, "hikka_grepped"):
  270. try:
  271. if not message.client.loader.inline.init_complete:
  272. raise
  273. strings = list(smart_split(text, entities, 4096))
  274. if len(strings) > 10:
  275. raise
  276. list_ = await message.client.loader.inline.list(
  277. message=message,
  278. strings=strings,
  279. )
  280. if not list_:
  281. raise
  282. return list_
  283. except Exception:
  284. file = io.BytesIO(text.encode("utf-8"))
  285. file.name = "command_result.txt"
  286. result = await message.client.send_file(
  287. message.peer_id,
  288. file,
  289. caption=(
  290. "<b>📤 Command output seems to be too long, so it's sent in"
  291. " file.</b>"
  292. ),
  293. )
  294. if message.out:
  295. await message.delete()
  296. return result
  297. result = await (message.edit if edit else message.respond)(
  298. text,
  299. parse_mode=lambda t: (t, entities),
  300. **kwargs,
  301. )
  302. elif isinstance(response, Message):
  303. if message.media is None and (
  304. response.media is None or isinstance(response.media, MessageMediaWebPage)
  305. ):
  306. result = await message.edit(
  307. response.message,
  308. parse_mode=lambda t: (t, response.entities or []),
  309. link_preview=isinstance(response.media, MessageMediaWebPage),
  310. )
  311. else:
  312. result = await message.respond(response, **kwargs)
  313. else:
  314. if isinstance(response, bytes):
  315. response = io.BytesIO(response)
  316. elif isinstance(response, str):
  317. response = io.BytesIO(response.encode("utf-8"))
  318. if name := kwargs.pop("filename", None):
  319. response.name = name
  320. if message.media is not None and edit:
  321. await message.edit(file=response, **kwargs)
  322. else:
  323. kwargs.setdefault(
  324. "reply_to",
  325. getattr(message, "reply_to_msg_id", None),
  326. )
  327. result = await message.client.send_file(message.peer_id, response, **kwargs)
  328. if message.out:
  329. await message.delete()
  330. return result
  331. async def get_target(message: Message, arg_no: Optional[int] = 0) -> Union[int, None]:
  332. if any(
  333. isinstance(entity, MessageEntityMentionName)
  334. for entity in (message.entities or [])
  335. ):
  336. e = sorted(
  337. filter(lambda x: isinstance(x, MessageEntityMentionName), message.entities),
  338. key=lambda x: x.offset,
  339. )[0]
  340. return e.user_id
  341. if len(get_args(message)) > arg_no:
  342. user = get_args(message)[arg_no]
  343. elif message.is_reply:
  344. return (await message.get_reply_message()).sender_id
  345. elif hasattr(message.peer_id, "user_id"):
  346. user = message.peer_id.user_id
  347. else:
  348. return None
  349. try:
  350. entity = await message.client.get_entity(user)
  351. except ValueError:
  352. return None
  353. else:
  354. if isinstance(entity, User):
  355. return entity.id
  356. def merge(a: dict, b: dict) -> dict:
  357. """Merge with replace dictionary a to dictionary b"""
  358. for key in a:
  359. if key in b:
  360. if isinstance(a[key], dict) and isinstance(b[key], dict):
  361. b[key] = merge(a[key], b[key])
  362. elif isinstance(a[key], list) and isinstance(b[key], list):
  363. b[key] = list(set(b[key] + a[key]))
  364. else:
  365. b[key] = a[key]
  366. b[key] = a[key]
  367. return b
  368. async def set_avatar(
  369. client: "TelegramClient", # type: ignore
  370. peer: Entity,
  371. avatar: str,
  372. ) -> bool:
  373. """Sets an entity avatar"""
  374. if isinstance(avatar, str) and check_url(avatar):
  375. f = (
  376. await run_sync(
  377. requests.get,
  378. avatar,
  379. )
  380. ).content
  381. elif isinstance(avatar, bytes):
  382. f = avatar
  383. else:
  384. return False
  385. res = await client(
  386. EditPhotoRequest(
  387. channel=peer,
  388. photo=await client.upload_file(f, file_name="photo.png"),
  389. )
  390. )
  391. try:
  392. await client.delete_messages(
  393. peer,
  394. message_ids=[
  395. next(
  396. update
  397. for update in res.updates
  398. if isinstance(update, UpdateNewChannelMessage)
  399. ).message.id
  400. ],
  401. )
  402. except Exception:
  403. pass
  404. return True
  405. async def asset_channel(
  406. client: "TelegramClient", # type: ignore
  407. title: str,
  408. description: str,
  409. *,
  410. channel: Optional[bool] = False,
  411. silent: Optional[bool] = False,
  412. archive: Optional[bool] = False,
  413. avatar: Optional[str] = "",
  414. _folder: Optional[str] = "",
  415. ) -> Tuple[Channel, bool]:
  416. """
  417. Create new channel (if needed) and return its entity
  418. :param client: Telegram client to create channel by
  419. :param title: Channel title
  420. :param description: Description
  421. :param channel: Whether to create a channel or supergroup
  422. :param silent: Automatically mute channel
  423. :param archive: Automatically archive channel
  424. :param avatar: Url to an avatar to set as pfp of created peer
  425. :param _folder: Do not use it, or things will go wrong
  426. :returns: Peer and bool: is channel new or pre-existent
  427. """
  428. if not hasattr(client, "_channels_cache"):
  429. client._channels_cache = {}
  430. if (
  431. title in client._channels_cache
  432. and client._channels_cache[title]["exp"] > time.time()
  433. ):
  434. return client._channels_cache[title]["peer"], False
  435. async for d in client.iter_dialogs():
  436. if d.title == title:
  437. client._channels_cache[title] = {"peer": d.entity, "exp": int(time.time())}
  438. return d.entity, False
  439. peer = (
  440. await client(
  441. CreateChannelRequest(
  442. title,
  443. description,
  444. megagroup=not channel,
  445. )
  446. )
  447. ).chats[0]
  448. if silent:
  449. await dnd(client, peer, archive)
  450. elif archive:
  451. await client.edit_folder(peer, 1)
  452. if avatar:
  453. await set_avatar(client, peer, avatar)
  454. if _folder:
  455. if _folder != "hikka":
  456. raise NotImplementedError
  457. folders = await client(GetDialogFiltersRequest())
  458. try:
  459. folder = next(folder for folder in folders if folder.title == "hikka")
  460. except Exception:
  461. folder = None
  462. if folder is not None and not any(
  463. peer.id == getattr(folder_peer, "channel_id", None)
  464. for folder_peer in folder.include_peers
  465. ):
  466. folder.include_peers += [await client.get_input_entity(peer)]
  467. await client(
  468. UpdateDialogFilterRequest(
  469. folder.id,
  470. folder,
  471. )
  472. )
  473. client._channels_cache[title] = {"peer": peer, "exp": int(time.time())}
  474. return peer, True
  475. async def dnd(
  476. client: "TelegramClient", # type: ignore
  477. peer: Entity,
  478. archive: Optional[bool] = True,
  479. ) -> bool:
  480. """
  481. Mutes and optionally archives peer
  482. :param peer: Anything entity-link
  483. :param archive: Archive peer, or just mute?
  484. :returns: `True` on success, otherwise `False`
  485. """
  486. try:
  487. await client(
  488. UpdateNotifySettingsRequest(
  489. peer=peer,
  490. settings=InputPeerNotifySettings(
  491. show_previews=False,
  492. silent=True,
  493. mute_until=2**31 - 1,
  494. ),
  495. )
  496. )
  497. if archive:
  498. await client.edit_folder(peer, 1)
  499. except Exception:
  500. logging.exception("utils.dnd error")
  501. return False
  502. return True
  503. def get_link(user: Union[User, Channel], /) -> str:
  504. """Get telegram permalink to entity"""
  505. return (
  506. f"tg://user?id={user.id}"
  507. if isinstance(user, User)
  508. else (
  509. f"tg://resolve?domain={user.username}"
  510. if getattr(user, "username", None)
  511. else ""
  512. )
  513. )
  514. def chunks(_list: Union[list, tuple, set], n: int, /) -> list:
  515. """Split provided `_list` into chunks of `n`"""
  516. return [_list[i : i + n] for i in range(0, len(_list), n)]
  517. def get_named_platform() -> str:
  518. """Returns formatted platform name"""
  519. try:
  520. if os.path.isfile("/proc/device-tree/model"):
  521. with open("/proc/device-tree/model") as f:
  522. model = f.read()
  523. if "Orange" in model:
  524. return f"🍊 {model}"
  525. return f"🍇 {model}" if "Raspberry" in model else f"❓ {model}"
  526. except Exception:
  527. # In case of weird fs, aka Termux
  528. pass
  529. try:
  530. from platform import uname
  531. if "microsoft-standard" in uname().release:
  532. return "🍁 WSL"
  533. except Exception:
  534. pass
  535. is_termux = "com.termux" in os.environ.get("PREFIX", "")
  536. is_okteto = "OKTETO" in os.environ
  537. is_docker = "DOCKER" in os.environ
  538. is_heroku = "DYNO" in os.environ
  539. is_codespaces = "CODESPACES" in os.environ
  540. if is_heroku:
  541. return "♓️ Heroku"
  542. if is_docker:
  543. return "🐳 Docker"
  544. if is_termux:
  545. return "🕶 Termux"
  546. if is_okteto:
  547. return "☁️ Okteto"
  548. if is_codespaces:
  549. return "🐈‍⬛ Codespaces"
  550. is_lavhost = "LAVHOST" in os.environ
  551. return f"✌️ lavHost {os.environ['LAVHOST']}" if is_lavhost else "📻 VDS"
  552. def get_platform_emoji() -> str:
  553. BASE = (
  554. '<emoji document_id="{}">🌘</emoji><emoji'
  555. ' document_id="5195311729663286630">🌘</emoji><emoji'
  556. ' document_id="5195045669324201904">🌘</emoji>'
  557. )
  558. if "OKTETO" in os.environ:
  559. return BASE.format(5192767786174128165)
  560. if "CODESPACES" in os.environ:
  561. return BASE.format(5194976881127989720)
  562. if "DYNO" in os.environ:
  563. return BASE.format(5192845434887873156)
  564. if "com.termux" in os.environ.get("PREFIX", ""):
  565. return BASE.format(5193051778001673828)
  566. return BASE.format(5192765204898783881)
  567. def uptime() -> int:
  568. """Returns userbot uptime in seconds"""
  569. return round(time.perf_counter() - init_ts)
  570. def formatted_uptime() -> str:
  571. """Returnes formmated uptime"""
  572. return f"{str(timedelta(seconds=uptime()))}"
  573. def ascii_face() -> str:
  574. """Returnes cute ASCII-art face"""
  575. return escape_html(
  576. random.choice(
  577. [
  578. "ヽ(๑◠ܫ◠๑)ノ",
  579. "(◕ᴥ◕ʋ)",
  580. "ᕙ(`▽´)ᕗ",
  581. "(✿◠‿◠)",
  582. "(▰˘◡˘▰)",
  583. "(˵ ͡° ͜ʖ ͡°˵)",
  584. "ʕっ•ᴥ•ʔっ",
  585. "( ͡° ᴥ ͡°)",
  586. "(๑•́ ヮ •̀๑)",
  587. "٩(^‿^)۶",
  588. "(っˆڡˆς)",
  589. "ψ(`∇´)ψ",
  590. "⊙ω⊙",
  591. "٩(^ᴗ^)۶",
  592. "(´・ω・)っ由",
  593. "( ͡~ ͜ʖ ͡°)",
  594. "✧♡(◕‿◕✿)",
  595. "โ๏௰๏ใ ื",
  596. "∩。• ᵕ •。∩ ♡",
  597. "(♡´౪`♡)",
  598. "(◍>◡<◍)⋈。✧♡",
  599. "╰(✿´⌣`✿)╯♡",
  600. "ʕ•ᴥ•ʔ",
  601. "ᶘ ◕ᴥ◕ᶅ",
  602. "▼・ᴥ・▼",
  603. "ฅ^•ﻌ•^ฅ",
  604. "(΄◞ิ౪◟ิ‵)",
  605. "٩(^ᴗ^)۶",
  606. "ᕴーᴥーᕵ",
  607. "ʕ→ᴥ←ʔ",
  608. "ʕᵕᴥᵕʔ",
  609. "ʕᵒᴥᵒʔ",
  610. "ᵔᴥᵔ",
  611. "(✿╹◡╹)",
  612. "(๑→ܫ←)",
  613. "ʕ·ᴥ· ʔ",
  614. "(ノ≧ڡ≦)",
  615. "(≖ᴗ≖✿)",
  616. "(〜^∇^ )〜",
  617. "( ノ・ェ・ )ノ",
  618. "~( ˘▾˘~)",
  619. "(〜^∇^)〜",
  620. "ヽ(^ᴗ^ヽ)",
  621. "(´・ω・`)",
  622. "₍ᐢ•ﻌ•ᐢ₎*・゚。",
  623. "(。・・)_且",
  624. "(=`ω´=)",
  625. "(*•‿•*)",
  626. "(*゚∀゚*)",
  627. "(☉⋆‿⋆☉)",
  628. "ɷ◡ɷ",
  629. "ʘ‿ʘ",
  630. "(。-ω-)ノ",
  631. "( ・ω・)ノ",
  632. "(=゚ω゚)ノ",
  633. "(・ε・`*) …",
  634. "ʕっ•ᴥ•ʔっ",
  635. "(*˘︶˘*)",
  636. ]
  637. )
  638. )
  639. def array_sum(array: List[Any], /) -> List[Any]:
  640. """Performs basic sum operation on array"""
  641. result = []
  642. for item in array:
  643. result += item
  644. return result
  645. def rand(size: int, /) -> str:
  646. """Return random string of len `size`"""
  647. return "".join(
  648. [random.choice("abcdefghijklmnopqrstuvwxyz1234567890") for _ in range(size)]
  649. )
  650. def smart_split(
  651. text: str,
  652. entities: List[FormattingEntity],
  653. length: Optional[int] = 4096,
  654. split_on: Optional[ListLike] = ("\n", " "),
  655. min_length: Optional[int] = 1,
  656. ):
  657. """
  658. Split the message into smaller messages.
  659. A grapheme will never be broken. Entities will be displaced to match the right location. No inputs will be mutated.
  660. The end of each message except the last one is stripped of characters from [split_on]
  661. :param text: the plain text input
  662. :param entities: the entities
  663. :param length: the maximum length of a single message
  664. :param split_on: characters (or strings) which are preferred for a message break
  665. :param min_length: ignore any matches on [split_on] strings before this number of characters into each message
  666. :return:
  667. """
  668. # Authored by @bsolute
  669. # https://t.me/LonamiWebs/27777
  670. encoded = text.encode("utf-16le")
  671. pending_entities = entities
  672. text_offset = 0
  673. bytes_offset = 0
  674. text_length = len(text)
  675. bytes_length = len(encoded)
  676. while text_offset < text_length:
  677. if bytes_offset + length * 2 >= bytes_length:
  678. yield parser.unparse(
  679. text[text_offset:],
  680. list(sorted(pending_entities, key=lambda x: x.offset)),
  681. )
  682. break
  683. codepoint_count = len(
  684. encoded[bytes_offset : bytes_offset + length * 2].decode(
  685. "utf-16le",
  686. errors="ignore",
  687. )
  688. )
  689. for search in split_on:
  690. search_index = text.rfind(
  691. search,
  692. text_offset + min_length,
  693. text_offset + codepoint_count,
  694. )
  695. if search_index != -1:
  696. break
  697. else:
  698. search_index = text_offset + codepoint_count
  699. split_index = grapheme.safe_split_index(text, search_index)
  700. split_offset_utf16 = (
  701. len(text[text_offset:split_index].encode("utf-16le"))
  702. ) // 2
  703. exclude = 0
  704. while (
  705. split_index + exclude < text_length
  706. and text[split_index + exclude] in split_on
  707. ):
  708. exclude += 1
  709. current_entities = []
  710. entities = pending_entities.copy()
  711. pending_entities = []
  712. for entity in entities:
  713. if (
  714. entity.offset < split_offset_utf16
  715. and entity.offset + entity.length > split_offset_utf16 + exclude
  716. ):
  717. # spans boundary
  718. current_entities.append(
  719. _copy_tl(
  720. entity,
  721. length=split_offset_utf16 - entity.offset,
  722. )
  723. )
  724. pending_entities.append(
  725. _copy_tl(
  726. entity,
  727. offset=0,
  728. length=entity.offset
  729. + entity.length
  730. - split_offset_utf16
  731. - exclude,
  732. )
  733. )
  734. elif entity.offset < split_offset_utf16 < entity.offset + entity.length:
  735. # overlaps boundary
  736. current_entities.append(
  737. _copy_tl(
  738. entity,
  739. length=split_offset_utf16 - entity.offset,
  740. )
  741. )
  742. elif entity.offset < split_offset_utf16:
  743. # wholly left
  744. current_entities.append(entity)
  745. elif (
  746. entity.offset + entity.length
  747. > split_offset_utf16 + exclude
  748. > entity.offset
  749. ):
  750. # overlaps right boundary
  751. pending_entities.append(
  752. _copy_tl(
  753. entity,
  754. offset=0,
  755. length=entity.offset
  756. + entity.length
  757. - split_offset_utf16
  758. - exclude,
  759. )
  760. )
  761. elif entity.offset + entity.length > split_offset_utf16 + exclude:
  762. # wholly right
  763. pending_entities.append(
  764. _copy_tl(
  765. entity,
  766. offset=entity.offset - split_offset_utf16 - exclude,
  767. )
  768. )
  769. current_text = text[text_offset:split_index]
  770. yield parser.unparse(
  771. current_text,
  772. list(sorted(current_entities, key=lambda x: x.offset)),
  773. )
  774. text_offset = split_index + exclude
  775. bytes_offset += len(current_text.encode("utf-16le"))
  776. def _copy_tl(o, **kwargs):
  777. d = o.to_dict()
  778. del d["_"]
  779. d.update(kwargs)
  780. return o.__class__(**d)
  781. def check_url(url: str) -> bool:
  782. """Checks url for validity"""
  783. try:
  784. return bool(urlparse(url).netloc)
  785. except Exception:
  786. return False
  787. def get_git_hash() -> Union[str, bool]:
  788. """Get current Hikka git hash"""
  789. try:
  790. repo = git.Repo()
  791. return repo.heads[0].commit.hexsha
  792. except Exception:
  793. return False
  794. def get_commit_url() -> str:
  795. """Get current Hikka git commit url"""
  796. try:
  797. repo = git.Repo()
  798. hash_ = repo.heads[0].commit.hexsha
  799. return (
  800. f'<a href="https://github.com/hikariatama/Hikka/commit/{hash_}">#{hash_[:7]}</a>'
  801. )
  802. except Exception:
  803. return "Unknown"
  804. def is_serializable(x: Any, /) -> bool:
  805. """Checks if object is JSON-serializable"""
  806. try:
  807. json.dumps(x)
  808. return True
  809. except Exception:
  810. return False
  811. def get_lang_flag(countrycode: str) -> str:
  812. """
  813. Gets an emoji of specified countrycode
  814. :param countrycode: 2-letter countrycode
  815. :returns: Emoji flag
  816. """
  817. if (
  818. len(
  819. code := [
  820. c
  821. for c in countrycode.lower()
  822. if c in string.ascii_letters + string.digits
  823. ]
  824. )
  825. == 2
  826. ):
  827. return "".join([chr(ord(c.upper()) + (ord("🇦") - ord("A"))) for c in code])
  828. return countrycode
  829. def get_entity_url(
  830. entity: Union[User, Channel],
  831. openmessage: Optional[bool] = False,
  832. ) -> str:
  833. """
  834. Get link to object, if available
  835. :param entity: Entity to get url of
  836. :param openmessage: Use tg://openmessage link for users
  837. :return: Link to object or empty string
  838. """
  839. return (
  840. (
  841. f"tg://openmessage?id={entity.id}"
  842. if openmessage
  843. else f"tg://user?id={entity.id}"
  844. )
  845. if isinstance(entity, User)
  846. else (
  847. f"tg://resolve?domain={entity.username}"
  848. if getattr(entity, "username", None)
  849. else ""
  850. )
  851. )
  852. async def get_message_link(
  853. message: Message,
  854. chat: Optional[Union[Chat, Channel]] = None,
  855. ) -> str:
  856. if message.is_private:
  857. return (
  858. f"tg://openmessage?user_id={get_chat_id(message)}&message_id={message.id}"
  859. )
  860. if not chat:
  861. chat = await message.get_chat()
  862. return (
  863. f"https://t.me/{chat.username}/{message.id}"
  864. if getattr(chat, "username", False)
  865. else f"https://t.me/c/{chat.id}/{message.id}"
  866. )
  867. def remove_html(text: str, escape: Optional[bool] = False) -> str:
  868. """
  869. Removes HTML tags from text
  870. :param text: Text to remove HTML from
  871. :param escape: Escape HTML
  872. :return: Text without HTML
  873. """
  874. return (escape_html if escape else str)(
  875. re.sub(
  876. r"(<\/?a.*?>|<\/?b>|<\/?i>|<\/?u>|<\/?strong>|<\/?em>|<\/?code>|<\/?strike>|<\/?del>|<\/?pre.*?>|<\/?emoji.*?>)",
  877. "",
  878. text,
  879. )
  880. )
  881. def get_kwargs() -> dict:
  882. """
  883. Get kwargs of function, in which is called
  884. :return: kwargs
  885. """
  886. # https://stackoverflow.com/a/65927265/19170642
  887. frame = inspect.currentframe().f_back
  888. keys, _, _, values = inspect.getargvalues(frame)
  889. return {key: values[key] for key in keys if key != "self"}
  890. def mime_type(message: Message) -> str:
  891. """
  892. Get mime type of document in message
  893. :param message: Message with document
  894. :return: Mime type or empty string if not present
  895. """
  896. return (
  897. ""
  898. if not isinstance(message, Message) or not getattr(message, "media", False)
  899. else getattr(getattr(message, "media", False), "mime_type", False) or ""
  900. )
  901. def find_caller(stack: Optional[List[inspect.FrameInfo]] = None) -> Any:
  902. """Attempts to find command in stack"""
  903. caller = next(
  904. (
  905. frame_info
  906. for frame_info in stack or inspect.stack()
  907. if hasattr(frame_info, "function")
  908. and any(
  909. inspect.isclass(cls_)
  910. and issubclass(cls_, Module)
  911. and cls_ is not Module
  912. for cls_ in frame_info.frame.f_globals.values()
  913. )
  914. ),
  915. None,
  916. )
  917. if not caller:
  918. return next(
  919. (
  920. frame_info.frame.f_locals["func"]
  921. for frame_info in stack or inspect.stack()
  922. if hasattr(frame_info, "function")
  923. and frame_info.function == "future_dispatcher"
  924. and (
  925. "CommandDispatcher"
  926. in getattr(getattr(frame_info, "frame", None), "f_globals", {})
  927. )
  928. ),
  929. None,
  930. )
  931. return next(
  932. (
  933. getattr(cls_, caller.function, None)
  934. for cls_ in caller.frame.f_globals.values()
  935. if inspect.isclass(cls_) and issubclass(cls_, Module)
  936. ),
  937. None,
  938. )
  939. def validate_html(html: str) -> str:
  940. """Removes broken tags from html"""
  941. text, entities = telethon.extensions.html.parse(html)
  942. return telethon.extensions.html.unparse(escape_html(text), entities)
  943. init_ts = time.perf_counter()
  944. # GeekTG Compatibility
  945. def get_git_info():
  946. # https://github.com/GeekTG/Friendly-Telegram/blob/master/friendly-telegram/utils.py#L133
  947. try:
  948. repo = git.Repo()
  949. ver = repo.heads[0].commit.hexsha
  950. except Exception:
  951. ver = ""
  952. return [
  953. ver,
  954. f"https://github.com/hikariatama/Hikka/commit/{ver}" if ver else "",
  955. ]
  956. def get_version_raw():
  957. """Get the version of the userbot"""
  958. # https://github.com/GeekTG/Friendly-Telegram/blob/master/friendly-telegram/utils.py#L128
  959. from . import version
  960. return ".".join(list(map(str, list(version.__version__))))
  961. get_platform_name = get_named_platform