# speed.py
import asyncio
import http.cookies
import json
import re
import subprocess
from time import time
from typing import Any
from urllib.parse import quote, urlparse

import m3u8
from aiohttp import ClientSession, TCPConnector
from multidict import CIMultiDictProxy

import utils.constants as constants
from utils.config import config
from utils.tools import remove_cache_info, get_resolution_value
from utils.types import TestResult, ChannelTestResult, TestResultCacheData
# NOTE(review): monkey-patch that accepts every cookie key — presumably so
# servers sending cookie names with characters the stdlib rejects do not
# raise CookieError during a test; confirm against upstream behavior.
http.cookies._is_legal_key = lambda _: True

# Shared in-memory test-result cache, keyed by the "cache:<key>" token
# carried in a url's "$" suffix (see get_speed / sort_urls below).
cache: TestResultCacheData = {}
  17. async def get_speed_with_download(url: str, session: ClientSession = None, timeout: int = config.sort_timeout) -> dict[
  18. str, float | None]:
  19. """
  20. Get the speed of the url with a total timeout
  21. """
  22. start_time = time()
  23. total_size = 0
  24. total_time = 0
  25. info = {'speed': None, 'delay': None}
  26. if session is None:
  27. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  28. created_session = True
  29. else:
  30. created_session = False
  31. try:
  32. async with session.get(url, timeout=timeout) as response:
  33. if response.status != 200:
  34. raise Exception("Invalid response")
  35. info['delay'] = int(round((time() - start_time) * 1000))
  36. async for chunk in response.content.iter_any():
  37. if chunk:
  38. total_size += len(chunk)
  39. except:
  40. pass
  41. finally:
  42. if total_size > 0:
  43. total_time += time() - start_time
  44. info['speed'] = ((total_size / total_time) if total_time > 0 else 0) / 1024 / 1024
  45. if created_session:
  46. await session.close()
  47. return info
  48. async def get_m3u8_headers(url: str, session: ClientSession = None, timeout: int = 5) -> CIMultiDictProxy[str] | dict[
  49. any, any]:
  50. """
  51. Get the headers of the m3u8 url
  52. """
  53. if session is None:
  54. session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True)
  55. created_session = True
  56. else:
  57. created_session = False
  58. headers = {}
  59. try:
  60. async with session.head(url, timeout=timeout) as response:
  61. headers = response.headers
  62. except:
  63. pass
  64. finally:
  65. if created_session:
  66. await session.close()
  67. return headers
  68. def check_m3u8_valid(headers: CIMultiDictProxy[str] | dict[any, any]) -> bool:
  69. """
  70. Check if the m3u8 url is valid
  71. """
  72. content_type = headers.get('Content-Type', '').lower()
  73. if not content_type:
  74. return False
  75. return any(item in content_type for item in ['application/vnd.apple.mpegurl', 'audio/mpegurl', 'audio/x-mpegurl'])
