123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644 |
- # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
- # █▀█ █ █ █ █▀█ █▀▄ █
- # © Copyright 2022
- # https://t.me/hikariatama
- #
- # 🔒 Licensed under the GNU AGPLv3
- # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
- import asyncio
- import contextlib
- import copy
- import functools
- import logging
- import time
- import traceback
- from typing import List, Optional, Union
- from aiogram.types import (
- CallbackQuery,
- InlineKeyboardMarkup,
- InlineQuery,
- InlineQueryResultGif,
- InlineQueryResultPhoto,
- InputMediaAnimation,
- InputMediaPhoto,
- )
- from aiogram.utils.exceptions import BadRequest, InvalidHTTPUrlContent, RetryAfter
- from telethon.tl.types import Message
- from telethon.errors.rpcerrorlist import ChatSendInlineForbiddenError
- from urllib.parse import urlparse
- import os
- from .. import utils, main
- from .types import InlineUnit, InlineMessage
# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)
class ListGalleryHelper:
    """Cyclic accessor over a fixed list of strings (photo urls or captions)"""

    def __init__(self, lst: List[str]):
        # The list is stored by reference, it is not copied
        self.lst = lst
        # Cursor starts one step before the first item,
        # so that the very first call yields `lst[0]`
        self._current_index = -1

    def __call__(self):
        """Moves the internal cursor one step forward and returns the item under it"""
        self._current_index += 1
        return self.by_index(self._current_index)

    def by_index(self, index: int):
        """Returns the item at `index`, wrapping around the length of the list"""
        items = self.lst
        position = index % len(items)
        return items[position]
