# client.py
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. import asyncio
  3. import logging
  4. import threading
  5. import httpcore
  6. import httpx
  7. from httpx_socks import AsyncProxyTransport
  8. from python_socks import (
  9. parse_proxy_url,
  10. ProxyConnectionError,
  11. ProxyTimeoutError,
  12. ProxyError
  13. )
  14. import python_socks._errors
  15. from searx import logger
# Optional uvloop (support Python 3.6)
# uvloop is a faster drop-in asyncio event loop; if it is not installed
# the module silently keeps the default asyncio policy.
try:
    import uvloop
except ImportError:
    pass
else:
    uvloop.install()

logger = logger.getChild('searx.http.client')

# Shared asyncio event loop, created by init() inside a dedicated daemon thread.
LOOP = None
# Cache of SSL contexts keyed by (proxy_url, cert, verify, trust_env, http2).
SSLCONTEXTS = {}
# Keyword arguments common to every transport built in this module.
TRANSPORT_KWARGS = {
    'backend': 'asyncio',
    'trust_env': False,
}
  30. async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
  31. origin = httpcore._utils.url_to_origin(url)
  32. logger.debug('Drop connections for %r', origin)
  33. connections_to_close = connection_pool._connections_for_origin(origin)
  34. for connection in connections_to_close:
  35. await connection_pool._remove_from_pool(connection)
  36. try:
  37. await connection.aclose()
  38. except httpx.NetworkError as e:
  39. logger.warning('Error closing an existing connection', exc_info=e)
  40. def get_sslcontexts(proxy_url=None, cert=None, verify=True, trust_env=True, http2=False):
  41. global SSLCONTEXTS
  42. key = (proxy_url, cert, verify, trust_env, http2)
  43. if key not in SSLCONTEXTS:
  44. SSLCONTEXTS[key] = httpx.create_ssl_context(cert, verify, trust_env, http2)
  45. return SSLCONTEXTS[key]
  46. class AsyncHTTPTransportNoHttp(httpx.AsyncHTTPTransport):
  47. """Block HTTP request"""
  48. async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
  49. raise httpx.UnsupportedProtocol("HTTP protocol is disabled")
  50. class AsyncProxyTransportFixed(AsyncProxyTransport):
  51. """Fix httpx_socks.AsyncProxyTransport
  52. Map python_socks exceptions to httpx.ProxyError
  53. Map socket.gaierror to httpx.ConnectError
  54. Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
  55. * self._keepalive_sweep()
  56. * self._response_closed(self, connection)
  57. Note: AsyncProxyTransport inherit from AsyncConnectionPool
  58. """
  59. async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
  60. retry = 2
  61. while retry > 0:
  62. retry -= 1
  63. try:
  64. return await super().handle_async_request(method, url, headers, stream, extensions)
  65. except (ProxyConnectionError, ProxyTimeoutError, ProxyError) as e:
  66. raise httpx.ProxyError(e)
  67. except OSError as e:
  68. # socket.gaierror when DNS resolution fails
  69. raise httpx.NetworkError(e)
  70. except httpx.RemoteProtocolError as e:
  71. # in case of httpx.RemoteProtocolError: Server disconnected
  72. await close_connections_for_url(self, url)
  73. logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
  74. # retry
  75. except (httpx.NetworkError, httpx.ProtocolError) as e:
  76. # httpx.WriteError on HTTP/2 connection leaves a new opened stream
  77. # then each new request creates a new stream and raise the same WriteError
  78. await close_connections_for_url(self, url)
  79. raise e
  80. class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
  81. """Fix httpx.AsyncHTTPTransport"""
  82. async def handle_async_request(self, method, url, headers=None, stream=None, extensions=None):
  83. retry = 2
  84. while retry > 0:
  85. retry -= 1
  86. try:
  87. return await super().handle_async_request(method, url, headers, stream, extensions)
  88. except OSError as e:
  89. # socket.gaierror when DNS resolution fails
  90. raise httpx.ConnectError(e)
  91. except httpx.CloseError as e:
  92. # httpx.CloseError: [Errno 104] Connection reset by peer
  93. # raised by _keepalive_sweep()
  94. # from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # noqa
  95. await close_connections_for_url(self._pool, url)
  96. logger.warning('httpx.CloseError: retry', exc_info=e)
  97. # retry
  98. except httpx.RemoteProtocolError as e:
  99. # in case of httpx.RemoteProtocolError: Server disconnected
  100. await close_connections_for_url(self._pool, url)
  101. logger.warning('httpx.RemoteProtocolError: retry', exc_info=e)
  102. # retry
  103. except (httpx.ProtocolError, httpx.NetworkError) as e:
  104. await close_connections_for_url(self._pool, url)
  105. raise e
  106. def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
  107. # support socks5h (requests compatibility):
  108. # https://requests.readthedocs.io/en/master/user/advanced/#socks
  109. # socks5:// hostname is resolved on client side
  110. # socks5h:// hostname is resolved on proxy side
  111. rdns = False
  112. socks5h = 'socks5h://'
  113. if proxy_url.startswith(socks5h):
  114. proxy_url = 'socks5://' + proxy_url[len(socks5h):]
  115. rdns = True
  116. proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
  117. verify = get_sslcontexts(proxy_url, None, True, False, http2) if verify is True else verify
  118. return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
  119. username=proxy_username, password=proxy_password,
  120. rdns=rdns,
  121. loop=get_loop(),
  122. verify=verify,
  123. http2=http2,
  124. local_address=local_address,
  125. max_connections=limit.max_connections,
  126. max_keepalive_connections=limit.max_keepalive_connections,
  127. keepalive_expiry=limit.keepalive_expiry,
  128. retries=retries,
  129. **TRANSPORT_KWARGS)
  130. def get_transport(verify, http2, local_address, proxy_url, limit, retries):
  131. verify = get_sslcontexts(None, None, True, False, http2) if verify is True else verify
  132. return AsyncHTTPTransportFixed(verify=verify,
  133. http2=http2,
  134. local_address=local_address,
  135. proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
  136. limits=limit,
  137. retries=retries,
  138. **TRANSPORT_KWARGS)
  139. def iter_proxies(proxies):
  140. # https://www.python-httpx.org/compatibility/#proxy-keys
  141. if isinstance(proxies, str):
  142. yield 'all://', proxies
  143. elif isinstance(proxies, dict):
  144. for pattern, proxy_url in proxies.items():
  145. yield pattern, proxy_url
  146. def new_client(enable_http, verify, enable_http2,
  147. max_connections, max_keepalive_connections, keepalive_expiry,
  148. proxies, local_address, retries, max_redirects):
  149. limit = httpx.Limits(max_connections=max_connections,
  150. max_keepalive_connections=max_keepalive_connections,
  151. keepalive_expiry=keepalive_expiry)
  152. # See https://www.python-httpx.org/advanced/#routing
  153. mounts = {}
  154. for pattern, proxy_url in iter_proxies(proxies):
  155. if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
  156. continue
  157. if proxy_url.startswith('socks4://') \
  158. or proxy_url.startswith('socks5://') \
  159. or proxy_url.startswith('socks5h://'):
  160. mounts[pattern] = get_transport_for_socks_proxy(verify, enable_http2, local_address, proxy_url, limit,
  161. retries)
  162. else:
  163. mounts[pattern] = get_transport(verify, enable_http2, local_address, proxy_url, limit, retries)
  164. if not enable_http:
  165. mounts['http://'] = AsyncHTTPTransportNoHttp()
  166. transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
  167. return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
  168. def get_loop():
  169. global LOOP
  170. return LOOP
  171. def init():
  172. # log
  173. for logger_name in ('hpack.hpack', 'hpack.table'):
  174. logging.getLogger(logger_name).setLevel(logging.WARNING)
  175. # loop
  176. def loop_thread():
  177. global LOOP
  178. LOOP = asyncio.new_event_loop()
  179. LOOP.run_forever()
  180. th = threading.Thread(
  181. target=loop_thread,
  182. name='asyncio_loop',
  183. daemon=True,
  184. )
  185. th.start()
  186. init()