test_downloader_external.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. #!/usr/bin/env python
  2. # coding: utf-8
  3. from __future__ import unicode_literals
  4. # Allow direct execution
  5. import os
  6. import re
  7. import sys
  8. import subprocess
  9. import unittest
  10. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  11. from test.helper import (
  12. FakeLogger,
  13. FakeYDL,
  14. http_server_port,
  15. try_rm,
  16. )
  17. from youtube_dl import YoutubeDL
  18. from youtube_dl.compat import (
  19. compat_contextlib_suppress,
  20. compat_http_cookiejar_Cookie,
  21. compat_http_server,
  22. compat_kwargs,
  23. )
  24. from youtube_dl.utils import (
  25. encodeFilename,
  26. join_nonempty,
  27. )
  28. from youtube_dl.downloader.external import (
  29. Aria2cFD,
  30. Aria2pFD,
  31. AxelFD,
  32. CurlFD,
  33. FFmpegFD,
  34. HttpieFD,
  35. WgetFD,
  36. )
  37. from youtube_dl.postprocessor import (
  38. FFmpegPostProcessor,
  39. )
  40. import threading
  41. TEST_SIZE = 10 * 1024
  42. TEST_COOKIE = {
  43. 'version': 0,
  44. 'name': 'test',
  45. 'value': 'ytdlp',
  46. 'port': None,
  47. 'port_specified': False,
  48. 'domain': '.example.com',
  49. 'domain_specified': True,
  50. 'domain_initial_dot': False,
  51. 'path': '/',
  52. 'path_specified': True,
  53. 'secure': False,
  54. 'expires': None,
  55. 'discard': False,
  56. 'comment': None,
  57. 'comment_url': None,
  58. 'rest': {},
  59. }
  60. TEST_COOKIE_VALUE = join_nonempty('name', 'value', delim='=', from_dict=TEST_COOKIE)
  61. TEST_INFO = {'url': 'http://www.example.com/'}
  62. def cookiejar_Cookie(**cookie_args):
  63. return compat_http_cookiejar_Cookie(**compat_kwargs(cookie_args))
  64. def ifExternalFDAvailable(externalFD):
  65. return unittest.skipUnless(externalFD.available(),
  66. externalFD.get_basename() + ' not found')
  67. class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
  68. def log_message(self, format, *args):
  69. pass
  70. def send_content_range(self, total=None):
  71. range_header = self.headers.get('Range')
  72. start = end = None
  73. if range_header:
  74. mobj = re.match(r'bytes=(\d+)-(\d+)', range_header)
  75. if mobj:
  76. start, end = (int(mobj.group(i)) for i in (1, 2))
  77. valid_range = start is not None and end is not None
  78. if valid_range:
  79. content_range = 'bytes %d-%d' % (start, end)
  80. if total:
  81. content_range += '/%d' % total
  82. self.send_header('Content-Range', content_range)
  83. return (end - start + 1) if valid_range else total
  84. def serve(self, range=True, content_length=True):
  85. self.send_response(200)
  86. self.send_header('Content-Type', 'video/mp4')
  87. size = TEST_SIZE
  88. if range:
  89. size = self.send_content_range(TEST_SIZE)
  90. if content_length:
  91. self.send_header('Content-Length', size)
  92. self.end_headers()
  93. self.wfile.write(b'#' * size)
  94. def do_GET(self):
  95. if self.path == '/regular':
  96. self.serve()
  97. elif self.path == '/no-content-length':
  98. self.serve(content_length=False)
  99. elif self.path == '/no-range':
  100. self.serve(range=False)
  101. elif self.path == '/no-range-no-content-length':
  102. self.serve(range=False, content_length=False)
  103. else:
  104. assert False, 'unrecognised server path'
  105. @ifExternalFDAvailable(Aria2pFD)
  106. class TestAria2pFD(unittest.TestCase):
  107. def setUp(self):
  108. self.httpd = compat_http_server.HTTPServer(
  109. ('127.0.0.1', 0), HTTPTestRequestHandler)
  110. self.port = http_server_port(self.httpd)
  111. self.server_thread = threading.Thread(target=self.httpd.serve_forever)
  112. self.server_thread.daemon = True
  113. self.server_thread.start()
  114. def download(self, params, ep):
  115. with subprocess.Popen(
  116. ['aria2c', '--enable-rpc'],
  117. stdout=subprocess.DEVNULL,
  118. stderr=subprocess.DEVNULL
  119. ) as process:
  120. if not process.poll():
  121. filename = 'testfile.mp4'
  122. params['logger'] = FakeLogger()
  123. params['outtmpl'] = filename
  124. ydl = YoutubeDL(params)
  125. try_rm(encodeFilename(filename))
  126. self.assertEqual(ydl.download(['http://127.0.0.1:%d/%s' % (self.port, ep)]), 0)
  127. self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)
  128. try_rm(encodeFilename(filename))
  129. process.kill()
  130. def download_all(self, params):
  131. for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
  132. self.download(params, ep)
  133. def test_regular(self):
  134. self.download_all({'external_downloader': 'aria2p'})
  135. def test_chunked(self):
  136. self.download_all({
  137. 'external_downloader': 'aria2p',
  138. 'http_chunk_size': 1000,
  139. })
  140. @ifExternalFDAvailable(HttpieFD)
  141. class TestHttpieFD(unittest.TestCase):
  142. def test_make_cmd(self):
  143. with FakeYDL() as ydl:
  144. downloader = HttpieFD(ydl, {})
  145. self.assertEqual(
  146. downloader._make_cmd('test', TEST_INFO),
  147. ['http', '--download', '--output', 'test', 'http://www.example.com/'])
  148. # Test cookie header is added
  149. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  150. self.assertEqual(
  151. downloader._make_cmd('test', TEST_INFO),
  152. ['http', '--download', '--output', 'test',
  153. 'http://www.example.com/', 'Cookie:' + TEST_COOKIE_VALUE])
  154. @ifExternalFDAvailable(AxelFD)
  155. class TestAxelFD(unittest.TestCase):
  156. def test_make_cmd(self):
  157. with FakeYDL() as ydl:
  158. downloader = AxelFD(ydl, {})
  159. self.assertEqual(
  160. downloader._make_cmd('test', TEST_INFO),
  161. ['axel', '-o', 'test', '--', 'http://www.example.com/'])
  162. # Test cookie header is added
  163. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  164. self.assertEqual(
  165. downloader._make_cmd('test', TEST_INFO),
  166. ['axel', '-o', 'test', '-H', 'Cookie: ' + TEST_COOKIE_VALUE,
  167. '--max-redirect=0', '--', 'http://www.example.com/'])
  168. @ifExternalFDAvailable(WgetFD)
  169. class TestWgetFD(unittest.TestCase):
  170. def test_make_cmd(self):
  171. with FakeYDL() as ydl:
  172. downloader = WgetFD(ydl, {})
  173. self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
  174. # Test cookiejar tempfile arg is added
  175. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  176. self.assertIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))
  177. @ifExternalFDAvailable(CurlFD)
  178. class TestCurlFD(unittest.TestCase):
  179. def test_make_cmd(self):
  180. with FakeYDL() as ydl:
  181. downloader = CurlFD(ydl, {})
  182. self.assertNotIn('--cookie', downloader._make_cmd('test', TEST_INFO))
  183. # Test cookie header is added
  184. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  185. self.assertIn('--cookie', downloader._make_cmd('test', TEST_INFO))
  186. self.assertIn(TEST_COOKIE_VALUE, downloader._make_cmd('test', TEST_INFO))
  187. @ifExternalFDAvailable(Aria2cFD)
  188. class TestAria2cFD(unittest.TestCase):
  189. def test_make_cmd(self):
  190. with FakeYDL() as ydl:
  191. downloader = Aria2cFD(ydl, {})
  192. downloader._make_cmd('test', TEST_INFO)
  193. self.assertFalse(hasattr(downloader, '_cookies_tempfile'))
  194. # Test cookiejar tempfile arg is added
  195. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  196. cmd = downloader._make_cmd('test', TEST_INFO)
  197. self.assertIn('--load-cookies=%s' % downloader._cookies_tempfile, cmd)
  198. # Handle delegated availability
  199. def ifFFmpegFDAvailable(externalFD):
  200. # raise SkipTest, or set False!
  201. avail = ifExternalFDAvailable(externalFD) and False
  202. with compat_contextlib_suppress(Exception):
  203. avail = FFmpegPostProcessor(downloader=None).available
  204. return unittest.skipUnless(
  205. avail, externalFD.get_basename() + ' not found')
  206. @ifFFmpegFDAvailable(FFmpegFD)
  207. class TestFFmpegFD(unittest.TestCase):
  208. _args = []
  209. def _test_cmd(self, args):
  210. self._args = args
  211. def test_make_cmd(self):
  212. with FakeYDL() as ydl:
  213. downloader = FFmpegFD(ydl, {})
  214. downloader._debug_cmd = self._test_cmd
  215. info_dict = TEST_INFO.copy()
  216. info_dict['ext'] = 'mp4'
  217. downloader._call_downloader('test', info_dict)
  218. self.assertEqual(self._args, [
  219. 'ffmpeg', '-y', '-i', 'http://www.example.com/',
  220. '-c', 'copy', '-f', 'mp4', 'file:test'])
  221. # Test cookies arg is added
  222. ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))
  223. downloader._call_downloader('test', info_dict)
  224. self.assertEqual(self._args, [
  225. 'ffmpeg', '-y', '-cookies', TEST_COOKIE_VALUE + '; path=/; domain=.example.com;\r\n',
  226. '-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test'])
# Run the test suite when this file is executed directly
if __name__ == '__main__':
    unittest.main()