123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # File : files.py
- # Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
- # Date : 2022/9/6
- import os
- import shutil
- from utils.system import getHost
- from controllers.service import storage_service
- from utils.encode import base64Encode,parseText
- from flask import render_template_string
- from utils.log import logger
- def getPics(path='images'):
- base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
- img_path = os.path.join(base_path, f'{path}')
- os.makedirs(img_path,exist_ok=True)
- file_name = os.listdir(img_path)
- # file_name = list(filter(lambda x: str(x).endswith('.js') and str(x).find('模板') < 0, file_name))
- # print(file_name)
- pic_list = [img_path+'/'+file for file in file_name]
- # pic_list = file_name
- # print(type(pic_list))
- return pic_list
- def get_live_url(new_conf,mode):
- host = getHost(mode)
- lsg = storage_service()
- # t1 = time()
- # live_url = host + '/lives' if new_conf.get('LIVE_MODE',1) == 0 else lsg.getItem('LIVE_URL',getHost(2)+'/lives')
- live_url = host + '/lives' if lsg.getItem('LIVE_MODE',1) == 0 else lsg.getItem('LIVE_URL',getHost(2)+'/lives')
- live_url = base64Encode(live_url)
- # print(f'{get_interval(t1)}毫秒')
- return live_url
- def getAlist():
- base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
- alist_path = os.path.join(base_path, 'js/alist.conf')
- alist_cpath = os.path.join(base_path, 'base/alist.conf')
- try:
- if not os.path.exists(alist_cpath):
- shutil.copy(alist_path, alist_cpath) # 复制文件
- with open(alist_cpath,encoding='utf-8') as f:
- data = f.read().strip()
- alists = []
- for i in data.split('\n'):
- i = i.strip()
- dt = i.split(',')
- if not i.strip().startswith('#'):
- obj = {
- 'name': dt[0],
- 'server': dt[1],
- 'type':"alist",
- }
- if len(dt) > 2:
- obj.update({
- 'password': dt[2]
- })
- alists.append(obj)
- print(f'共计{len(alists)}条alist记录')
- return alists
- except Exception as e:
- print(f'获取alist列表失败:{e}')
- return []
- def get_jar_list():
- base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
- jar_path = os.path.join(base_path, 'libs/jar')
- if not os.path.exists(jar_path):
- os.makedirs(jar_path, exist_ok=True)
- logger.info(f'初始化{jar_path}目录')
- jars = os.listdir(jar_path)
- jars = list(filter(lambda x: str(x).endswith('.jar') and str(x).find('base') < 0, jars))
- # print(jars)
- # jar_list = [file.replace('.jar', '') for file in jars]
- return jars
- def get_drop_js(jsd_list):
- base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
- js_path = os.path.join(base_path, 'js')
- js_list = [os.path.join(js_path, jsd.replace('jsd','js')) for jsd in jsd_list]
- return js_list
- def get_jsd_list():
- base_path = os.path.dirname(os.path.abspath(os.path.dirname(__file__))) # 上级目录
- js_path = os.path.join(base_path, 'js')
- if not os.path.exists(js_path):
- os.makedirs(js_path, exist_ok=True)
- logger.info(f'初始化{js_path}目录')
- jsds = os.listdir(js_path)
- jsds = list(filter(lambda x: str(x).endswith('.jsd'), jsds))
- return jsds
- def custom_merge(original:dict,custom:dict):
- """
- 合并用户配置
- :param original: 原始配置
- :param custom: 自定义配置
- :return:
- """
- if not custom or len(custom.keys()) < 1:
- return original
- new_keys = custom.keys()
- updateObj = {}
- extend_obj = {}
- for key in ['wallpaper','spider','homepage','lives','hotSearch','sniffer','recommend','rating','rules']:
- if key in new_keys:
- updateObj[key] = custom[key]
- for key in ['drives','sites','flags','ads','parses']:
- if key in new_keys:
- extend_obj[key] = custom[key]
- original.update(updateObj)
- for key in extend_obj.keys():
- # original[key].extend(extend_obj[key])
- # print(key,original.get(key))
- if original.get(key) and isinstance(original[key],list):
- original[key].extend(extend_obj[key])
- else:
- original[key] = extend_obj[key]
- logger.info(f'合并配置共有解析数量:{len(original.get("parses"))}')
- return original
- def getCustonDict(host,ali_token='',js0_password=''):
- customFile = 'base/custom.conf'
- if not os.path.exists(customFile):
- with open(customFile, 'w+', encoding='utf-8') as f:
- f.write('{}')
- customConfig = False
- try:
- with open(customFile,'r',encoding='utf-8') as f:
- text = f.read()
- customConfig = parseText(render_template_string(text,host=host,ali_token=ali_token,js0_password=js0_password))
- print(customConfig)
- except Exception as e:
- logger.info(f'用户自定义配置加载失败:{e}')
- return customConfig
- def get_multi_rules(rules):
- lsg = storage_service()
- multi_mode = lsg.getItem('MULTI_MODE',0)
- fix_multi = ['drpy']
- if not multi_mode or str(multi_mode)=='0':
- rules['list'] = list(filter(lambda x: x['name'] in fix_multi or x.get('multi'), rules['list']))
- rules['count'] = len(rules['list'])
- # print(rules)
- return rules
|