debugger.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. # ©️ Dan Gazizullin, 2021-2023
  2. # This file is a part of Hikka Userbot
  3. # 🌐 https://github.com/hikariatama/Hikka
  4. # You can redistribute it and/or modify it under the terms of the GNU AGPLv3
  5. # 🔑 https://www.gnu.org/licenses/agpl-3.0.html
  6. import asyncio
  7. import logging
  8. import os
  9. import random
  10. from threading import Thread
  11. from werkzeug import Request, Response
  12. from werkzeug.debug import DebuggedApplication
  13. from werkzeug.serving import BaseWSGIServer, make_server
  14. from .. import main, utils
  15. from . import proxypass
  16. logger = logging.getLogger(__name__)
  17. class ServerThread(Thread):
  18. def __init__(self, server: BaseWSGIServer):
  19. Thread.__init__(self)
  20. self.server = server
  21. def run(self):
  22. logger.debug("Starting werkzeug debug server")
  23. self.server.serve_forever()
  24. def shutdown(self):
  25. logger.debug("Shutting down werkzeug debug server")
  26. self.server.shutdown()
  27. class WebDebugger:
  28. def __init__(self):
  29. self._url = None
  30. self.exceptions = {}
  31. self.pin = str(random.randint(100000, 999999))
  32. self.port = main.gen_port("werkzeug_port", True)
  33. main.save_config_key("werkzeug_port", self.port)
  34. self._proxypasser = proxypass.ProxyPasser(self._url_changed)
  35. asyncio.ensure_future(self._getproxy())
  36. self._create_server()
  37. self._controller = ServerThread(self._server)
  38. logging.getLogger("werkzeug").setLevel(logging.WARNING)
  39. self._controller.start()
  40. utils.atexit(self._controller.shutdown)
  41. self.proxy_ready = asyncio.Event()
  42. async def _getproxy(self):
  43. self._url = await self._proxypasser.get_url(self.port)
  44. self.proxy_ready.set()
  45. def _url_changed(self, url: str):
  46. self._url = url
  47. def _create_server(self) -> BaseWSGIServer:
  48. logger.debug("Creating new werkzeug server instance")
  49. os.environ["WERKZEUG_DEBUG_PIN"] = self.pin
  50. os.environ["WERKZEUG_RUN_MAIN"] = "true"
  51. @Request.application
  52. def app(request):
  53. if request.args.get("ping", "N").upper() == "Y":
  54. return Response("ok")
  55. if request.args.get("shutdown", "N").upper() == "Y":
  56. self._server._BaseServer__shutdown_request = True
  57. return Response("Shutdown!")
  58. raise self.exceptions.get(request.args.get("ex_id"), Exception("idk"))
  59. app = DebuggedApplication(app, evalex=True, pin_security=True)
  60. try:
  61. fd = int(os.environ["WERKZEUG_SERVER_FD"])
  62. except (LookupError, ValueError):
  63. fd = None
  64. self._server = make_server(
  65. "localhost",
  66. self.port,
  67. app,
  68. threaded=False,
  69. processes=1,
  70. request_handler=None,
  71. passthrough_errors=False,
  72. ssl_context=None,
  73. fd=fd,
  74. )
  75. return self._server
  76. @property
  77. def url(self) -> str:
  78. return self._url or f"http://127.0.0.1:{self.port}"
  79. def feed(self, exc_type, exc_value, exc_traceback) -> str:
  80. logger.debug("Feeding exception %s to werkzeug debugger", exc_type)
  81. id_ = utils.rand(8)
  82. self.exceptions[id_] = exc_type(exc_value).with_traceback(exc_traceback)
  83. return self.url.strip("/") + f"?ex_id={id_}"