async def get_speed_m3u8(url: str, resolution: str = None, filter_resolution: bool = config.open_filter_resolution,
                         timeout: int = config.sort_timeout) -> dict[str, float | None]:
    """
    Get the speed of the m3u8 url with a total timeout.

    :param url: media url, optionally carrying a "$..." metadata suffix
    :param resolution: known resolution; when set, ffprobe probing is skipped
    :param filter_resolution: whether to probe resolution via ffprobe
    :param timeout: total time budget in seconds
    :return: dict with 'speed' (MB/s), 'delay' (ms) and 'resolution'
    """
    info = {'speed': None, 'delay': None, 'resolution': resolution}
    location = None
    try:
        # Percent-encode unsafe characters, then strip the "$..." suffix.
        url = quote(url, safe=':/?$&=@[]%').partition('$')[0]
        async with ClientSession(connector=TCPConnector(ssl=False), trust_env=True) as session:
            headers = await get_m3u8_headers(url, session)
            location = headers.get('Location')
            if location:
                # Redirect: recurse on the target url and reuse its result.
                info.update(await get_speed_m3u8(location, resolution, filter_resolution, timeout))
            elif check_m3u8_valid(headers):
                m3u8_obj = m3u8.load(url, timeout=2)
                playlists = m3u8_obj.data.get('playlists')
                segments = m3u8_obj.segments
                if not segments and playlists:
                    # Master playlist without direct segments: resolve the
                    # first variant uri relative to the playlist's directory.
                    parsed_url = urlparse(url)
                    uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path.rsplit('/', 1)[0]}/{playlists[0].get('uri', '')}"
                    uri_headers = await get_m3u8_headers(uri, session)
                    if not check_m3u8_valid(uri_headers):
                        if uri_headers.get('Content-Length'):
                            # Not a playlist but downloadable: measure it directly.
                            info.update(await get_speed_with_download(uri, session, timeout))
                        raise Exception("Invalid m3u8")
                    m3u8_obj = m3u8.load(uri, timeout=2)
                    segments = m3u8_obj.segments
                if not segments:
                    raise Exception("Segments not found")
                ts_urls = [segment.absolute_uri for segment in segments]
                speed_list = []
                start_time = time()
                # Download segments until the overall time budget is spent;
                # delay is taken from the first segment that reports one.
                for ts_url in ts_urls:
                    if time() - start_time > timeout:
                        break
                    download_info = await get_speed_with_download(ts_url, session, timeout)
                    speed_list.append(download_info['speed'])
                    if info['delay'] is None and download_info['delay'] is not None:
                        info['delay'] = download_info['delay']
                info['speed'] = (sum(speed_list) / len(speed_list)) if speed_list else 0
            elif headers.get('Content-Length'):
                # Plain downloadable resource, not an m3u8 playlist.
                info.update(await get_speed_with_download(url, session, timeout))
    except:
        # Best-effort: any failure keeps whatever partial info was collected.
        pass
    finally:
        # Probe resolution only at the top of the recursion (no redirect was
        # followed here) and only when the url actually responded.
        # NOTE(review): the ``return`` inside ``finally`` suppresses any
        # in-flight exception — deliberate best-effort style in this file.
        if not resolution and filter_resolution and not location and info['delay'] is not None:
            info['resolution'] = await get_resolution_ffprobe(url, timeout)
        return info
  125. async def get_delay_requests(url, timeout=config.sort_timeout, proxy=None):
  126. """
  127. Get the delay of the url by requests
  128. """
  129. async with ClientSession(
  130. connector=TCPConnector(ssl=False), trust_env=True
  131. ) as session:
  132. start = time()
  133. end = None
  134. try:
  135. async with session.get(url, timeout=timeout, proxy=proxy) as response:
  136. if response.status == 404:
  137. return -1
  138. content = await response.read()
  139. if content:
  140. end = time()
  141. else:
  142. return -1
  143. except Exception as e:
  144. return -1
  145. return int(round((end - start) * 1000)) if end else -1
  146. def check_ffmpeg_installed_status():
  147. """
  148. Check ffmpeg is installed
  149. """
  150. status = False
  151. try:
  152. result = subprocess.run(
  153. ["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
  154. )
  155. status = result.returncode == 0
  156. except FileNotFoundError:
  157. status = False
  158. except Exception as e:
  159. print(e)
  160. finally:
  161. return status
  162. async def ffmpeg_url(url, timeout=config.sort_timeout):
  163. """
  164. Get url info by ffmpeg
  165. """
  166. args = ["ffmpeg", "-t", str(timeout), "-stats", "-i", url, "-f", "null", "-"]
  167. proc = None
  168. res = None
  169. try:
  170. proc = await asyncio.create_subprocess_exec(
  171. *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
  172. )
  173. out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout + 2)
  174. if out:
  175. res = out.decode("utf-8")
  176. if err:
  177. res = err.decode("utf-8")
  178. return None
  179. except asyncio.TimeoutError:
  180. if proc:
  181. proc.kill()
  182. return None
  183. except Exception:
  184. if proc:
  185. proc.kill()
  186. return None
  187. finally:
  188. if proc:
  189. await proc.wait()
  190. return res
  191. async def get_resolution_ffprobe(url: str, timeout: int = config.sort_timeout) -> str | None:
  192. """
  193. Get the resolution of the url by ffprobe
  194. """
  195. resolution = None
  196. proc = None
  197. try:
  198. probe_args = [
  199. 'ffprobe',
  200. '-v', 'error',
  201. '-select_streams', 'v:0',
  202. '-show_entries', 'stream=width,height',
  203. "-of", 'json',
  204. url
  205. ]
  206. proc = await asyncio.create_subprocess_exec(*probe_args, stdout=asyncio.subprocess.PIPE,
  207. stderr=asyncio.subprocess.PIPE)
  208. out, _ = await asyncio.wait_for(proc.communicate(), timeout)
  209. video_stream = json.loads(out.decode('utf-8'))["streams"][0]
  210. resolution = f"{video_stream['width']}x{video_stream['height']}"
  211. except:
  212. if proc:
  213. proc.kill()
  214. finally:
  215. if proc:
  216. await proc.wait()
  217. return resolution
  218. def get_video_info(video_info):
  219. """
  220. Get the video info
  221. """
  222. frame_size = -1
  223. resolution = None
  224. if video_info is not None:
  225. info_data = video_info.replace(" ", "")
  226. matches = re.findall(r"frame=(\d+)", info_data)
  227. if matches:
  228. frame_size = int(matches[-1])
  229. match = re.search(r"(\d{3,4}x\d{3,4})", video_info)
  230. if match:
  231. resolution = match.group(0)
  232. return frame_size, resolution
  233. async def check_stream_delay(url_info):
  234. """
  235. Check the stream delay
  236. """
  237. try:
  238. url = url_info["url"]
  239. video_info = await ffmpeg_url(url)
  240. if video_info is None:
  241. return -1
  242. frame, resolution = get_video_info(video_info)
  243. if frame is None or frame == -1:
  244. return -1
  245. url_info["resolution"] = resolution
  246. return url_info, frame
  247. except Exception as e:
  248. print(e)
  249. return -1
