# tools.py

import datetime
import ipaddress
import json
import logging
import os
import re
import shutil
import socket
import sys
import urllib.parse
from collections import defaultdict
from logging.handlers import RotatingFileHandler
from time import time

import pytz
import requests
from bs4 import BeautifulSoup
from flask import send_file, make_response

import utils.constants as constants
from utils.config import config


def get_logger(path, level=logging.ERROR, init=False):
    """
    Get a file logger for the given path, creating the output directory if needed
    """
    if not os.path.exists(constants.output_path):
        os.makedirs(constants.output_path)
    if init and os.path.exists(path):
        os.remove(path)
    handler = RotatingFileHandler(path, encoding="utf-8")
    logger = logging.getLogger(path)
    logger.addHandler(handler)
    logger.setLevel(level)
    return logger


def format_interval(t):
    """
    Formats a number of seconds as a clock time, [H:]MM:SS

    Parameters
    ----------
    t : int or float
        Number of seconds.

    Returns
    -------
    out : str
        [H:]MM:SS
    """
    mins, s = divmod(int(t), 60)
    h, m = divmod(mins, 60)
    if h:
        return "{0:d}:{1:02d}:{2:02d}".format(h, m, s)
    else:
        return "{0:02d}:{1:02d}".format(m, s)
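
# Illustrative usage (a sketch; values are hypothetical):
#   >>> format_interval(3725)
#   '1:02:05'
#   >>> format_interval(65)
#   '01:05'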


def get_pbar_remaining(n=0, total=0, start_time=None):
    """
    Get the estimated remaining time of the progress bar
    """
    try:
        elapsed = time() - start_time
        completed_tasks = n
        if completed_tasks > 0:
            avg_time_per_task = elapsed / completed_tasks
            remaining_tasks = total - completed_tasks
            remaining_time = format_interval(avg_time_per_task * remaining_tasks)
        else:
            remaining_time = "未知"  # "unknown"
        return remaining_time
    except Exception as e:
        print(f"Error: {e}")


def update_file(final_file, old_file, copy=False):
    """
    Update final_file from old_file, copying or moving it into place
    """
    old_file_path = resource_path(old_file, persistent=True)
    final_file_path = resource_path(final_file, persistent=True)
    if os.path.exists(old_file_path):
        if copy:
            shutil.copyfile(old_file_path, final_file_path)
        else:
            os.replace(old_file_path, final_file_path)


def filter_by_date(data):
    """
    Filter by date and limit
    """
    default_recent_days = 30
    use_recent_days = config.recent_days
    if not isinstance(use_recent_days, int) or use_recent_days <= 0:
        use_recent_days = default_recent_days
    start_date = datetime.datetime.now() - datetime.timedelta(days=use_recent_days)
    recent_data = []
    unrecent_data = []
    for (url, date, resolution, origin), response_time in data:
        item = ((url, date, resolution, origin), response_time)
        if date:
            date = datetime.datetime.strptime(date, "%m-%d-%Y")
            if date >= start_date:
                recent_data.append(item)
            else:
                unrecent_data.append(item)
        else:
            unrecent_data.append(item)
    recent_data_len = len(recent_data)
    if recent_data_len == 0:
        recent_data = unrecent_data
    elif recent_data_len < config.urls_limit:
        recent_data.extend(unrecent_data[: config.urls_limit - len(recent_data)])
    return recent_data
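
# Illustrative input shape (a sketch; the tuples mirror the loop above, and
# the URLs/dates are hypothetical):
#   data = [
#       (("http://example.com/a.m3u8", "06-01-2024", "1920x1080", "subscribe"), 0.35),
#       (("http://example.com/b.m3u8", None, None, "hotel"), 1.20),
#   ]
# Dated entries within config.recent_days are preferred; the rest back-fill
# the result up to config.urls_limit.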


def get_soup(source):
    """
    Get soup from source
    """
    source = re.sub(
        r"<!--.*?-->",
        "",
        source,
        flags=re.DOTALL,
    )
    soup = BeautifulSoup(source, "html.parser")
    return soup


def get_resolution_value(resolution_str):
    """
    Get resolution value from string
    """
    try:
        if resolution_str:
            pattern = r"(\d+)[xX*](\d+)"
            match = re.search(pattern, resolution_str)
            if match:
                width, height = map(int, match.groups())
                return width * height
    except:
        pass
    return 0
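
# Illustrative usage (hypothetical strings):
#   >>> get_resolution_value("1920x1080")
#   2073600
#   >>> get_resolution_value("unknown")
#   0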


def get_total_urls(info_list, ipv_type_prefer, origin_type_prefer):
    """
    Get the total urls from info list
    """
    ipv_prefer_bool = bool(ipv_type_prefer)
    origin_prefer_bool = bool(origin_type_prefer)
    if not ipv_prefer_bool:
        ipv_type_prefer = ["all"]
    if not origin_prefer_bool:
        origin_type_prefer = ["all"]
    categorized_urls = {
        origin: {ipv_type: [] for ipv_type in ipv_type_prefer}
        for origin in origin_type_prefer
    }
    total_urls = []
    for url, _, resolution, origin in info_list:
        if not origin:
            continue
        if origin == "whitelist":
            w_url, _, w_info = url.partition("$")
            w_info_value = w_info.partition("!")[2] or "白名单"  # "whitelist"
            total_urls.append(add_url_info(w_url, w_info_value))
            continue
        if origin == "subscribe" and "/rtp/" in url:
            origin = "multicast"
        if origin_prefer_bool and (origin not in origin_type_prefer):
            continue
        pure_url, _, info = url.partition("$")
        if not info:
            origin_name = constants.origin_map[origin]
            if origin_name:
                url = add_url_info(pure_url, origin_name)
        url_is_ipv6 = is_ipv6(url)
        if url_is_ipv6:
            url = add_url_info(url, "IPv6")
        if resolution:
            url = add_url_info(url, resolution)
        if not origin_prefer_bool:
            origin = "all"
        if ipv_prefer_bool:
            key = "ipv6" if url_is_ipv6 else "ipv4"
            if key in ipv_type_prefer:
                categorized_urls[origin][key].append(url)
        else:
            categorized_urls[origin]["all"].append(url)
    ipv_num = {ipv_type: 0 for ipv_type in ipv_type_prefer}
    urls_limit = config.urls_limit
    for origin in origin_type_prefer:
        if len(total_urls) >= urls_limit:
            break
        for ipv_type in ipv_type_prefer:
            if len(total_urls) >= urls_limit:
                break
            ipv_type_num = ipv_num[ipv_type]
            ipv_type_limit = config.ipv_limit[ipv_type] or urls_limit
            if ipv_type_num < ipv_type_limit:
                urls = categorized_urls[origin][ipv_type]
                if not urls:
                    continue
                limit = min(
                    max(config.source_limits.get(origin, urls_limit) - ipv_type_num, 0),
                    max(ipv_type_limit - ipv_type_num, 0),
                )
                limit_urls = urls[:limit]
                total_urls.extend(limit_urls)
                ipv_num[ipv_type] += len(limit_urls)
            else:
                continue
    total_urls = list(dict.fromkeys(total_urls))[:urls_limit]
    if not config.open_url_info:
        return [url.partition("$")[0] for url in total_urls]
    else:
        return total_urls
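
# Illustrative call (a sketch; the tuples mirror the loop above, and the
# preference lists and config limits are hypothetical):
#   info_list = [
#       ("http://example.com/a.m3u8", None, "1920x1080", "subscribe"),
#       ("http://[2001:db8::1]/b.m3u8", None, None, "multicast"),
#   ]
#   urls = get_total_urls(info_list, ["ipv4", "ipv6"], ["subscribe", "multicast"])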


def get_total_urls_from_sorted_data(data):
    """
    Get the total urls from sorted data, filtered by date and deduplicated
    """
    total_urls = []
    if len(data) > config.urls_limit:
        total_urls = [url for (url, _, _, _), _ in filter_by_date(data)]
    else:
        total_urls = [url for (url, _, _, _), _ in data]
    return list(dict.fromkeys(total_urls))[: config.urls_limit]


def is_ipv6(url):
    """
    Check if the url is ipv6
    """
    try:
        host = urllib.parse.urlparse(url).hostname
        ipaddress.IPv6Address(host)
        return True
        # if host:
        #     addr_info = socket.getaddrinfo(host, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
        #     for info in addr_info:
        #         if info[0] == socket.AF_INET6:
        #             return True
        # return False
    except:
        return False
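
# Illustrative usage (hypothetical URLs): only literal IPv6 hosts are detected,
# since the DNS-based check above is commented out.
#   >>> is_ipv6("http://[2001:db8::1]:8080/stream")
#   True
#   >>> is_ipv6("http://192.168.1.1:8080/stream")
#   False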


def check_ipv6_support():
    """
    Check if the system network supports ipv6
    """
    url = "https://ipv6.tokyo.test-ipv6.com/ip/?callback=?&testdomain=test-ipv6.com&testname=test_aaaa"
    try:
        print("Checking if your network supports IPv6...")
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            print("Your network supports IPv6")
            return True
    except Exception:
        pass
    print("Your network does not support IPv6; don't worry, the results will still be saved")
    return False


def check_url_ipv_type(url):
    """
    Check if the url is compatible with the ipv type in the config
    """
    ipv6 = is_ipv6(url)
    ipv_type = config.ipv_type
    return (
        (ipv_type == "ipv4" and not ipv6)
        or (ipv_type == "ipv6" and ipv6)
        or ipv_type == "全部"  # "all"
        or ipv_type == "all"
    )


def check_url_by_keywords(url, keywords=None):
    """
    Check by URL keywords
    """
    if not keywords:
        return True
    else:
        return any(keyword in url for keyword in keywords)


def merge_objects(*objects):
    """
    Merge objects
    """

    def merge_dicts(dict1, dict2):
        for key, value in dict2.items():
            if key in dict1:
                if isinstance(dict1[key], dict) and isinstance(value, dict):
                    merge_dicts(dict1[key], value)
                elif isinstance(dict1[key], set):
                    dict1[key].update(value)
                elif isinstance(dict1[key], list):
                    if value:
                        dict1[key].extend(x for x in value if x not in dict1[key])
                elif value:
                    dict1[key] = {dict1[key], value}
            else:
                dict1[key] = value

    merged_dict = {}
    for obj in objects:
        if not isinstance(obj, dict):
            raise TypeError("All input objects must be dictionaries")
        merge_dicts(merged_dict, obj)
    return merged_dict
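
# Illustrative behaviour (hypothetical values): nested dicts merge recursively,
# lists append missing items, and colliding scalars collapse into a set.
#   >>> merge_objects({"a": {"x": 1}, "b": [1]}, {"a": {"y": 2}, "b": [2, 1]})
#   {'a': {'x': 1, 'y': 2}, 'b': [1, 2]}
#   >>> merge_objects({"c": 1}, {"c": 2})
#   {'c': {1, 2}}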


def get_ip_address():
    """
    Get the LAN IP address and return the local app URL
    """
    # Connecting a UDP socket sends no packets; it only makes the OS pick the
    # outbound interface, whose address is then read back.
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    ip = "127.0.0.1"
    try:
        s.connect(("10.255.255.255", 1))
        ip = s.getsockname()[0]
    except:
        ip = "127.0.0.1"
    finally:
        s.close()
    return f"http://{ip}:{config.app_port}"


def convert_to_m3u(first_channel_name=None):
    """
    Convert result txt to m3u format
    """
    user_final_file = resource_path(config.final_file)
    if os.path.exists(user_final_file):
        with open(user_final_file, "r", encoding="utf-8") as file:
            m3u_output = '#EXTM3U x-tvg-url="https://raw.githubusercontent.com/fanmingming/live/main/e.xml"\n'
            current_group = None
            for line in file:
                trimmed_line = line.strip()
                if trimmed_line != "":
                    if "#genre#" in trimmed_line:
                        current_group = trimmed_line.replace(",#genre#", "").strip()
                    else:
                        try:
                            original_channel_name, _, channel_link = map(
                                str.strip, trimmed_line.partition(",")
                            )
                        except:
                            continue
                        processed_channel_name = re.sub(
                            r"(CCTV|CETV)-(\d+)(\+.*)?",
                            lambda m: f"{m.group(1)}{m.group(2)}"
                            + ("+" if m.group(3) else ""),
                            # "🕘️更新时间" is the "update time" group
                            first_channel_name
                            if current_group == "🕘️更新时间"
                            else original_channel_name,
                        )
                        m3u_output += f'#EXTINF:-1 tvg-name="{processed_channel_name}" tvg-logo="https://raw.githubusercontent.com/fanmingming/live/main/tv/{processed_channel_name}.png"'
                        if current_group:
                            m3u_output += f' group-title="{current_group}"'
                        m3u_output += f",{original_channel_name}\n{channel_link}\n"
            m3u_file_path = os.path.splitext(user_final_file)[0] + ".m3u"
            with open(m3u_file_path, "w", encoding="utf-8") as m3u_file:
                m3u_file.write(m3u_output)
            print(f"✅ M3U result file generated at: {m3u_file_path}")


def get_result_file_content(show_content=False, file_type=None):
    """
    Get the content of the result file
    """
    user_final_file = resource_path(config.final_file)
    result_file = (
        os.path.splitext(user_final_file)[0] + f".{file_type}"
        if file_type
        else user_final_file
    )
    if os.path.exists(result_file):
        if config.open_m3u_result:
            if file_type == "m3u" or not file_type:
                result_file = os.path.splitext(user_final_file)[0] + ".m3u"
            if file_type != "txt" and not show_content:
                return send_file(result_file, as_attachment=True)
        with open(result_file, "r", encoding="utf-8") as file:
            content = file.read()
    else:
        content = constants.waiting_tip
    response = make_response(content)
    response.mimetype = "text/plain"
    return response


def remove_duplicates_from_tuple_list(tuple_list, seen, flag=None, force_str=None):
    """
    Remove duplicates from tuple list
    """
    unique_list = []
    for item in tuple_list:
        item_first = item[0]
        part = item_first
        if force_str:
            info = item_first.partition("$")[2]
            if info and info.startswith(force_str):
                continue
        if flag:
            matcher = re.search(flag, item_first)
            if matcher:
                part = matcher.group(1)
        seen_num = seen.get(part, 0)
        # A sort_duplicate_limit of 0 keeps only the first occurrence per key;
        # otherwise up to sort_duplicate_limit occurrences are kept.
        if (seen_num < config.sort_duplicate_limit) or (
            seen_num == 0 and config.sort_duplicate_limit == 0
        ):
            seen[part] = seen_num + 1
            unique_list.append(item)
    return unique_list


def process_nested_dict(data, seen, flag=None, force_str=None):
    """
    Process nested dict
    """
    for key, value in data.items():
        if isinstance(value, dict):
            process_nested_dict(value, seen, flag, force_str)
        elif isinstance(value, list):
            data[key] = remove_duplicates_from_tuple_list(value, seen, flag, force_str)


url_host_compile = re.compile(constants.url_host_pattern)


def get_url_host(url):
    """
    Get the url host
    """
    matcher = url_host_compile.search(url)
    if matcher:
        return matcher.group()
    return None


def add_url_info(url, info):
    """
    Add url info to the URL
    """
    if info:
        separator = "-" if "$" in url else "$"
        url += f"{separator}{info}"
    return url
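
# Illustrative usage (hypothetical URL): the first tag is appended with "$",
# later tags are chained with "-".
#   >>> add_url_info("http://example.com/live.m3u8", "IPv6")
#   'http://example.com/live.m3u8$IPv6'
#   >>> add_url_info("http://example.com/live.m3u8$IPv6", "1920x1080")
#   'http://example.com/live.m3u8$IPv6-1920x1080'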


def format_url_with_cache(url, cache=None):
    """
    Format the URL with cache
    """
    cache = cache or get_url_host(url) or ""
    return add_url_info(url, f"cache:{cache}") if cache else url


def remove_cache_info(string):
    """
    Remove the cache info from the string
    """
    return re.sub(r"[.*]?\$?-?cache:.*", "", string)
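
# Illustrative round trip (hypothetical URL; the host fallback depends on
# constants.url_host_pattern, so an explicit cache key is shown):
#   >>> format_url_with_cache("http://example.com/live.m3u8", cache="key1")
#   'http://example.com/live.m3u8$cache:key1'
#   >>> remove_cache_info("http://example.com/live.m3u8$cache:key1")
#   'http://example.com/live.m3u8'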


def resource_path(relative_path, persistent=False):
    """
    Get the resource path
    """
    base_path = os.path.abspath(".")
    total_path = os.path.join(base_path, relative_path)
    if persistent or os.path.exists(total_path):
        return total_path
    else:
        try:
            # sys._MEIPASS is set by PyInstaller for bundled resources
            base_path = sys._MEIPASS
            return os.path.join(base_path, relative_path)
        except Exception:
            return total_path


def write_content_into_txt(content, path=None, position=None, callback=None):
    """
    Write content into txt file
    """
    if not path:
        return
    mode = "r+" if position == "top" else "a"
    with open(path, mode, encoding="utf-8") as f:
        if position == "top":
            existing_content = f.read()
            f.seek(0, 0)
            f.write(f"{content}\n{existing_content}")
        else:
            f.write(content)
    if callback:
        callback()
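
# Illustrative usage (hypothetical path and content): prepend a header line
# while keeping the existing body below it.
#   write_content_into_txt("🕘️更新时间,#genre#", path="output/result.txt", position="top")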


def get_name_url(content, pattern, multiline=False, check_url=True):
    """
    Get name and url from content
    """
    flag = re.MULTILINE if multiline else 0
    matches = re.findall(pattern, content, flag)
    channels = [
        {"name": match[0].strip(), "url": match[1].strip()}
        for match in matches
        if (check_url and match[1].strip()) or not check_url
    ]
    return channels
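
# Illustrative usage with a hypothetical pattern (the real patterns live in
# utils.constants):
#   >>> get_name_url("CCTV-1,http://example.com/live.m3u8", pattern=r"(.*?),(.*)")
#   [{'name': 'CCTV-1', 'url': 'http://example.com/live.m3u8'}]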


def get_real_path(path) -> str:
    """
    Get the real path, preferring a 'user_'-prefixed file when it exists
    """
    dir_path, file = os.path.split(path)
    user_real_path = os.path.join(dir_path, "user_" + file)
    real_path = user_real_path if os.path.exists(user_real_path) else path
    return real_path


def get_urls_from_file(path: str) -> list:
    """
    Get the urls from file
    """
    real_path = get_real_path(resource_path(path))
    urls = []
    url_pattern = constants.url_pattern
    if os.path.exists(real_path):
        with open(real_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line.startswith("#"):
                    continue
                match = re.search(url_pattern, line)
                if match:
                    urls.append(match.group().strip())
    return urls


def get_name_urls_from_file(path: str) -> dict[str, list]:
    """
    Get the name and urls from file
    """
    real_path = get_real_path(resource_path(path))
    name_urls = defaultdict(list)
    txt_pattern = constants.txt_pattern
    if os.path.exists(real_path):
        with open(real_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line.startswith("#"):
                    continue
                name_url = get_name_url(line, pattern=txt_pattern)
                if name_url and name_url[0]:
                    name = name_url[0]["name"]
                    url = name_url[0]["url"]
                    if url not in name_urls[name]:
                        name_urls[name].append(url)
    return name_urls


def get_datetime_now():
    """
    Get the datetime now
    """
    now = datetime.datetime.now()
    time_zone = pytz.timezone(config.time_zone)
    return now.astimezone(time_zone).strftime("%Y-%m-%d %H:%M:%S")


def get_version_info():
    """
    Get the version info
    """
    with open(resource_path("version.json"), "r", encoding="utf-8") as f:
        return json.load(f)