proxypass.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. # ©️ Dan Gazizullin, 2021-2023
  2. # This file is a part of Hikka Userbot
  3. # 🌐 https://github.com/hikariatama/Hikka
  4. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  5. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  6. import asyncio
  7. import logging
  8. import os
  9. import re
  10. import typing
  11. from .. import utils
  12. logger = logging.getLogger(__name__)
  13. class ProxyPasser:
  14. def __init__(self, change_url_callback: callable = lambda _: None):
  15. self._tunnel_url = None
  16. self._sproc = None
  17. self._url_available = asyncio.Event()
  18. self._url_available.set()
  19. self._lock = asyncio.Lock()
  20. self._change_url_callback = change_url_callback
  21. async def _sleep_for_task(self, callback: callable, data: bytes, delay: int):
  22. await asyncio.sleep(delay)
  23. await callback(data.decode("utf-8"))
  24. async def _read_stream(
  25. self,
  26. callback: callable,
  27. stream: typing.BinaryIO,
  28. delay: int,
  29. ) -> None:
  30. last_task = None
  31. for getline in iter(stream.readline, ""):
  32. data_chunk = await getline
  33. if not data_chunk:
  34. if last_task:
  35. last_task.cancel()
  36. await callback(data_chunk.decode("utf-8"))
  37. if not self._url_available.is_set():
  38. self._url_available.set()
  39. if last_task:
  40. last_task.cancel()
  41. last_task = asyncio.ensure_future(
  42. self._sleep_for_task(callback, data_chunk, delay)
  43. )
  44. def kill(self):
  45. try:
  46. self._sproc.terminate()
  47. except Exception:
  48. logger.exception("Failed to kill proxy pass process")
  49. else:
  50. logger.debug("Proxy pass tunnel killed")
  51. async def _process_stream(self, stdout_line: str) -> None:
  52. regex = r"tunneled.*?(https:\/\/.*?\..*?\.[a-z]+)"
  53. if re.search(regex, stdout_line):
  54. self._tunnel_url = re.search(regex, stdout_line)[1]
  55. self._change_url_callback(self._tunnel_url)
  56. logger.debug("Proxy pass tunneled: %s", self._tunnel_url)
  57. self._url_available.set()
  58. async def get_url(self, port: int) -> typing.Optional[str]:
  59. async with self._lock:
  60. if self._tunnel_url:
  61. try:
  62. await asyncio.wait_for(self._sproc.wait(), timeout=0.05)
  63. except asyncio.TimeoutError:
  64. return self._tunnel_url
  65. else:
  66. self.kill()
  67. if "DOCKER" in os.environ:
  68. # We're in a Docker container, so we can't use ssh
  69. # Also, the concept of Docker is to keep
  70. # everything isolated, so we can't proxy-pass to
  71. # open web.
  72. return None
  73. logger.debug("Starting proxy pass shell for port %d", port)
  74. self._sproc = await asyncio.create_subprocess_shell(
  75. "ssh -o StrictHostKeyChecking=no -R"
  76. f" 80:127.0.0.1:{port} nokey@localhost.run",
  77. stdin=asyncio.subprocess.PIPE,
  78. stdout=asyncio.subprocess.PIPE,
  79. stderr=asyncio.subprocess.PIPE,
  80. )
  81. utils.atexit(self.kill)
  82. self._url_available = asyncio.Event()
  83. logger.debug("Starting proxy pass reader for port %d", port)
  84. asyncio.ensure_future(
  85. self._read_stream(
  86. self._process_stream,
  87. self._sproc.stdout,
  88. 1,
  89. )
  90. )
  91. try:
  92. await asyncio.wait_for(self._url_available.wait(), 15)
  93. except asyncio.TimeoutError:
  94. self.kill()
  95. self._tunnel_url = None
  96. return await self.get_url(port)
  97. logger.debug("Proxy pass tunnel url to port %d: %s", port, self._tunnel_url)
  98. return self._tunnel_url