async def get_speed(url, is_ipv6=False, ipv6_proxy=None, resolution=None,
                    filter_resolution=config.open_filter_resolution,
                    min_resolution=config.min_resolution_value, timeout=config.sort_timeout,
                    callback=None) -> TestResult:
    """
    Get the speed (response time and resolution) of the url.

    :param url: channel url, optionally suffixed with "$...cache:<key>"
    :param is_ipv6: whether the url is an IPv6 url
    :param ipv6_proxy: proxy for IPv6 testing; when set together with
        is_ipv6 the url is assumed good and given perfect scores
    :param resolution: known resolution; skips probing when provided
    :param filter_resolution: whether to probe resolution via ffprobe
    :param min_resolution: minimum resolution value a cached entry must beat
    :param timeout: total test timeout in seconds
    :param callback: invoked once per call (e.g. progress reporting)
    :return: TestResult dict with 'speed', 'delay' and 'resolution'
    """
    data: TestResult = {'speed': None, 'delay': None, 'resolution': resolution}
    try:
        cache_key = None
        if "$" in url:
            # Text after "$" is metadata; "cache:<key>" groups urls that
            # share one measurement.
            url, _, cache_info = url.partition("$")
            matcher = re.search(r"cache:(.*)", cache_info)
            if matcher:
                cache_key = matcher.group(1)
        # A missing cache_key (None) never matches, so the else branch runs.
        if cache_key in cache:
            # Reuse the first cached result that passed all thresholds.
            cache_list = cache[cache_key]
            for cache_item in cache_list:
                if cache_item['speed'] > 0 and cache_item['delay'] != -1 and get_resolution_value(
                        cache_item['resolution']) > min_resolution:
                    data = cache_item
                    break
        else:
            if is_ipv6 and ipv6_proxy:
                # Cannot test meaningfully through the proxy; assume the best.
                data['speed'] = float("inf")
                data['delay'] = 0
                data['resolution'] = "1920x1080"
            elif constants.rt_url_pattern.match(url) is not None:
                # Urls matching the realtime pattern (constants.rt_url_pattern):
                # only probe the resolution; delay is just the probe duration.
                start_time = time()
                if not data['resolution'] and filter_resolution:
                    data['resolution'] = await get_resolution_ffprobe(url, timeout)
                data['delay'] = int(round((time() - start_time) * 1000))
                data['speed'] = float("inf") if data['resolution'] is not None else 0
            else:
                data.update(await get_speed_m3u8(url, resolution, filter_resolution, timeout))
            if cache_key:
                cache.setdefault(cache_key, []).append(data)
    finally:
        # NOTE(review): the ``return`` inside ``finally`` suppresses any
        # in-flight exception — deliberate best-effort style in this file.
        if callback:
            callback()
        return data
  291. def sort_urls_key(item: ChannelTestResult) -> float:
  292. """
  293. Sort the urls with key
  294. """
  295. speed, resolution, origin = item["speed"], item["resolution"], item["origin"]
  296. if origin == "whitelist":
  297. return float("inf")
  298. else:
  299. return speed + get_resolution_value(resolution)
