update_notifier.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  2. # █▀█ █ █ █ █▀█ █▀▄ █
  3. # © Copyright 2022
  4. # https://t.me/hikariatama
  5. #
  6. # 🔒 Licensed under the GNU AGPLv3
  7. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  8. # scope: inline
  9. import asyncio
  10. import contextlib
  11. import logging
  12. from typing import Union
  13. import git
  14. from .. import loader, utils
  15. from ..inline.types import InlineCall
  16. logger = logging.getLogger(__name__)
  17. @loader.tds
  18. class UpdateNotifierMod(loader.Module):
  19. """Tracks latest Hikka releases, and notifies you, if update is required"""
  20. strings = {
  21. "name": "UpdateNotifier",
  22. "update_required": (
  23. "🌘 <b>Hikka Update available!</b>\n\nNew Hikka version released.\n🔮"
  24. " <b>Hikka <s>{}</s> -> {}</b>\n\n{}"
  25. ),
  26. "more": "\n<i><b>🎥 And {} more...</b></i>",
  27. }
  28. strings_ru = {
  29. "update_required": (
  30. "🌘 <b>Доступно обновление Hikka!</b>\n\nОпубликована новая версия Hikka.\n🔮"
  31. " <b>Hikka <s>{}</s> -> {}</b>\n\n{}"
  32. ),
  33. "more": "\n<i><b>🎥 И еще {}...</b></i>",
  34. }
  35. _notified = None
  36. def __init__(self):
  37. self.config = loader.ModuleConfig(
  38. loader.ConfigValue(
  39. "disable_notifications",
  40. doc=lambda: "Disable update notifications",
  41. validator=loader.validators.Boolean(),
  42. )
  43. )
  44. def get_commit(self) -> Union[str, bool]:
  45. try:
  46. return git.Repo().heads[0].commit.hexsha
  47. except Exception:
  48. return False
  49. def get_changelog(self) -> str:
  50. try:
  51. repo = git.Repo()
  52. for remote in repo.remotes:
  53. remote.fetch()
  54. if not (diff := repo.git.log(["HEAD..origin/master", "--oneline"])):
  55. return False
  56. except Exception:
  57. return False
  58. res = "\n".join(
  59. f"<b>{commit.split()[0]}</b>:"
  60. f" <i>{utils.escape_html(' '.join(commit.split()[1:]))}</i>"
  61. for commit in diff.splitlines()[:10]
  62. )
  63. if diff.count("\n") >= 10:
  64. res += self.strings("more").format(len(diff.splitlines()) - 10)
  65. return res
  66. def get_latest(self) -> str:
  67. try:
  68. return list(git.Repo().iter_commits("origin/master", max_count=1))[0].hexsha
  69. except Exception:
  70. return ""
  71. async def client_ready(self, *_):
  72. try:
  73. git.Repo()
  74. except Exception as e:
  75. raise loader.LoadError("Can't load due to repo init error") from e
  76. self._markup = self.inline.generate_markup(
  77. [
  78. {"text": "🔄 Update", "data": "hikka_update"},
  79. {"text": "🚫 Ignore", "data": "hikka_upd_ignore"},
  80. ]
  81. )
  82. self.poller.start()
  83. @loader.loop(interval=60)
  84. async def poller(self):
  85. if self.config["disable_notifications"] or not self.get_changelog():
  86. return
  87. self._pending = self.get_latest()
  88. if (
  89. self.get("ignore_permanent", False)
  90. and self.get("ignore_permanent") == self._pending
  91. ):
  92. await asyncio.sleep(60)
  93. return
  94. if self._pending not in [self.get_commit(), self._notified]:
  95. m = await self.inline.bot.send_message(
  96. self.tg_id,
  97. self.strings("update_required").format(
  98. self.get_commit()[:6],
  99. f'<a href="https://github.com/hikariatama/Hikka/compare/{self.get_commit()[:12]}...{self.get_latest()[:12]}">{self.get_latest()[:6]}</a>',
  100. self.get_changelog(),
  101. ),
  102. disable_web_page_preview=True,
  103. reply_markup=self._markup,
  104. )
  105. self._notified = self._pending
  106. self.set("ignore_permanent", False)
  107. await self._delete_all_upd_messages()
  108. self.set("upd_msg", m.message_id)
  109. async def _delete_all_upd_messages(self):
  110. for client in self.allclients:
  111. with contextlib.suppress(Exception):
  112. await client.loader.inline.bot.delete_message(
  113. client.tg_id,
  114. client.loader._db.get("UpdateNotifierMod", "upd_msg"),
  115. )
  116. async def update_callback_handler(self, call: InlineCall):
  117. """Process update buttons clicks"""
  118. if call.data not in {"hikka_update", "hikka_upd_ignore"}:
  119. return
  120. if call.data == "hikka_upd_ignore":
  121. self.set("ignore_permanent", self._pending)
  122. await call.answer("Notifications about this update have been suppressed")
  123. return
  124. await self._delete_all_upd_messages()
  125. with contextlib.suppress(Exception):
  126. await call.delete()
  127. await self.allmodules.commands["update"](
  128. await self._client.send_message(
  129. self.inline.bot_username,
  130. f"<code>{self.get_prefix()}update --force</code>",
  131. )
  132. )