123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
- # █▀█ █ █ █ █▀█ █▀▄ █
- # © Copyright 2022
- # https://t.me/hikariatama
- #
- # 🔒 Licensed under the GNU AGPLv3
- # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
- # meta pic: https://img.icons8.com/emoji/344/shield-emoji.png
- # meta developer: @hikariatama
- import asyncio
- import io
- import json
- import logging
- import time
- from telethon.tl.types import Message
- from .. import loader, utils
- from ..inline.types import InlineCall
- logger = logging.getLogger(__name__)
- @loader.tds
- class APIRatelimiterMod(loader.Module):
- """Helps userbot avoid spamming Telegram API"""
- strings = {
- "name": "APIRatelimiter",
- "warning": (
- "🚫 <b>WARNING!</b>\n\nYour account exceeded the limit of requests,"
- " specified in config. In order to prevent Telegram API Flood, userbot has"
- " been <b>fully frozen</b> for {} seconds. Further info is provided in"
- " attached file. \n\nIt is recommended to get help in"
- " <code>{prefix}support</code> group!\n\nIf you think, that it is an"
- " intended behavior, then wait until userbot gets unlocked and next time,"
- " when you will be going to perform such an operation, use"
- " <code>{prefix}suspend_api_protect</code> <time in seconds>"
- ),
- "args_invalid": "🚫 <b>Invalid arguments</b>",
- "suspended_for": "✅ <b>API Flood Protection is disabled for {} seconds</b>",
- "test": (
- "⚠️ <b>This action will expose your account to flooding Telegram API.</b>"
- " <i>In order to confirm, that you really know, what you are doing,"
- " complete this simple test - find the emoji, differing from others</i>"
- ),
- "on": "✅ <b>Protection enabled</b>",
- "off": "🚫 <b>Protection disabled</b>",
- "u_sure": "⚠️ <b>Are you sure?</b>",
- }
- strings_ru = {
- "warning": (
- "🚫 <b>ВНИМАНИЕ!</b>\n\nАккаунт вышел за лимиты запросов, указанные в"
- " конфиге. С целью предотвращения флуда Telegram API, юзербот был"
- " <b>полностью заморожен</b> на {} секунд. Дополнительная информация"
- " прикреплена в файле ниже. \n\nРекомендуется обратиться за помощью в"
- " <code>{prefix}support</code> группу!\n\nЕсли ты считаешь, что это"
- " запланированное поведение юзербота, просто подожди, пока закончится"
- " таймер и в следующий раз, когда запланируешь выполнять такую"
- " ресурсозатратную операцию, используй"
- " <code>{prefix}suspend_api_protect</code> <время в секундах>"
- ),
- "args_invalid": "🚫 <b>Неверные аргументы</b>",
- "suspended_for": "✅ <b>Защита API отключена на {} секунд</b>",
- "test": (
- "⚠️ <b>Это действие открывает юзерботу возможность флудить Telegram"
- " API.</b> <i>Для того, чтобы убедиться, что ты действительно уверен в том,"
- " что делаешь - реши простенький тест - найди отличающийся эмодзи.</i>"
- ),
- "on": "✅ <b>Защита включена</b>",
- "off": "🚫 <b>Защита отключена</b>",
- "u_sure": "⚠️ <b>Ты уверен?</b>",
- }
- _ratelimiter = []
- _suspend_until = 0
- _lock = False
- def __init__(self):
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "time_sample",
- 15,
- lambda: "Time sample DO NOT TOUCH",
- validator=loader.validators.Integer(minimum=1),
- ),
- loader.ConfigValue(
- "threshold",
- 100,
- lambda: "Threshold DO NOT TOUCH",
- validator=loader.validators.Integer(minimum=10),
- ),
- loader.ConfigValue(
- "local_floodwait",
- 30,
- lambda: "Local FW DO NOT TOUCH",
- validator=loader.validators.Integer(minimum=10, maximum=3600),
- ),
- )
- async def client_ready(self, *_):
- asyncio.ensure_future(self._install_protection())
- async def _install_protection(self):
- await asyncio.sleep(30) # Restart lock
- if hasattr(self._client._call, "_old_call_rewritten"):
- raise loader.SelfUnload("Already installed")
- old_call = self._client._call
- async def new_call(
- sender: "MTProtoSender", # type: ignore
- request: "TLRequest", # type: ignore
- ordered: bool = False,
- flood_sleep_threshold: int = None,
- ):
- if time.perf_counter() > self._suspend_until and not self.get(
- "disable_protection",
- True,
- ):
- request_name = type(request).__name__
- self._ratelimiter += [[request_name, time.perf_counter()]]
- self._ratelimiter = list(
- filter(
- lambda x: time.perf_counter() - x[1]
- < int(self.config["time_sample"]),
- self._ratelimiter,
- )
- )
- if (
- len(self._ratelimiter) > int(self.config["threshold"])
- and not self._lock
- ):
- self._lock = True
- report = io.BytesIO(
- json.dumps(
- self._ratelimiter,
- indent=4,
- ).encode("utf-8")
- )
- report.name = "local_fw_report.json"
- await self.inline.bot.send_document(
- self.tg_id,
- report,
- caption=self.strings("warning").format(
- self.config["local_floodwait"],
- prefix=self.get_prefix(),
- ),
- )
- # It is intented to use time.sleep instead of asyncio.sleep
- time.sleep(int(self.config["local_floodwait"]))
- self._lock = False
- return await old_call(sender, request, ordered, flood_sleep_threshold)
- self._client._call = new_call
- self._client._old_call_rewritten = old_call
- self._client._call._hikka_overwritten = True
- logger.debug("Successfully installed ratelimiter")
- async def on_unload(self):
- if hasattr(self._client, "_old_call_rewritten"):
- self._client._call = self._client._old_call_rewritten
- delattr(self._client, "_old_call_rewritten")
- logger.debug("Successfully uninstalled ratelimiter")
- async def suspend_api_protectcmd(self, message: Message):
- """<time in seconds> - Suspend API Ratelimiter for n seconds"""
- args = utils.get_args_raw(message)
- if not args or not args.isdigit():
- await utils.answer(message, self.strings("args_invalid"))
- return
- self._suspend_until = time.perf_counter() + int(args)
- await utils.answer(message, self.strings("suspended_for").format(args))
- async def api_fw_protectioncmd(self, message: Message):
- """Only for people, who know what they're doing"""
- await self.inline.form(
- message=message,
- text=self.strings("u_sure"),
- reply_markup=[
- {"text": "🚫 No", "action": "close"},
- {"text": "✅ Yes", "callback": self._finish},
- ],
- )
- async def _finish(self, call: InlineCall):
- state = self.get("disable_protection", True)
- self.set("disable_protection", not state)
- await call.edit(self.strings("on" if state else "off"))
|