def sort_urls(name, data, supply=config.open_supply, filter_speed=config.open_filter_speed, min_speed=config.min_speed,
              filter_resolution=config.open_filter_resolution, min_resolution=config.min_resolution_value,
              logger=None) -> list[ChannelTestResult]:
    """
    Aggregate cached test results per url, filter by the configured
    thresholds, and return the surviving entries sorted best-first.

    :param name: channel name, used only for logging
    :param data: iterable of url-info dicts (url/date/resolution/origin/ipv_type)
    :param supply: supply mode — when on, only entries with a negative
        average delay are dropped
    :param filter_speed: whether to drop entries below min_speed
    :param min_speed: minimum average speed (M/s) when filter_speed is on
    :param filter_resolution: whether to drop entries below min_resolution
    :param min_resolution: minimum resolution score when filtering
    :param logger: optional logger for per-url measurement lines
    :return: filtered results sorted by sort_urls_key, descending
    """
    filter_data = []
    for item in data:
        url, date, resolution, origin, ipv_type = item["url"], item["date"], item["resolution"], item["origin"], item[
            "ipv_type"]
        result: ChannelTestResult = {
            "url": remove_cache_info(url),
            "date": date,
            "delay": None,
            "speed": None,
            "resolution": resolution,
            "origin": origin,
            "ipv_type": ipv_type,
        }
        if origin == "whitelist":
            # Whitelisted urls bypass measurement and filtering entirely.
            filter_data.append(result)
            continue
        cache_key_match = re.search(r"cache:(.*)", url.partition("$")[2])
        cache_key = cache_key_match.group(1) if cache_key_match else None
        if cache_key and cache_key in cache:
            cache_list = cache[cache_key]
            if cache_list:
                # Average every measurement recorded for this cache key; the
                # generator-local ``item`` shadows the loop variable harmlessly.
                avg_speed: int | float | None = sum(item['speed'] or 0 for item in cache_list) / len(cache_list)
                avg_delay: int | float | None = max(
                    int(sum(item['delay'] or -1 for item in cache_list) / len(cache_list)), -1)
                resolution = max((item['resolution'] for item in cache_list), key=get_resolution_value) or resolution
                try:
                    if logger:
                        # NOTE(review): reusing double quotes inside this
                        # f-string requires Python 3.12+ (PEP 701) — confirm
                        # the project's minimum supported version.
                        logger.info(
                            f"Name: {name}, URL: {result["url"]}, IPv_Type: {ipv_type}, Date: {date}, Delay: {avg_delay} ms, Speed: {avg_speed:.2f} M/s, Resolution: {resolution}"
                        )
                except Exception as e:
                    print(e)
                # Drop entries failing the active thresholds; supply mode only
                # rejects entries that never responded (negative avg delay).
                if (not supply and filter_speed and avg_speed < min_speed) or (
                        not supply and filter_resolution and get_resolution_value(resolution) < min_resolution) or (
                        supply and avg_delay < 0):
                    continue
                result["delay"] = avg_delay
                result["speed"] = avg_speed
                result["resolution"] = resolution
        filter_data.append(result)
    filter_data.sort(key=sort_urls_key, reverse=True)
    return filter_data