- class Gallery(InlineUnit):
async def gallery(
    self,
    message: Union[Message, int],
    next_handler: Union[callable, List[str]],
    caption: Optional[Union[List[str], str, callable]] = "",
    *,
    custom_buttons: Optional[Union[List[List[dict]], List[dict], dict]] = None,
    force_me: Optional[bool] = False,
    always_allow: Optional[list] = None,
    manual_security: Optional[bool] = False,
    disable_security: Optional[bool] = False,
    ttl: Optional[Union[int, bool]] = False,
    on_unload: Optional[callable] = None,
    preload: Optional[Union[bool, int]] = False,
    gif: Optional[bool] = False,
    silent: Optional[bool] = False,
    _reattempt: bool = False,
) -> Union[bool, InlineMessage]:
    """
    Send inline gallery to chat
    :param caption: Caption for photo, list of captions, or callable, returning caption
    :param message: Where to send inline. Can be either `Message` or `int`
    :param next_handler: Callback function, which must return url for next photo or list with photo urls.
                         A list of urls can be passed directly instead of a callback
    :param custom_buttons: Custom buttons to add above native ones
    :param force_me: Whether the gallery buttons may only be pressed by the owner scope
    :param always_allow: Users, that are allowed to press buttons in addition to previous rules
    :param ttl: Time, when the gallery is going to be unloaded. Unload means, that the gallery
                will become unusable. Pay attention, that ttl can't
                be bigger, than default one (1 day) and must be either `int` or `False`
    :param on_unload: Callback, called when gallery is unloaded and/or closed. You can clean up trash
                      or perform another needed action
    :param preload: Whether to preload gallery photos beforehand. If yes - specify threshold to
                    be loaded (`True` is rejected, pass an `int`). Toggle this attribute, if your
                    callback is too slow to load photos in real time
    :param gif: Whether the gallery will be filled with gifs. If you omit this argument and specify
                gifs in `next_handler`, Hikka will try to determine the filetype of these images
    :param manual_security: By default, Hikka will try to inherit inline buttons security from the caller (command)
                            If you want to avoid this, pass `manual_security=True`
    :param disable_security: By default, Hikka will try to inherit inline buttons security from the caller (command)
                             If you want to disable all security checks on this gallery in particular, pass `disable_security=True`
    :param silent: Whether the gallery must be sent silently (w/o "Loading inline gallery..." message)
    :param _reattempt: Internal flag, set on the single automatic retry after a failed send
    :return: If gallery is sent, returns :obj:`InlineMessage`, otherwise returns `False`
    """
    # Never read in this method. NOTE(review): the very specific name suggests it is
    # picked up by the logging machinery through frame inspection - confirm before removing
    with contextlib.suppress(AttributeError):
        _hikka_client_id_logging_tag = copy.copy(self._client.tg_id)

    # NOTE: the parameters rebound below (`custom_buttons`, `caption`, `always_allow`,
    # `next_handler`) are later passed to the retry call in their normalized form
    custom_buttons = self._validate_markup(custom_buttons)

    caption_is_text = isinstance(caption, str) or (
        isinstance(caption, list) and all(isinstance(item, str) for item in caption)
    )
    if not caption_is_text and not callable(caption):
        logger.error("Invalid type for `caption`")
        return False

    if isinstance(caption, list):
        caption = ListGalleryHelper(caption)

    # Checked in this exact order, the first failing one is reported
    type_checks = (
        ("manual_security", isinstance(manual_security, bool)),
        ("silent", isinstance(silent, bool)),
        ("disable_security", isinstance(disable_security, bool)),
        ("message", isinstance(message, (Message, int))),
        ("force_me", isinstance(force_me, bool)),
        ("gif", isinstance(gif, bool)),
        # `False` and any int are fine, a bare `True` carries no threshold
        ("preload", isinstance(preload, int) and preload is not True),
        ("always_allow", not always_allow or isinstance(always_allow, list)),
        ("ttl", not ttl or isinstance(ttl, int)),
    )
    for param_name, is_valid in type_checks:
        if not is_valid:
            logger.error("Invalid type for `%s`", param_name)
            return False

    if not always_allow:
        always_allow = []

    if isinstance(next_handler, list):
        if not all(isinstance(item, str) for item in next_handler):
            logger.error("Invalid type for `next_handler`")
            return False

        next_handler = ListGalleryHelper(next_handler)

    unit_id = utils.rand(16)
    btn_call_data = utils.rand(10)

    try:
        if isinstance(next_handler, ListGalleryHelper):
            photo_url = next_handler.lst
        else:
            photo_url = await self._call_photo(next_handler)
    except Exception:
        logger.exception("Error while parsing first photo in gallery")
        return False

    if not photo_url:
        # Either `_call_photo` failed (and logged the reason itself) or there are
        # no urls at all. Indexing `photo_url[0]` below would raise on an empty list
        return False

    perms_map = None if manual_security else self._find_caller_sec_map()

    self._units[unit_id] = {
        "type": "gallery",
        "caption": caption,
        "chat": None,
        "message_id": None,
        "uid": unit_id,
        "photo_url": (photo_url if isinstance(photo_url, str) else photo_url[0]),
        "next_handler": next_handler,
        "btn_call_data": btn_call_data,
        "photos": [photo_url] if isinstance(photo_url, str) else photo_url,
        "current_index": 0,
        "future": asyncio.Event(),
        **({"ttl": round(time.time()) + ttl} if ttl else {}),
        **({"force_me": force_me} if force_me else {}),
        **({"disable_security": disable_security} if disable_security else {}),
        **({"on_unload": on_unload} if callable(on_unload) else {}),
        **({"preload": preload} if preload else {}),
        **({"gif": gif} if gif else {}),
        **({"always_allow": always_allow} if always_allow else {}),
        **({"perms_map": perms_map} if perms_map else {}),
        **({"message": message} if isinstance(message, Message) else {}),
        **({"custom_buttons": custom_buttons} if custom_buttons else {}),
    }

    page_callback = functools.partial(self._gallery_page, unit_id=unit_id)

    async def _page_handler(*args, **kwargs):
        # Native coroutine function in place of `asyncio.coroutine(partial)`:
        # that decorator was removed in Python 3.11
        return await page_callback(*args, **kwargs)

    self._custom_map[btn_call_data] = {
        "handler": _page_handler,
        **(
            {"ttl": self._units[unit_id]["ttl"]}
            if "ttl" in self._units[unit_id]
            else {}
        ),
        **({"always_allow": always_allow} if always_allow else {}),
        **({"force_me": force_me} if force_me else {}),
        **({"disable_security": disable_security} if disable_security else {}),
        **({"perms_map": perms_map} if perms_map else {}),
        **({"message": message} if isinstance(message, Message) else {}),
    }

    status_message = None
    if isinstance(message, Message) and not silent:
        try:
            status_message = await (
                message.edit if message.out else message.respond
            )("🌘 <b>Loading inline gallery...</b>")
        except Exception:
            # The status message is purely cosmetic, go on without it
            status_message = None

    async def answer(msg: str):
        """Report `msg` to the place the gallery was requested from"""
        if isinstance(message, Message):
            await (message.edit if message.out else message.respond)(msg)
        else:
            await self._client.send_message(message, msg)

    def discard_unit():
        """Forget the unit and its button handler after a failed send"""
        self._units.pop(unit_id, None)
        self._custom_map.pop(btn_call_data, None)

    try:
        q = await self._client.inline_query(self.bot_username, unit_id)
        m = await q[0].click(
            utils.get_chat_id(message) if isinstance(message, Message) else message,
            reply_to=message.reply_to_msg_id
            if isinstance(message, Message)
            else None,
        )
    except ChatSendInlineForbiddenError:
        # Nothing was sent, so `m` is unbound. Falling through would end up
        # waiting on the unit's event and then dereferencing `m`
        discard_unit()
        await answer("🚫 <b>You can't send inline units in this chat</b>")
        return False
    except Exception:
        logger.exception("Error sending inline gallery")

        # Removed exactly once: a second `del` here used to raise `KeyError`
        # before the failure could be reported to the user
        discard_unit()

        if _reattempt:
            logger.exception("Can't send gallery")

            if not self._db.get(main.__name__, "inlinelogs", True):
                msg = "<b>🚫 Gallery invoke failed! More info in logs</b>"
            else:
                exc = traceback.format_exc()
                # Remove `Traceback (most recent call last):`
                exc = "\n".join(exc.splitlines()[1:])
                msg = (
                    "<b>🚫 Gallery invoke failed!</b>\n\n"
                    f"<b>🧾 Logs:</b>\n<code>{utils.escape_html(exc)}</code>"
                )

            await answer(msg)
            return False

        # One automatic retry with the very same arguments.
        # NOTE(review): `utils.get_kwargs()` presumably reads the arguments of the
        # calling frame, so it must stay directly inside this method - confirm
        kwargs = utils.get_kwargs()
        kwargs["_reattempt"] = True
        return await self.gallery(**kwargs)

    # NOTE(review): the event is not set anywhere in this method; presumably another
    # handler sets it (and stores `inline_message_id`) once the result is delivered
    await self._units[unit_id]["future"].wait()
    del self._units[unit_id]["future"]

    self._units[unit_id]["chat"] = utils.get_chat_id(m)
    self._units[unit_id]["message_id"] = m.id

    if isinstance(message, Message) and message.out:
        await message.delete()

    # `status_message` can only be set when `message` is a `Message`
    if status_message and not message.out:
        await status_message.delete()

    if not isinstance(next_handler, ListGalleryHelper):
        # Fetch the following photo(s) in background
        asyncio.ensure_future(self._load_gallery_photos(unit_id))

    return InlineMessage(self, unit_id, self._units[unit_id]["inline_message_id"])
