test_networking_utils.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. #!/usr/bin/env python3
  2. # Allow direct execution
  3. import os
  4. import sys
  5. import pytest
  6. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  7. import contextlib
  8. import io
  9. import platform
  10. import random
  11. import ssl
  12. import urllib.error
  13. import warnings
  14. from hypervideo_dl.cookies import YoutubeDLCookieJar
  15. from hypervideo_dl.dependencies import certifi
  16. from hypervideo_dl.networking import Response
  17. from hypervideo_dl.networking._helper import (
  18. InstanceStoreMixin,
  19. add_accept_encoding_header,
  20. get_redirect_method,
  21. make_socks_proxy_opts,
  22. select_proxy,
  23. ssl_load_certs,
  24. )
  25. from hypervideo_dl.networking.exceptions import (
  26. HTTPError,
  27. IncompleteRead,
  28. _CompatHTTPError,
  29. )
  30. from hypervideo_dl.socks import ProxyType
  31. from hypervideo_dl.utils.networking import HTTPHeaderDict
  32. TEST_DIR = os.path.dirname(os.path.abspath(__file__))
  33. class TestNetworkingUtils:
  34. def test_select_proxy(self):
  35. proxies = {
  36. 'all': 'socks5://example.com',
  37. 'http': 'http://example.com:1080',
  38. 'no': 'bypass.example.com,yt-dl.org'
  39. }
  40. assert select_proxy('https://example.com', proxies) == proxies['all']
  41. assert select_proxy('http://example.com', proxies) == proxies['http']
  42. assert select_proxy('http://bypass.example.com', proxies) is None
  43. assert select_proxy('https://yt-dl.org', proxies) is None
  44. @pytest.mark.parametrize('socks_proxy,expected', [
  45. ('socks5h://example.com', {
  46. 'proxytype': ProxyType.SOCKS5,
  47. 'addr': 'example.com',
  48. 'port': 1080,
  49. 'rdns': True,
  50. 'username': None,
  51. 'password': None
  52. }),
  53. ('socks5://user:@example.com:5555', {
  54. 'proxytype': ProxyType.SOCKS5,
  55. 'addr': 'example.com',
  56. 'port': 5555,
  57. 'rdns': False,
  58. 'username': 'user',
  59. 'password': ''
  60. }),
  61. ('socks4://u%40ser:pa%20ss@127.0.0.1:1080', {
  62. 'proxytype': ProxyType.SOCKS4,
  63. 'addr': '127.0.0.1',
  64. 'port': 1080,
  65. 'rdns': False,
  66. 'username': 'u@ser',
  67. 'password': 'pa ss'
  68. }),
  69. ('socks4a://:pa%20ss@127.0.0.1', {
  70. 'proxytype': ProxyType.SOCKS4A,
  71. 'addr': '127.0.0.1',
  72. 'port': 1080,
  73. 'rdns': True,
  74. 'username': '',
  75. 'password': 'pa ss'
  76. })
  77. ])
  78. def test_make_socks_proxy_opts(self, socks_proxy, expected):
  79. assert make_socks_proxy_opts(socks_proxy) == expected
  80. def test_make_socks_proxy_unknown(self):
  81. with pytest.raises(ValueError, match='Unknown SOCKS proxy version: socks'):
  82. make_socks_proxy_opts('socks://127.0.0.1')
  83. @pytest.mark.skipif(not certifi, reason='certifi is not installed')
  84. def test_load_certifi(self):
  85. context_certifi = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  86. context_certifi.load_verify_locations(cafile=certifi.where())
  87. context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  88. ssl_load_certs(context, use_certifi=True)
  89. assert context.get_ca_certs() == context_certifi.get_ca_certs()
  90. context_default = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  91. context_default.load_default_certs()
  92. context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  93. ssl_load_certs(context, use_certifi=False)
  94. assert context.get_ca_certs() == context_default.get_ca_certs()
  95. if context_default.get_ca_certs() == context_certifi.get_ca_certs():
  96. pytest.skip('System uses certifi as default. The test is not valid')
  97. @pytest.mark.parametrize('method,status,expected', [
  98. ('GET', 303, 'GET'),
  99. ('HEAD', 303, 'HEAD'),
  100. ('PUT', 303, 'GET'),
  101. ('POST', 301, 'GET'),
  102. ('HEAD', 301, 'HEAD'),
  103. ('POST', 302, 'GET'),
  104. ('HEAD', 302, 'HEAD'),
  105. ('PUT', 302, 'PUT'),
  106. ('POST', 308, 'POST'),
  107. ('POST', 307, 'POST'),
  108. ('HEAD', 308, 'HEAD'),
  109. ('HEAD', 307, 'HEAD'),
  110. ])
  111. def test_get_redirect_method(self, method, status, expected):
  112. assert get_redirect_method(method, status) == expected
  113. @pytest.mark.parametrize('headers,supported_encodings,expected', [
  114. ({'Accept-Encoding': 'br'}, ['gzip', 'br'], {'Accept-Encoding': 'br'}),
  115. ({}, ['gzip', 'br'], {'Accept-Encoding': 'gzip, br'}),
  116. ({'Content-type': 'application/json'}, [], {'Content-type': 'application/json', 'Accept-Encoding': 'identity'}),
  117. ])
  118. def test_add_accept_encoding_header(self, headers, supported_encodings, expected):
  119. headers = HTTPHeaderDict(headers)
  120. add_accept_encoding_header(headers, supported_encodings)
  121. assert headers == HTTPHeaderDict(expected)
  122. class TestInstanceStoreMixin:
  123. class FakeInstanceStoreMixin(InstanceStoreMixin):
  124. def _create_instance(self, **kwargs):
  125. return random.randint(0, 1000000)
  126. def _close_instance(self, instance):
  127. pass
  128. def test_mixin(self):
  129. mixin = self.FakeInstanceStoreMixin()
  130. assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}}) == mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
  131. assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'e', 4}}) != mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}})
  132. assert mixin._get_instance(d={'a': 1, 'b': 2, 'c': {'d', 4}} != mixin._get_instance(d={'a': 1, 'b': 2, 'g': {'d', 4}}))
  133. assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) == mixin._get_instance(d={'a': 1}, e=[1, 2, 3])
  134. assert mixin._get_instance(d={'a': 1}, e=[1, 2, 3]) != mixin._get_instance(d={'a': 1}, e=[1, 2, 3, 4])
  135. cookiejar = YoutubeDLCookieJar()
  136. assert mixin._get_instance(b=[1, 2], c=cookiejar) == mixin._get_instance(b=[1, 2], c=cookiejar)
  137. assert mixin._get_instance(b=[1, 2], c=cookiejar) != mixin._get_instance(b=[1, 2], c=YoutubeDLCookieJar())
  138. # Different order
  139. assert mixin._get_instance(c=cookiejar, b=[1, 2]) == mixin._get_instance(b=[1, 2], c=cookiejar)
  140. m = mixin._get_instance(t=1234)
  141. assert mixin._get_instance(t=1234) == m
  142. mixin._clear_instances()
  143. assert mixin._get_instance(t=1234) != m
  144. class TestNetworkingExceptions:
  145. @staticmethod
  146. def create_response(status):
  147. return Response(fp=io.BytesIO(b'test'), url='http://example.com', headers={'tesT': 'test'}, status=status)
  148. @pytest.mark.parametrize('http_error_class', [HTTPError, lambda r: _CompatHTTPError(HTTPError(r))])
  149. def test_http_error(self, http_error_class):
  150. response = self.create_response(403)
  151. error = http_error_class(response)
  152. assert error.status == 403
  153. assert str(error) == error.msg == 'HTTP Error 403: Forbidden'
  154. assert error.reason == response.reason
  155. assert error.response is response
  156. data = error.response.read()
  157. assert data == b'test'
  158. assert repr(error) == '<HTTPError 403: Forbidden>'
  159. @pytest.mark.parametrize('http_error_class', [HTTPError, lambda *args, **kwargs: _CompatHTTPError(HTTPError(*args, **kwargs))])
  160. def test_redirect_http_error(self, http_error_class):
  161. response = self.create_response(301)
  162. error = http_error_class(response, redirect_loop=True)
  163. assert str(error) == error.msg == 'HTTP Error 301: Moved Permanently (redirect loop detected)'
  164. assert error.reason == 'Moved Permanently'
  165. def test_compat_http_error(self):
  166. response = self.create_response(403)
  167. error = _CompatHTTPError(HTTPError(response))
  168. assert isinstance(error, HTTPError)
  169. assert isinstance(error, urllib.error.HTTPError)
  170. @contextlib.contextmanager
  171. def raises_deprecation_warning():
  172. with warnings.catch_warnings(record=True) as w:
  173. warnings.simplefilter('always')
  174. yield
  175. if len(w) == 0:
  176. pytest.fail('Did not raise DeprecationWarning')
  177. if len(w) > 1:
  178. pytest.fail(f'Raised multiple warnings: {w}')
  179. if not issubclass(w[-1].category, DeprecationWarning):
  180. pytest.fail(f'Expected DeprecationWarning, got {w[-1].category}')
  181. w.clear()
  182. with raises_deprecation_warning():
  183. assert error.code == 403
  184. with raises_deprecation_warning():
  185. assert error.getcode() == 403
  186. with raises_deprecation_warning():
  187. assert error.hdrs is error.response.headers
  188. with raises_deprecation_warning():
  189. assert error.info() is error.response.headers
  190. with raises_deprecation_warning():
  191. assert error.headers is error.response.headers
  192. with raises_deprecation_warning():
  193. assert error.filename == error.response.url
  194. with raises_deprecation_warning():
  195. assert error.url == error.response.url
  196. with raises_deprecation_warning():
  197. assert error.geturl() == error.response.url
  198. # Passthrough file operations
  199. with raises_deprecation_warning():
  200. assert error.read() == b'test'
  201. with raises_deprecation_warning():
  202. assert not error.closed
  203. with raises_deprecation_warning():
  204. # Technically Response operations are also passed through, which should not be used.
  205. assert error.get_header('test') == 'test'
  206. # Should not raise a warning
  207. error.close()
  208. @pytest.mark.skipif(
  209. platform.python_implementation() == 'PyPy', reason='garbage collector works differently in pypy')
  210. def test_compat_http_error_autoclose(self):
  211. # Compat HTTPError should not autoclose response
  212. response = self.create_response(403)
  213. _CompatHTTPError(HTTPError(response))
  214. assert not response.closed
  215. def test_incomplete_read_error(self):
  216. error = IncompleteRead(b'test', 3, cause='test')
  217. assert isinstance(error, IncompleteRead)
  218. assert repr(error) == '<IncompleteRead: 4 bytes read, 3 more expected>'
  219. assert str(error) == error.msg == '4 bytes read, 3 more expected'
  220. assert error.partial == b'test'
  221. assert error.expected == 3
  222. assert error.cause == 'test'
  223. error = IncompleteRead(b'aaa')
  224. assert repr(error) == '<IncompleteRead: 3 bytes read>'
  225. assert str(error) == '3 bytes read'