123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- #!/usr/bin/env python3
- # - built in - #
- import sys
- import json
- import asyncio
- import logging
- from pathlib import Path
- from typing import Optional
- from datetime import datetime
- from dataclasses import dataclass
- # - pypi - #
- from telethon import hints # type: ignore
- from telethon import TelegramClient
- from telethon.tl.types import ( # type: ignore
- Message,
- PeerChannel,
- ChannelParticipantsAdmins as AdmFilter
- )
- # - local -#
- import config
- import invidious
- from database import Database
- # logging.basicConfig(level='DEBUG')
- @dataclass
- class YTNotifyData:
- db: Database
- bot: TelegramClient
- api: invidious.Invidious
- chat: PeerChannel
- delay: int
- channel: str
- _channel_name: Optional[str] = None
- class YTNotify(YTNotifyData):
- _NAME = 'YTNotify'
- # - log - #
- __handler = logging.StreamHandler()
- __handler.setLevel(config.LOGLEVEL)
- __handler.setFormatter(logging.Formatter(config.LOGFMT, config.TIMEFMT))
- _log = logging.getLogger(_NAME)
- _log.addHandler(__handler)
- _log.setLevel(config.LOGLEVEL)
- @property
- def channel_name(self) -> str:
- return self._channel_name or self.channel
- @staticmethod
- def is_live(info: invidious.VideoInfo) -> bool:
- return info.lengthSeconds == 0
- @staticmethod
- def utc_now() -> int:
- return int(datetime.utcnow().timestamp())
- async def _send_message(self, text: str,
- chat: hints.EntityLike = None) -> Message:
- if not chat:
- chat = await self.bot.get_entity(self.chat)
- return await self.bot.send_message(chat, text)
- async def _get_last_live(self) -> Optional[str]:
- return await self.db.get_last_live(self.channel)
- async def _update_last_live(self, video_id: str) -> int:
- return await self.db.update_last_live(self.channel, video_id)
- async def _warn_admins(self, attempts: int) -> None:
- text = ('cannot get latest video from '
- '[%s](https://youtube.com/channel/%s)\n\n'
- 'attempts: %d' % (self.channel_name, self.channel, attempts))
- chat = await self.bot.get_entity(self.chat)
- async for adm in self.bot.iter_participants(chat, filter=AdmFilter):
- if not adm.bot:
- await self._send_message(text, chat=adm)
- async def _video_handler(self, video: invidious.VideoInfo) -> None:
- last_id = await self._get_last_live()
- if video == invidious.NO_VIDEOS:
- self._log.debug(
- '%s: looks like there are no videos for', self.channel)
- return
- if self.is_live(video):
- if last_id == video.videoId:
- return
- self._log.info('%s: new live found: %s',
- self.channel, video.videoId)
- await asyncio.wait([
- self._update_last_live(video.videoId),
- self._send_message('https://youtu.be/%s' % video.videoId)
- ])
- elif not last_id:
- self._log.debug('%s: last stream id not found in db, '
- 'adding 0.' % self.channel)
- await self._update_last_live('0')
- async def run(self) -> None:
- attempts: int = int()
- refreshed: bool = False
- while True:
- info = await self.api.get_latest_video(self.channel)
- if info is not None:
- if not self.channel_name:
- self._channel_name = info.author
- await self._video_handler(info)
- attempts = 0
- refreshed = False
- else:
- self._log.warning('%s: cannot get latest video', self.channel)
- if (attempts := attempts + 1) >= config.ATEMPTS:
- if refreshed:
- await self._warn_admins(attempts)
- else:
- await self.api.find_instance()
- refreshed = True
- self._log.debug('%s: sleeping for %d seconds',
- self.channel, self.delay)
- await asyncio.sleep(self.delay)
- async def main() -> int:
- loop = asyncio.get_event_loop()
- db = Database(Path(config.DB_PATH))
- bot = TelegramClient('bot', config.API_ID, config.API_HASH)
- api = invidious.Invidious()
- if config.INSTANCE:
- api.set_instance(config.INSTANCE)
- else:
- if not await api.find_instance():
- return 1
- await bot.start(bot_token=config.TOKEN)
- with Path(config.CHANNELS_PATH).open() as fptr:
- channels = json.load(fptr)
- for channel in channels:
- loop.create_task(
- YTNotify(
- db, bot, api,
- chat=channel.get('chat', config.CHAT_ID),
- delay=channel.get('timeout', 10 * 60),
- channel=channel['id'],
- ).run()
- )
- await bot.run_until_disconnected()
- return 0
- if __name__ == '__main__':
- try:
- sys.exit(
- asyncio
- .get_event_loop_policy()
- .get_event_loop()
- .run_until_complete(main())
- )
- except KeyboardInterrupt:
- pass
|