- async def _call_photo(self, callback: callable) -> Union[str, bool]:
- """Parses photo url from `callback`. Returns url on success, otherwise `False`"""
- if isinstance(callback, str):
- photo_url = callback
- elif isinstance(callback, list):
- photo_url = callback[0]
- elif asyncio.iscoroutinefunction(callback):
- photo_url = await callback()
- elif callable(callback):
- photo_url = callback()
- else:
- logger.error("Invalid type for `next_handler`")
- return False
- if not isinstance(photo_url, (str, list)):
- logger.error("Got invalid result from `next_handler`")
- return False
- return photo_url
- async def _load_gallery_photos(self, unit_id: str):
- """Preloads photo. Should be called via ensure_future"""
- unit = self._units[unit_id]
- photo_url = await self._call_photo(unit["next_handler"])
- self._units[unit_id]["photos"] += (
- [photo_url] if isinstance(photo_url, str) else photo_url
- )
- unit = self._units[unit_id]
- # If only one preload was insufficient to load needed amount of photos
- if unit.get("preload", False) and len(unit["photos"]) - unit[
- "current_index"
- ] < unit.get("preload", False):
- # Start load again
- asyncio.ensure_future(self._load_gallery_photos(unit_id))
- async def _gallery_slideshow_loop(
- self,
- call: CallbackQuery,
- unit_id: str = None,
- ):
- while True:
- await asyncio.sleep(7)
- unit = self._units[unit_id]
- if unit_id not in self._units or not unit.get("slideshow", False):
- return
- if unit["current_index"] + 1 >= len(unit["photos"]) and isinstance(
- unit["next_handler"],
- ListGalleryHelper,
- ):
- del self._units[unit_id]["slideshow"]
- self._units[unit_id]["current_index"] -= 1
- await self._gallery_page(
- call,
- self._units[unit_id]["current_index"] + 1,
- unit_id=unit_id,
- )
- async def _gallery_slideshow(
- self,
- call: CallbackQuery,
- unit_id: str = None,
- ):
- if not self._units[unit_id].get("slideshow", False):
- self._units[unit_id]["slideshow"] = True
- await self.bot.edit_message_reply_markup(
- inline_message_id=call.inline_message_id,
- reply_markup=self._gallery_markup(unit_id),
- )
- await call.answer("✅ Slideshow on")
- else:
- del self._units[unit_id]["slideshow"]
- await self.bot.edit_message_reply_markup(
- inline_message_id=call.inline_message_id,
- reply_markup=self._gallery_markup(unit_id),
- )
- await call.answer("🚫 Slideshow off")
- return
- asyncio.ensure_future(
- self._gallery_slideshow_loop(
- call,
- unit_id,
- )
- )
- async def _gallery_back(
- self,
- call: CallbackQuery,
- unit_id: str = None,
- ):
- queue = self._units[unit_id]["photos"]
- if not queue:
- await call.answer("No way back", show_alert=True)
- return
- self._units[unit_id]["current_index"] -= 1
- if self._units[unit_id]["current_index"] < 0:
- self._units[unit_id]["current_index"] = 0
- await call.answer("No way back")
- return
- try:
- await self.bot.edit_message_media(
- inline_message_id=call.inline_message_id,
- media=self._get_current_media(unit_id),
- reply_markup=self._gallery_markup(unit_id),
- )
- except RetryAfter as e:
- await call.answer(
- f"Got FloodWait. Wait for {e.timeout} seconds",
- show_alert=True,
- )
- except Exception:
- logger.exception("Exception while trying to edit media")
- await call.answer("Error occurred", show_alert=True)
- return
def _get_current_media(
    self,
    unit_id: str,
) -> Union[InputMediaPhoto, InputMediaAnimation]:
    """
    Builds the media object for the photo, the gallery currently points at
    Animation is used when the unit is flagged as `gif` or the url path
    ends with `.gif` / `.mp4`, photo otherwise
    """
    media = self._get_next_photo(unit_id)

    try:
        ext = os.path.splitext(urlparse(media).path)[1]
    except Exception:
        # Extension can't be determined, fall back to the `gif` flag only
        ext = None

    unit = self._units[unit_id]
    if unit.get("gif", False) or ext in {".gif", ".mp4"}:
        media_type = InputMediaAnimation
    else:
        media_type = InputMediaPhoto

    return media_type(
        media=media,
        caption=self._get_caption(unit_id, index=unit["current_index"]),
        parse_mode="HTML",
    )
