123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- # ©️ Dan Gazizullin, 2021-2023
- # This file is a part of Hikka Userbot
- # 🌐 https://github.com/hikariatama/Hikka
- # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
- # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
- import asyncio
- import contextlib
- import git
- from .. import loader, utils, version
- from ..inline.types import InlineCall
- @loader.tds
- class UpdateNotifier(loader.Module):
- """Tracks latest Hikka releases, and notifies you, if update is required"""
- strings = {"name": "UpdateNotifier"}
- def __init__(self):
- self._notified = None
- self.config = loader.ModuleConfig(
- loader.ConfigValue(
- "disable_notifications",
- doc=lambda: self.strings("_cfg_doc_disable_notifications"),
- validator=loader.validators.Boolean(),
- )
- )
- def get_changelog(self) -> str:
- try:
- repo = git.Repo()
- for remote in repo.remotes:
- remote.fetch()
- if not (
- diff := repo.git.log([f"HEAD..origin/{version.branch}", "--oneline"])
- ):
- return False
- except Exception:
- return False
- res = "\n".join(
- f"<b>{commit.split()[0]}</b>:"
- f" <i>{utils.escape_html(' '.join(commit.split()[1:]))}</i>"
- for commit in diff.splitlines()[:10]
- )
- if diff.count("\n") >= 10:
- res += self.strings("more").format(len(diff.splitlines()) - 10)
- return res
- def get_latest(self) -> str:
- try:
- return next(
- git.Repo().iter_commits(f"origin/{version.branch}", max_count=1)
- ).hexsha
- except Exception:
- return ""
- async def client_ready(self):
- try:
- git.Repo()
- except Exception as e:
- raise loader.LoadError("Can't load due to repo init error") from e
- self._markup = lambda: self.inline.generate_markup(
- [
- {"text": self.strings("update"), "data": "hikka/update"},
- {"text": self.strings("ignore"), "data": "hikka/ignore_upd"},
- ]
- )
- @loader.loop(interval=60, autostart=True)
- async def poller(self):
- if self.config["disable_notifications"] or not self.get_changelog():
- return
- self._pending = self.get_latest()
- if (
- self.get("ignore_permanent", False)
- and self.get("ignore_permanent") == self._pending
- ):
- await asyncio.sleep(60)
- return
- if self._pending not in {utils.get_git_hash(), self._notified}:
- m = await self.inline.bot.send_animation(
- self.tg_id,
- "https://t.me/hikari_assets/71",
- caption=self.strings("update_required").format(
- utils.get_git_hash()[:6],
- '<a href="https://github.com/hikariatama/Hikka/compare/{}...{}">{}</a>'
- .format(
- utils.get_git_hash()[:12],
- self.get_latest()[:12],
- self.get_latest()[:6],
- ),
- self.get_changelog(),
- ),
- reply_markup=self._markup(),
- )
- self._notified = self._pending
- self.set("ignore_permanent", False)
- await self._delete_all_upd_messages()
- self.set("upd_msg", m.message_id)
- async def _delete_all_upd_messages(self):
- for client in self.allclients:
- with contextlib.suppress(Exception):
- await client.loader.inline.bot.delete_message(
- client.tg_id,
- client.loader.db.get("UpdateNotifier", "upd_msg"),
- )
- @loader.callback_handler()
- async def update(self, call: InlineCall):
- """Process update buttons clicks"""
- if call.data not in {"hikka/update", "hikka/ignore_upd"}:
- return
- if call.data == "hikka/ignore_upd":
- self.set("ignore_permanent", self.get_latest())
- await call.answer(self.strings("latest_disabled"))
- return
- await self._delete_all_upd_messages()
- with contextlib.suppress(Exception):
- await call.delete()
- await self.invoke("update", "-f", peer=self.inline.bot_username)
|