core.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. """Responsible for web init and mandatory ops"""
  2. # Friendly Telegram (telegram userbot)
  3. # Copyright (C) 2018-2021 The Authors
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. # This program is distributed in the hope that it will be useful,
  9. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. # GNU Affero General Public License for more details.
  12. # You should have received a copy of the GNU Affero General Public License
  13. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  14. # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
  15. # █▀█ █ █ █ █▀█ █▀▄ █
  16. # © Copyright 2022
  17. # https://t.me/hikariatama
  18. #
  19. # 🔒 Licensed under the GNU AGPLv3
  20. # 🌐 https://www.gnu.org/licenses/agpl-3.0.html
  21. import asyncio
  22. import contextlib
  23. import inspect
  24. import logging
  25. import os
  26. import re
  27. import subprocess
  28. import atexit
  29. import typing
  30. import aiohttp_jinja2
  31. import jinja2
  32. from aiohttp import web
  33. from . import root
  34. from ..tl_cache import CustomTelegramClient
  35. from ..database import Database
  36. from ..loader import Modules
  37. logger = logging.getLogger(__name__)
  38. class Web(root.Web):
  39. def __init__(self, **kwargs):
  40. self.runner = None
  41. self.port = None
  42. self.running = asyncio.Event()
  43. self.ready = asyncio.Event()
  44. self.client_data = {}
  45. self.app = web.Application()
  46. aiohttp_jinja2.setup(
  47. self.app,
  48. filters={"getdoc": inspect.getdoc, "ascii": ascii},
  49. loader=jinja2.FileSystemLoader("web-resources"),
  50. )
  51. self.app["static_root_url"] = "/static"
  52. super().__init__(**kwargs)
  53. self.app.router.add_get("/favicon.ico", self.favicon)
  54. self.app.router.add_static("/static/", "web-resources/static")
  55. async def start_if_ready(
  56. self,
  57. total_count: int,
  58. port: int,
  59. proxy_pass: bool = False,
  60. ):
  61. if total_count <= len(self.client_data):
  62. if not self.running.is_set():
  63. await self.start(port, proxy_pass=proxy_pass)
  64. self.ready.set()
  65. async def _sleep_for_task(self, callback: callable, data: bytes, delay: int):
  66. await asyncio.sleep(delay)
  67. await callback(data.decode("utf-8"))
  68. async def _read_stream(
  69. self,
  70. callback: callable,
  71. stream: typing.BinaryIO,
  72. delay: int,
  73. ) -> None:
  74. last_task = None
  75. for getline in iter(stream.readline, ""):
  76. data_chunk = await getline
  77. if not data_chunk:
  78. if last_task:
  79. last_task.cancel()
  80. await callback(data_chunk.decode("utf-8"))
  81. if not self._stream_processed.is_set():
  82. self._stream_processed.set()
  83. break
  84. if last_task:
  85. last_task.cancel()
  86. last_task = asyncio.ensure_future(
  87. self._sleep_for_task(callback, data_chunk, delay)
  88. )
  89. def _kill_tunnel(self):
  90. try:
  91. self._sproc.kill()
  92. except Exception:
  93. pass
  94. else:
  95. logger.debug("Proxy pass tunnel killed")
  96. async def _reopen_tunnel(self):
  97. await asyncio.sleep(3600)
  98. self._kill_tunnel()
  99. self._stream_processed.clear()
  100. self._tunnel_url = None
  101. url = await asyncio.wait_for(self._get_proxy_pass_url(self.port), timeout=10)
  102. if not url:
  103. raise Exception("Failed to get proxy pass url")
  104. self._tunnel_url = url
  105. asyncio.ensure_future(self._reopen_tunnel())
  106. async def _process_stream(self, stdout_line: str) -> None:
  107. if self._stream_processed.is_set():
  108. return
  109. regex = r"[a-zA-Z0-9]\.lhrtunnel\.link tunneled.*(https:\/\/.*\.link)"
  110. if re.search(regex, stdout_line):
  111. logger.debug("Proxy pass tunneled: %s", stdout_line)
  112. self._tunnel_url = re.search(regex, stdout_line)[1]
  113. self._stream_processed.set()
  114. atexit.register(self._kill_tunnel)
  115. async def _get_proxy_pass_url(self, port: int) -> typing.Optional[str]:
  116. logger.debug("Starting proxy pass shell")
  117. self._sproc = await asyncio.create_subprocess_shell(
  118. "ssh -o StrictHostKeyChecking=no -R"
  119. f" 80:localhost:{port} nokey@localhost.run",
  120. stdin=asyncio.subprocess.PIPE,
  121. stdout=asyncio.subprocess.PIPE,
  122. stderr=asyncio.subprocess.PIPE,
  123. )
  124. self._stream_processed = asyncio.Event()
  125. logger.debug("Starting proxy pass reader")
  126. asyncio.ensure_future(
  127. self._read_stream(
  128. self._process_stream,
  129. self._sproc.stdout,
  130. 1,
  131. )
  132. )
  133. await self._stream_processed.wait()
  134. return self._tunnel_url if hasattr(self, "_tunnel_url") else None
  135. async def get_url(self, proxy_pass: bool) -> str:
  136. url = None
  137. if all(option in os.environ for option in {"LAVHOST", "USER", "SERVER"}):
  138. return f"https://{os.environ['USER']}.{os.environ['SERVER']}.lavhost.ml"
  139. if proxy_pass:
  140. with contextlib.suppress(Exception):
  141. self._kill_tunnel()
  142. url = await asyncio.wait_for(
  143. self._get_proxy_pass_url(self.port),
  144. timeout=10,
  145. )
  146. if not url:
  147. ip = (
  148. "127.0.0.1"
  149. if "DOCKER" not in os.environ
  150. else subprocess.run(
  151. ["hostname", "-i"],
  152. stdout=subprocess.PIPE,
  153. check=True,
  154. )
  155. .stdout.decode("utf-8")
  156. .strip()
  157. )
  158. url = f"http://{ip}:{self.port}"
  159. else:
  160. asyncio.ensure_future(self._reopen_tunnel())
  161. self.url = url
  162. return url
  163. async def start(self, port: int, proxy_pass: bool = False):
  164. self.runner = web.AppRunner(self.app)
  165. await self.runner.setup()
  166. self.port = os.environ.get("PORT", port)
  167. site = web.TCPSite(self.runner, None, self.port)
  168. await site.start()
  169. await self.get_url(proxy_pass)
  170. self.running.set()
  171. async def stop(self):
  172. await self.runner.shutdown()
  173. await self.runner.cleanup()
  174. self.running.clear()
  175. self.ready.clear()
  176. async def add_loader(
  177. self,
  178. client: CustomTelegramClient,
  179. loader: Modules,
  180. db: Database,
  181. ):
  182. self.client_data[client.tg_id] = (loader, client, db)
  183. @staticmethod
  184. async def favicon(_):
  185. return web.Response(
  186. status=301,
  187. headers={"Location": "https://i.imgur.com/IRAiWBo.jpeg"},
  188. )