# speed.py
  1. import asyncio
  2. import http.cookies
  3. import json
  4. import re
  5. import subprocess
  6. from time import time
  7. from urllib.parse import quote, urlparse
  8. import m3u8
  9. from aiohttp import ClientSession, TCPConnector
  10. from multidict import CIMultiDictProxy
  11. import utils.constants as constants
  12. from utils.config import config
  13. from utils.tools import is_ipv6, remove_cache_info, get_resolution_value
# Accept every cookie key: servers in the wild send cookie names that fail the
# stdlib's legality check, which would otherwise make response parsing raise.
# NOTE(review): this patches a private stdlib hook (`_is_legal_key`) — confirm
# it still exists on the targeted Python versions.
http.cookies._is_legal_key = lambda _: True
  15. async def get_speed_with_download(url: str, session: ClientSession = None, timeout: int = config.sort_timeout) -> dict[
  16. str, float | None]:
  17. """
  18. Get the speed of the url with a total timeout
  19. """
  20. start_time = time()
  21. total_size = 0
  22. total_time = 0
  23. info = {'speed': None, 'delay': None}
  24. if session is None:
  25. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  26. created_session = True
  27. else:
  28. created_session = False
  29. try:
  30. async with session.get(url, timeout=timeout) as response:
  31. if response.status != 200:
  32. raise Exception("Invalid response")
  33. info['delay'] = int(round((time() - start_time) * 1000))
  34. async for chunk in response.content.iter_any():
  35. if chunk:
  36. total_size += len(chunk)
  37. except:
  38. pass
  39. finally:
  40. if total_size > 0:
  41. total_time += time() - start_time
  42. info['speed'] = ((total_size / total_time) if total_time > 0 else 0) / 1024 / 1024
  43. if created_session:
  44. await session.close()
  45. return info
  46. async def get_m3u8_headers(url: str, session: ClientSession = None, timeout: int = 5) -> CIMultiDictProxy[str] | dict[
  47. any, any]:
  48. """
  49. Get the headers of the m3u8 url
  50. """
  51. if session is None:
  52. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  53. created_session = True
  54. else:
  55. created_session = False
  56. headers = {}
  57. try:
  58. async with session.head(url, timeout=timeout) as response:
  59. headers = response.headers
  60. except:
  61. pass
  62. finally:
  63. if created_session:
  64. await session.close()
  65. return headers
  66. def check_m3u8_valid(headers: CIMultiDictProxy[str] | dict[any, any]) -> bool:
  67. """
  68. Check if the m3u8 url is valid
  69. """
  70. content_type = headers.get('Content-Type', '').lower()
  71. if not content_type:
  72. return False
  73. return any(item in content_type for item in ['application/vnd.apple.mpegurl', 'audio/mpegurl', 'audio/x-mpegurl'])
  74. async def get_speed_m3u8(url: str, filter_resolution: bool = config.open_filter_resolution,
  75. timeout: int = config.sort_timeout) -> dict[str, float | None]:
  76. """
  77. Get the speed of the m3u8 url with a total timeout
  78. """
  79. info = {'speed': None, 'delay': None, 'resolution': None}
  80. location = None
  81. try:
  82. url = quote(url, safe=':/?$&=@[]%').partition('$')[0]
  83. async with ClientSession(connector=TCPConnector(ssl=False), trust_env=True) as session:
  84. headers = await get_m3u8_headers(url, session)
  85. location = headers.get('Location')
  86. if location:
  87. info.update(await get_speed_m3u8(location, filter_resolution, timeout))
  88. elif check_m3u8_valid(headers):
  89. m3u8_obj = m3u8.load(url, timeout=2)
  90. playlists = m3u8_obj.data.get('playlists')
  91. segments = m3u8_obj.segments
  92. if not segments and playlists:
  93. parsed_url = urlparse(url)
  94. uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path.rsplit('/', 1)[0]}/{playlists[0].get('uri', '')}"
  95. uri_headers = await get_m3u8_headers(uri, session)
  96. if not check_m3u8_valid(uri_headers):
  97. if uri_headers.get('Content-Length'):
  98. info.update(await get_speed_with_download(uri, session, timeout))
  99. raise Exception("Invalid m3u8")
  100. m3u8_obj = m3u8.load(uri, timeout=2)
  101. segments = m3u8_obj.segments
  102. if not segments:
  103. raise Exception("Segments not found")
  104. ts_urls = [segment.absolute_uri for segment in segments]
  105. speed_list = []
  106. start_time = time()
  107. for ts_url in ts_urls:
  108. if time() - start_time > timeout:
  109. break
  110. download_info = await get_speed_with_download(ts_url, session, timeout)
  111. speed_list.append(download_info['speed'])
  112. if info['delay'] is None and download_info['delay'] is not None:
  113. info['delay'] = download_info['delay']
  114. info['speed'] = (sum(speed_list) / len(speed_list)) if speed_list else 0
  115. elif headers.get('Content-Length'):
  116. info.update(await get_speed_with_download(url, session, timeout))
  117. except:
  118. pass
  119. finally:
  120. if filter_resolution and not location and info['delay'] is not None:
  121. info['resolution'] = await get_resolution_ffprobe(url, timeout)
  122. return info
  123. async def get_delay_requests(url, timeout=config.sort_timeout, proxy=None):
  124. """
  125. Get the delay of the url by requests
  126. """
  127. async with ClientSession(
  128. connector=TCPConnector(ssl=False), trust_env=True
  129. ) as session:
  130. start = time()
  131. end = None
  132. try:
  133. async with session.get(url, timeout=timeout, proxy=proxy) as response:
  134. if response.status == 404:
  135. return -1
  136. content = await response.read()
  137. if content:
  138. end = time()
  139. else:
  140. return -1
  141. except Exception as e:
  142. return -1
  143. return int(round((end - start) * 1000)) if end else -1
  144. def check_ffmpeg_installed_status():
  145. """
  146. Check ffmpeg is installed
  147. """
  148. status = False
  149. try:
  150. result = subprocess.run(
  151. ["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
  152. )
  153. status = result.returncode == 0
  154. except FileNotFoundError:
  155. status = False
  156. except Exception as e:
  157. print(e)
  158. finally:
  159. return status
  160. async def ffmpeg_url(url, timeout=config.sort_timeout):
  161. """
  162. Get url info by ffmpeg
  163. """
  164. args = ["ffmpeg", "-t", str(timeout), "-stats", "-i", url, "-f", "null", "-"]
  165. proc = None
  166. res = None
  167. try:
  168. proc = await asyncio.create_subprocess_exec(
  169. *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
  170. )
  171. out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout + 2)
  172. if out:
  173. res = out.decode("utf-8")
  174. if err:
  175. res = err.decode("utf-8")
  176. return None
  177. except asyncio.TimeoutError:
  178. if proc:
  179. proc.kill()
  180. return None
  181. except Exception:
  182. if proc:
  183. proc.kill()
  184. return None
  185. finally:
  186. if proc:
  187. await proc.wait()
  188. return res
  189. async def get_resolution_ffprobe(url: str, timeout: int = config.sort_timeout) -> str | None:
  190. """
  191. Get the resolution of the url by ffprobe
  192. """
  193. resolution = None
  194. proc = None
  195. try:
  196. probe_args = [
  197. 'ffprobe',
  198. '-v', 'error',
  199. '-select_streams', 'v:0',
  200. '-show_entries', 'stream=width,height',
  201. "-of", 'json',
  202. url
  203. ]
  204. proc = await asyncio.create_subprocess_exec(*probe_args, stdout=asyncio.subprocess.PIPE,
  205. stderr=asyncio.subprocess.PIPE)
  206. out, _ = await asyncio.wait_for(proc.communicate(), timeout)
  207. video_stream = json.loads(out.decode('utf-8'))["streams"][0]
  208. resolution = f"{video_stream['width']}x{video_stream['height']}"
  209. except:
  210. if proc:
  211. proc.kill()
  212. finally:
  213. if proc:
  214. await proc.wait()
  215. return resolution
  216. def get_video_info(video_info):
  217. """
  218. Get the video info
  219. """
  220. frame_size = -1
  221. resolution = None
  222. if video_info is not None:
  223. info_data = video_info.replace(" ", "")
  224. matches = re.findall(r"frame=(\d+)", info_data)
  225. if matches:
  226. frame_size = int(matches[-1])
  227. match = re.search(r"(\d{3,4}x\d{3,4})", video_info)
  228. if match:
  229. resolution = match.group(0)
  230. return frame_size, resolution
  231. async def check_stream_delay(url_info):
  232. """
  233. Check the stream delay
  234. """
  235. try:
  236. url = url_info[0]
  237. video_info = await ffmpeg_url(url)
  238. if video_info is None:
  239. return -1
  240. frame, resolution = get_video_info(video_info)
  241. if frame is None or frame == -1:
  242. return -1
  243. url_info[2] = resolution
  244. return url_info, frame
  245. except Exception as e:
  246. print(e)
  247. return -1
