IPTV_speed.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import re
  2. import requests
  3. import concurrent.futures
  4. def test_speed(channel_name, channel_url):
  5. try:
  6. response = requests.get(channel_url, timeout=2)
  7. if response.status_code == 200:
  8. speed = response.elapsed.total_seconds()
  9. return channel_name, channel_url, f"{speed:.3f} seconds"
  10. else:
  11. return channel_name, channel_url, "Failed"
  12. except:
  13. return channel_name, channel_url, "Failed"
  14. def channel_key(channel):
  15. match = re.search(r'\d+', channel)
  16. if match:
  17. return int(match.group())
  18. else:
  19. return float('inf') # 返回一个无穷大的数字作为关键字
  20. channels = []
  21. with open("IPTV.txt", 'r', encoding='utf-8') as file:
  22. lines = file.readlines()
  23. for line in lines:
  24. line = line.strip()
  25. if line:
  26. if 'rtp' in line or 'udp' in line:
  27. pass
  28. else:
  29. channel_name, channel_url = line.split(',')
  30. channels.append((channel_name, channel_url))
  31. with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
  32. futures = []
  33. for channel in channels:
  34. channel_name, channel_url = channel
  35. futures.append(executor.submit(test_speed, channel_name, channel_url))
  36. results = []
  37. for future in concurrent.futures.as_completed(futures):
  38. result = future.result()
  39. results.append(result)
  40. results.sort(key=lambda x: (x[0], x[2]))
  41. with open("speed_results.txt", 'w', encoding='utf-8') as file:
  42. for result in results:
  43. channel_name, channel_url, speed = result
  44. file.write(f"{channel_name},{channel_url},{speed}\n")
  45. channels = []
  46. with open("speed_results.txt", 'r', encoding='utf-8') as file:
  47. for line in file:
  48. line = line.strip()
  49. if line:
  50. channel_name, channel_url, speed = line.split(',')
  51. if speed != "Failed":
  52. channels.append((channel_name,channel_url))
  53. # 对频道进行排序
  54. channels.sort(key=lambda x: channel_key(x[0]))
  55. with open("IPTV_speed.txt", 'w', encoding='utf-8') as file:
  56. for channel_name,channel_url in channels:
  57. file.write(f'{channel_name},{channel_url}\n')