|
- import abc
- import base64
- import contextlib
- import functools
- import json
- import os
- import random
- import ssl
- import threading
- from http.server import BaseHTTPRequestHandler
- from socketserver import ThreadingTCPServer
- import pytest
- from test.helper import http_server_port, verify_address_availability
- from test.test_networking import TEST_DIR
- from test.test_socks import IPv6ThreadingTCPServer
- from yt_dlp.dependencies import urllib3
- from yt_dlp.networking import Request
- from yt_dlp.networking.exceptions import HTTPError, ProxyError, SSLError
- class HTTPProxyAuthMixin:
- def proxy_auth_error(self):
- self.send_response(407)
- self.send_header('Proxy-Authenticate', 'Basic realm="test http proxy"')
- self.end_headers()
- return False
- def do_proxy_auth(self, username, password):
- if username is None and password is None:
- return True
- proxy_auth_header = self.headers.get('Proxy-Authorization', None)
- if proxy_auth_header is None:
- return self.proxy_auth_error()
- if not proxy_auth_header.startswith('Basic '):
- return self.proxy_auth_error()
- auth = proxy_auth_header[6:]
- try:
- auth_username, auth_password = base64.b64decode(auth).decode().split(':', 1)
- except Exception:
- return self.proxy_auth_error()
- if auth_username != (username or '') or auth_password != (password or ''):
- return self.proxy_auth_error()
- return True
- class HTTPProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
- def __init__(self, *args, proxy_info=None, username=None, password=None, request_handler=None, **kwargs):
- self.username = username
- self.password = password
- self.proxy_info = proxy_info
- super().__init__(*args, **kwargs)
- def do_GET(self):
- if not self.do_proxy_auth(self.username, self.password):
- self.server.close_request(self.request)
- return
- if self.path.endswith('/proxy_info'):
- payload = json.dumps(self.proxy_info or {
- 'client_address': self.client_address,
- 'connect': False,
- 'connect_host': None,
- 'connect_port': None,
- 'headers': dict(self.headers),
- 'path': self.path,
- 'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
- })
- self.send_response(200)
- self.send_header('Content-Type', 'application/json; charset=utf-8')
- self.send_header('Content-Length', str(len(payload)))
- self.end_headers()
- self.wfile.write(payload.encode())
- else:
- self.send_response(404)
- self.end_headers()
- self.server.close_request(self.request)
- if urllib3:
- import urllib3.util.ssltransport
- class SSLTransport(urllib3.util.ssltransport.SSLTransport):
- """
- Modified version of urllib3 SSLTransport to support server side SSL
- This allows us to chain multiple TLS connections.
- """
- def __init__(self, socket, ssl_context, server_hostname=None, suppress_ragged_eofs=True, server_side=False):
- self.incoming = ssl.MemoryBIO()
- self.outgoing = ssl.MemoryBIO()
- self.suppress_ragged_eofs = suppress_ragged_eofs
- self.socket = socket
- self.sslobj = ssl_context.wrap_bio(
- self.incoming,
- self.outgoing,
- server_hostname=server_hostname,
- server_side=server_side,
- )
- self._ssl_io_loop(self.sslobj.do_handshake)
- @property
- def _io_refs(self):
- return self.socket._io_refs
- @_io_refs.setter
- def _io_refs(self, value):
- self.socket._io_refs = value
- def shutdown(self, *args, **kwargs):
- self.socket.shutdown(*args, **kwargs)
- else:
- SSLTransport = None
- class HTTPSProxyHandler(HTTPProxyHandler):
- def __init__(self, request, *args, **kwargs):
- certfn = os.path.join(TEST_DIR, 'testcert.pem')
- sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- sslctx.load_cert_chain(certfn, None)
- if isinstance(request, ssl.SSLSocket):
- request = SSLTransport(request, ssl_context=sslctx, server_side=True)
- else:
- request = sslctx.wrap_socket(request, server_side=True)
- super().__init__(request, *args, **kwargs)
- class HTTPConnectProxyHandler(BaseHTTPRequestHandler, HTTPProxyAuthMixin):
- protocol_version = 'HTTP/1.1'
- default_request_version = 'HTTP/1.1'
- def __init__(self, *args, username=None, password=None, request_handler=None, **kwargs):
- self.username = username
- self.password = password
- self.request_handler = request_handler
- super().__init__(*args, **kwargs)
- def do_CONNECT(self):
- if not self.do_proxy_auth(self.username, self.password):
- self.server.close_request(self.request)
- return
- self.send_response(200)
- self.end_headers()
- proxy_info = {
- 'client_address': self.client_address,
- 'connect': True,
- 'connect_host': self.path.split(':')[0],
- 'connect_port': int(self.path.split(':')[1]),
- 'headers': dict(self.headers),
- 'path': self.path,
- 'proxy': ':'.join(str(y) for y in self.connection.getsockname()),
- }
- self.request_handler(self.request, self.client_address, self.server, proxy_info=proxy_info)
- self.server.close_request(self.request)
- class HTTPSConnectProxyHandler(HTTPConnectProxyHandler):
- def __init__(self, request, *args, **kwargs):
- certfn = os.path.join(TEST_DIR, 'testcert.pem')
- sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- sslctx.load_cert_chain(certfn, None)
- request = sslctx.wrap_socket(request, server_side=True)
- self._original_request = request
- super().__init__(request, *args, **kwargs)
- def do_CONNECT(self):
- super().do_CONNECT()
- self.server.close_request(self._original_request)
- @contextlib.contextmanager
- def proxy_server(proxy_server_class, request_handler, bind_ip=None, **proxy_server_kwargs):
- server = server_thread = None
- try:
- bind_address = bind_ip or '127.0.0.1'
- server_type = ThreadingTCPServer if '.' in bind_address else IPv6ThreadingTCPServer
- server = server_type(
- (bind_address, 0), functools.partial(proxy_server_class, request_handler=request_handler, **proxy_server_kwargs))
- server_port = http_server_port(server)
- server_thread = threading.Thread(target=server.serve_forever)
- server_thread.daemon = True
- server_thread.start()
- if '.' not in bind_address:
- yield f'[{bind_address}]:{server_port}'
- else:
- yield f'{bind_address}:{server_port}'
- finally:
- server.shutdown()
- server.server_close()
- server_thread.join(2.0)
- class HTTPProxyTestContext(abc.ABC):
- REQUEST_HANDLER_CLASS = None
- REQUEST_PROTO = None
- def http_server(self, server_class, *args, **kwargs):
- return proxy_server(server_class, self.REQUEST_HANDLER_CLASS, *args, **kwargs)
- @abc.abstractmethod
- def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs) -> dict:
- """return a dict of proxy_info"""
- class HTTPProxyHTTPTestContext(HTTPProxyTestContext):
- # Standard HTTP Proxy for http requests
- REQUEST_HANDLER_CLASS = HTTPProxyHandler
- REQUEST_PROTO = 'http'
- def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
- request = Request(f'http://{target_domain or "127.0.0.1"}:{target_port or "40000"}/proxy_info', **req_kwargs)
- handler.validate(request)
- return json.loads(handler.send(request).read().decode())
- class HTTPProxyHTTPSTestContext(HTTPProxyTestContext):
- # HTTP Connect proxy, for https requests
- REQUEST_HANDLER_CLASS = HTTPSProxyHandler
- REQUEST_PROTO = 'https'
- def proxy_info_request(self, handler, target_domain=None, target_port=None, **req_kwargs):
- request = Request(f'https://{target_domain or "127.0.0.1"}:{target_port or "40000"}/proxy_info', **req_kwargs)
- handler.validate(request)
- return json.loads(handler.send(request).read().decode())
- CTX_MAP = {
- 'http': HTTPProxyHTTPTestContext,
- 'https': HTTPProxyHTTPSTestContext,
- }
- @pytest.fixture(scope='module')
- def ctx(request):
- return CTX_MAP[request.param]()
- @pytest.mark.parametrize(
- 'handler', ['Urllib', 'Requests', 'CurlCFFI'], indirect=True)
- @pytest.mark.parametrize('ctx', ['http'], indirect=True) # pure http proxy can only support http
- class TestHTTPProxy:
- def test_http_no_auth(self, handler, ctx):
- with ctx.http_server(HTTPProxyHandler) as server_address:
- with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
- proxy_info = ctx.proxy_info_request(rh)
- assert proxy_info['proxy'] == server_address
- assert proxy_info['connect'] is False
- assert 'Proxy-Authorization' not in proxy_info['headers']
- def test_http_auth(self, handler, ctx):
- with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
- with handler(proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
- proxy_info = ctx.proxy_info_request(rh)
- assert proxy_info['proxy'] == server_address
- assert 'Proxy-Authorization' in proxy_info['headers']
- def test_http_bad_auth(self, handler, ctx):
- with ctx.http_server(HTTPProxyHandler, username='test', password='test') as server_address:
- with handler(proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
- with pytest.raises(HTTPError) as exc_info:
- ctx.proxy_info_request(rh)
- assert exc_info.value.response.status == 407
- exc_info.value.response.close()
- def test_http_source_address(self, handler, ctx):
- with ctx.http_server(HTTPProxyHandler) as server_address:
- source_address = f'127.0.0.{random.randint(5, 255)}'
- verify_address_availability(source_address)
- with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'},
- source_address=source_address) as rh:
- proxy_info = ctx.proxy_info_request(rh)
- assert proxy_info['proxy'] == server_address
- assert proxy_info['client_address'][0] == source_address
- @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
- def test_https(self, handler, ctx):
- with ctx.http_server(HTTPSProxyHandler) as server_address:
- with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
- proxy_info = ctx.proxy_info_request(rh)
- assert proxy_info['proxy'] == server_address
- assert proxy_info['connect'] is False
- assert 'Proxy-Authorization' not in proxy_info['headers']
- @pytest.mark.skip_handler('Urllib', 'urllib does not support https proxies')
- def test_https_verify_failed(self, handler, ctx):
- with ctx.http_server(HTTPSProxyHandler) as server_address:
- with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
- # Accept SSLError as may not be feasible to tell if it is proxy or request error.
- # note: if request proto also does ssl verification, this may also be the error of the request.
- # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
- with pytest.raises((ProxyError, SSLError)):
- ctx.proxy_info_request(rh)
- def test_http_with_idn(self, handler, ctx):
- with ctx.http_server(HTTPProxyHandler) as server_address:
- with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
- proxy_info = ctx.proxy_info_request(rh, target_domain='中文.tw')
- assert proxy_info['proxy'] == server_address
- assert proxy_info['path'].startswith('http://xn--fiq228c.tw')
- assert proxy_info['headers']['Host'].split(':', 1)[0] == 'xn--fiq228c.tw'
- @pytest.mark.parametrize(
- 'handler,ctx', [
- ('Requests', 'https'),
- ('CurlCFFI', 'https'),
- ], indirect=True)
- class TestHTTPConnectProxy:
- def test_http_connect_no_auth(self, handler, ctx):
- with ctx.http_server(HTTPConnectProxyHandler) as server_address:
- with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://{server_address}'}) as rh:
- proxy_info = ctx.proxy_info_request(rh)
- assert proxy_info['proxy'] == server_address
- assert proxy_info['connect'] is True
- assert 'Proxy-Authorization' not in proxy_info['headers']
- def test_http_connect_auth(self, handler, ctx):
- with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
- with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:test@{server_address}'}) as rh:
- proxy_info = ctx.proxy_info_request(rh)
- assert proxy_info['proxy'] == server_address
- assert 'Proxy-Authorization' in proxy_info['headers']
- @pytest.mark.skip_handler(
- 'Requests',
- 'bug in urllib3 causes unclosed socket: https://github.com/urllib3/urllib3/issues/3374',
- )
- def test_http_connect_bad_auth(self, handler, ctx):
- with ctx.http_server(HTTPConnectProxyHandler, username='test', password='test') as server_address:
- with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'http://test:bad@{server_address}'}) as rh:
- with pytest.raises(ProxyError):
- ctx.proxy_info_request(rh)
- def test_http_connect_source_address(self, handler, ctx):
- with ctx.http_server(HTTPConnectProxyHandler) as server_address:
- source_address = f'127.0.0.{random.randint(5, 255)}'
- verify_address_availability(source_address)
- with handler(proxies={ctx.REQUEST_PROTO: f'http://{server_address}'},
- source_address=source_address,
- verify=False) as rh:
- proxy_info = ctx.proxy_info_request(rh)
- assert proxy_info['proxy'] == server_address
- assert proxy_info['client_address'][0] == source_address
- @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
- def test_https_connect_proxy(self, handler, ctx):
- with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
- with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
- proxy_info = ctx.proxy_info_request(rh)
- assert proxy_info['proxy'] == server_address
- assert proxy_info['connect'] is True
- assert 'Proxy-Authorization' not in proxy_info['headers']
- @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
- def test_https_connect_verify_failed(self, handler, ctx):
- with ctx.http_server(HTTPSConnectProxyHandler) as server_address:
- with handler(verify=True, proxies={ctx.REQUEST_PROTO: f'https://{server_address}'}) as rh:
- # Accept SSLError as may not be feasible to tell if it is proxy or request error.
- # note: if request proto also does ssl verification, this may also be the error of the request.
- # Until we can support passing custom cacerts to handlers, we cannot properly test this for all cases.
- with pytest.raises((ProxyError, SSLError)):
- ctx.proxy_info_request(rh)
- @pytest.mark.skipif(urllib3 is None, reason='requires urllib3 to test')
- def test_https_connect_proxy_auth(self, handler, ctx):
- with ctx.http_server(HTTPSConnectProxyHandler, username='test', password='test') as server_address:
- with handler(verify=False, proxies={ctx.REQUEST_PROTO: f'https://test:test@{server_address}'}) as rh:
- proxy_info = ctx.proxy_info_request(rh)
- assert proxy_info['proxy'] == server_address
- assert 'Proxy-Authorization' in proxy_info['headers']
|