- async def _gallery_page(
- self,
- call: CallbackQuery,
- page: Union[int, str],
- unit_id: str = None,
- ):
- if page == "slideshow":
- await self._gallery_slideshow(call, unit_id)
- return
- if page == "close":
- await self._delete_unit_message(call, unit_id=unit_id)
- return
- if page < 0:
- await call.answer("No way back")
- return
- if page > len(self._units[unit_id]["photos"]) - 1 and isinstance(
- self._units[unit_id]["next_handler"], ListGalleryHelper
- ):
- await call.answer("No way forward")
- return
- self._units[unit_id]["current_index"] = page
- if not isinstance(self._units[unit_id]["next_handler"], ListGalleryHelper):
- # If we exceeded photos limit in gallery and need to preload more
- if self._units[unit_id]["current_index"] >= len(
- self._units[unit_id]["photos"]
- ):
- await self._load_gallery_photos(unit_id)
- # If we still didn't get needed photo index
- if self._units[unit_id]["current_index"] >= len(
- self._units[unit_id]["photos"]
- ):
- await call.answer("Can't load next photo")
- return
- if (
- len(self._units[unit_id]["photos"])
- - self._units[unit_id]["current_index"]
- < self._units[unit_id].get("preload", 0) // 2
- ):
- logger.debug(f"Started preload for gallery {unit_id}")
- asyncio.ensure_future(self._load_gallery_photos(unit_id))
- try:
- await self.bot.edit_message_media(
- inline_message_id=call.inline_message_id,
- media=self._get_current_media(unit_id),
- reply_markup=self._gallery_markup(unit_id),
- )
- except (InvalidHTTPUrlContent, BadRequest):
- logger.debug("Error fetching photo content, attempting load next one")
- del self._units[unit_id]["photos"][self._units[unit_id]["current_index"]]
- self._units[unit_id]["current_index"] -= 1
- return await self._gallery_page(call, page, unit_id)
- except RetryAfter as e:
- await call.answer(
- f"Got FloodWait. Wait for {e.timeout} seconds",
- show_alert=True,
- )
- return
- except Exception:
- logger.exception("Exception while trying to edit media")
- await call.answer("Error occurred", show_alert=True)
- return
- def _get_next_photo(self, unit_id: str) -> str:
- """Returns next photo"""
- try:
- return self._units[unit_id]["photos"][self._units[unit_id]["current_index"]]
- except IndexError:
- logger.error(
- "Got IndexError in `_get_next_photo`. "
- f"{self._units[unit_id]['current_index']=} / "
- f"{len(self._units[unit_id]['photos'])=}"
- )
- return self._units[unit_id]["photos"][0]
def _get_caption(self, unit_id: str, index: int = 0) -> str:
    """
    Resolves the caption of the gallery
    :param unit_id: Id of the gallery unit
    :param index: Index of the photo. It is only used, when captions are given as a list
    :return: Caption, or an empty string, if the stored caption can't be resolved
    """
    caption = self._units[unit_id].get("caption", "")

    # List of captions: one per photo, picked cyclically
    if isinstance(caption, ListGalleryHelper):
        return caption.by_index(index)

    if isinstance(caption, str):
        return caption

    if callable(caption):
        return caption()

    return ""