# Measurement cache keyed by the `cache:<key>` marker carried in a url's `$`
# suffix; each value is a list of result dicts appended by get_speed().
cache = {}
  249. async def get_speed(url, ipv6_proxy=None, filter_resolution=config.open_filter_resolution,
  250. min_resolution=config.min_resolution_value, timeout=config.sort_timeout,
  251. callback=None):
  252. """
  253. Get the speed (response time and resolution) of the url
  254. """
  255. data = {'speed': None, 'delay': None, 'resolution': None}
  256. try:
  257. cache_key = None
  258. url_is_ipv6 = is_ipv6(url)
  259. if "$" in url:
  260. url, _, cache_info = url.partition("$")
  261. matcher = re.search(r"cache:(.*)", cache_info)
  262. if matcher:
  263. cache_key = matcher.group(1)
  264. if cache_key in cache:
  265. cache_list = cache[cache_key]
  266. for cache_item in cache_list:
  267. if cache_item['speed'] > 0 and cache_item['delay'] != -1 and get_resolution_value(
  268. cache_item['resolution']) > min_resolution:
  269. return cache_item
  270. if ipv6_proxy and url_is_ipv6:
  271. data['speed'] = float("inf")
  272. data['delay'] = 0
  273. data['resolution'] = "1920x1080"
  274. elif re.match(constants.rtmp_url_pattern, url) is not None:
  275. start_time = time()
  276. data['resolution'] = await get_resolution_ffprobe(url, timeout)
  277. data['delay'] = int(round((time() - start_time) * 1000))
  278. data['speed'] = float("inf") if data['resolution'] is not None else 0
  279. else:
  280. data.update(await get_speed_m3u8(url, filter_resolution, timeout))
  281. if cache_key:
  282. cache.setdefault(cache_key, []).append(data)
  283. return data
  284. except:
  285. return data
  286. finally:
  287. if callback:
  288. callback()
  289. def sort_urls_key(item):
  290. """
  291. Sort the urls with key
  292. """
  293. speed, resolution, origin = item["speed"], item["resolution"], item["origin"]
  294. if origin == "whitelist":
  295. return float("inf")
  296. else:
  297. return speed + get_resolution_value(resolution)
  298. def sort_urls(name, data, supply=config.open_supply, filter_speed=config.open_filter_speed, min_speed=config.min_speed,
  299. filter_resolution=config.open_filter_resolution, min_resolution=config.min_resolution_value,
  300. logger=None):
  301. """
  302. Sort the urls with info
  303. """
  304. filter_data = []
  305. for url, date, resolution, origin in data:
  306. result = {
  307. "url": remove_cache_info(url),
  308. "date": date,
  309. "delay": None,
  310. "speed": None,
  311. "resolution": resolution,
  312. "origin": origin
  313. }
  314. if origin == "whitelist":
  315. filter_data.append(result)
  316. continue
  317. cache_key_match = re.search(r"cache:(.*)", url.partition("$")[2])
  318. cache_key = cache_key_match.group(1) if cache_key_match else None
  319. if cache_key and cache_key in cache:
  320. cache_list = cache[cache_key]
  321. if cache_list:
  322. avg_speed = sum(item['speed'] or 0 for item in cache_list) / len(cache_list)
  323. avg_delay = max(int(sum(item['delay'] or -1 for item in cache_list) / len(cache_list)), -1)
  324. resolution = max((item['resolution'] for item in cache_list), key=get_resolution_value) or resolution
  325. try:
  326. if logger:
  327. logger.info(
  328. f"Name: {name}, URL: {result["url"]}, Date: {date}, Delay: {avg_delay} ms, Speed: {avg_speed:.2f} M/s, Resolution: {resolution}"
  329. )
  330. except Exception as e:
  331. print(e)
  332. if (not supply and filter_speed and avg_speed < min_speed) or (
  333. not supply and filter_resolution and get_resolution_value(resolution) < min_resolution) or (
  334. supply and avg_delay < 0):
  335. continue
  336. result["delay"] = avg_delay
  337. result["speed"] = avg_speed
  338. result["resolution"] = resolution
  339. filter_data.append(result)
  340. filter_data.sort(key=sort_urls_key, reverse=True)
  341. return [
  342. (item["url"], item["date"], item["resolution"], item["origin"])
  343. for item in filter_data
  344. ]