- def _gallery_markup(self, unit_id: str) -> InlineKeyboardMarkup:
- """Generates aiogram markup for `gallery`"""
- callback = functools.partial(self._gallery_page, unit_id=unit_id)
- unit = self._units[unit_id]
- return self.generate_markup(
- (
- (
- unit.get("custom_buttons", [])
- + self.build_pagination(
- unit_id=unit_id,
- callback=callback,
- total_pages=len(unit["photos"]),
- )
- + [
- [
- *(
- [
- {
- "text": "⏪",
- "callback": callback,
- "args": (unit["current_index"] - 1,),
- }
- ]
- if unit["current_index"] > 0
- else []
- ),
- *(
- [
- {
- "text": "🛑"
- if unit.get("slideshow", False)
- else "⏱",
- "callback": callback,
- "args": ("slideshow",),
- }
- ]
- if unit["current_index"] < len(unit["photos"]) - 1
- or not isinstance(
- unit["next_handler"], ListGalleryHelper
- )
- else []
- ),
- *(
- [
- {
- "text": "⏩",
- "callback": callback,
- "args": (unit["current_index"] + 1,),
- }
- ]
- if unit["current_index"] < len(unit["photos"]) - 1
- or not isinstance(
- unit["next_handler"], ListGalleryHelper
- )
- else []
- ),
- ]
- ]
- )
- + [[{"text": "🔻 Close", "callback": callback, "args": ("close",)}]]
- )
- )
- async def _gallery_inline_handler(self, inline_query: InlineQuery):
- for unit in self._units.copy().values():
- if (
- inline_query.from_user.id == self._me
- and inline_query.query == unit["uid"]
- and unit["type"] == "gallery"
- ):
- try:
- path = urlparse(unit["photo_url"]).path
- ext = os.path.splitext(path)[1]
- except Exception:
- ext = None
- args = {
- "thumb_url": "https://img.icons8.com/fluency/344/loading.png",
- "caption": self._get_caption(unit["uid"], index=0),
- "parse_mode": "HTML",
- "reply_markup": self._gallery_markup(unit["uid"]),
- "id": utils.rand(20),
- "title": "Processing inline gallery",
- }
- if unit.get("gif", False) or ext in {".gif", ".mp4"}:
- await inline_query.answer(
- [InlineQueryResultGif(gif_url=unit["photo_url"], **args)]
- )
- return
- await inline_query.answer(
- [InlineQueryResultPhoto(photo_url=unit["photo_url"], **args)],
- cache_time=0,
- )
|