123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- """
- Функции для работы с файлами.
- класс для хранения файлов плагинов.
- Система импорта плагинов.
- Author: Milinuri Nirvalen
- """
- from .ui import Logger, Ui
- from .plugins import Plugin
- import toml
- import os
- import importlib.util
- import re
- class Config:
- """управление конфигурационными файлами *.toml
- :optional param name: имя (ключ словаря файла)
- :optional param model: вид конфигурационного файла по умолчанию
- :optional param filepath: путь до конфигурационного файла"""
- def __init__(self, name=None, model=None, filepath=None):
- super(Config, self).__init__()
- self.name = name
- self.model = model
- self.filepath = filepath or 'config.toml'
- self.file_data = {}
- self.group_data = {} or self.model
- self.data = self.group_data or self.model or self.file_data
- self.l = Logger(f'config: {self.name or self.filepath}')
- self.u = Ui(f'config: {self.name or self.filepath}')
- self.lock = False
- if not self.name and not self.filepath and self.model:
- self.lock = True
- self.l.log('заблокировано', 'w')
- if not self.lock:
- self.load()
- def save(self):
- """сохранение данных конфигурационного файла"""
- if self.name:
- self.file_data[self.name] = self.group_data
- elif self.model and not self.file_data:
- self.file_data = self.model
- with open(self.filepath, 'w') as f:
- toml.dump(self.file_data, f)
- self.l.log('сохранено!', 'c')
- def load(self):
- """загрузка данных из toml файла"""
- if os.path.exists(self.filepath):
- self.file_data = toml.load(self.filepath)
- if self.name:
- if self.name in self.file_data:
- self.group_data = self.file_data[self.name]
- else:
- self.l.log('группа данных не найдена', 'e')
- self.save()
- elif self.model:
- self.l.log('файл не найден', 'e')
- self.save()
- def print_config(self, data={}):
- # выводит содержимое конфигурационного файла
- if not data:
- data = self.file_data
- for k, v in self.file_data.items():
- print()
- self.u.print_list(v, k)
- def fcheck(path, create=False):
- '''проверяет существование папки
- :param path: путь до папки
- :param create: создать ли папку при её отцуцтвии'''
- l = Logger(f'check folder [{path}]')
- if os.path.exists(path):
- return True
- if create:
- os.mkdir(path)
- l.log(f'создание директории', 'c')
- else:
- l.log(f'дирректория не найдена', 'w')
- # ########################
- # import system
- # была скопирована из... забыл, честно
- # врочем от неё тут мало чего осталось родного
- # ########################
- def import_module(name, path):
- """Import module from specified path with specified name.
- :param name: module's name
- :param path: path to module's file
- :returns: imported module"""
- spec = importlib.util.spec_from_file_location(name, os.path.abspath(path))
- module = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(module)
- return module
- def load_plugins_from_file(app, path):
- """Load plugins from specified path. Plugins can be in module-level
- variable "plugin" and in module-level variable "plugins" (with list of
- plugins).
- :param app: Chio instance
- :param path: path to file with module
- :returns: loaded module or None if no plugin found
- """
- if app.secure_call:
- try:
- mod = import_module(path, path)
- except Exception as e:
- app.l.log(e, 'e')
- return True
- else:
- mod = import_module(path, path)
- if mod:
- for pl in [getattr(mod, "p", None),
- getattr(mod, "plugin", None),
- *getattr(mod, "plugins", ())]:
- # пропускаем если pl не является экземляром плашига
- if not isinstance(pl, Plugin):
- continue
- name = path.split('/')[-1]
- pid = path.split('/')[-2]
- l = Logger(f'{pid}:{name.strip(".py")}')
- # пропускаем, если это указано в настройках
- if pid in app.ignore_pid:
- l.warn('disabled')
- continue
- # пропускаем если у плагина есть атрибут disabled
- if pl.__dict__.get('disabled'):
- l.warn('disabled')
- continue
- # получаем список движков
- target_backend = pl.__dict__.get('backend', '*')
- backend = app.backend.backend_type
- if target_backend == '*' or target_backend in backend:
- # задаём аргументы имени и пути к плагину
- sp = pl.__dict__.get("sp", [])
- pl.filename = name.split('.')[0]
- pl.path = path.split(name)[0]
- pl.pid = pid
- # добавляем плагин к общему списку плагинов
- # добавляем счётчик команд
- app.plugins.append(pl)
- app.commands += pl.commands
- # добавляем обрвботчику к приложению
- for k, v in pl.handlers.items():
- for h in v:
- app.add_event_handler(h, k)
- for k, v in pl.check_funcs.items():
- if k in app.check_funcs:
- l.warn(f'ChF {l.red}[{k}]{l.yellow} будет перезаписана')
- else:
- l.complite(f'ChF {l.cyan}[{k}]{l.green} зарегистрирована')
- app.check_funcs[k] = v
- if sp:
- for p in sp:
- if p in app.sp:
- l.log(f'доп. префикс: "{p}" будет дополнен')
- else:
- app.sp[p] = {}
- for k, v in pl.routers.items():
- if k == '_':
- l.warn(f'[{l.lred}{path}{l.yellow}]: не рекомендуется использовать "_" как имя функции')
- for x in v["cmds"]:
- n = p+'.'+x
- if n in app.triggers:
- l.warn(f'имя: "{n}" уже используется [{l.lred}{app.triggers[n]}{l.yellow}] и будет перезаписанo')
- app.triggers[n] = f"{path}: {v['id'].split('/')[1]}"
- app.sp[p][k] = v
- else:
- for k, v in pl.routers.items():
- if k == '_':
- l.warn(f'[{l.lred}{path}{l.yellow}]: не рекомендуется использовать "_" как имя функции')
- for n in v["cmds"]:
- if n in app.triggers:
- l.warn(f'имя: "{n}" уже используется [{l.lred}{app.triggers[n]}{l.yellow}] и будет перезаписано')
- app.triggers[n] = f"{path}:{v['id'].split('/')[1]} {v['cmds']}"
- app.routers[k] = v
- l.complite(f'loaded')
- else:
- l.log(f'несовместимый движок: {", ".join(backend)}')
- def count_files(app, folder, count=0):
- """рекурсивная функция счёта файлов
- :param app: экземляр чио
- :param folder: папка для поиска
- :optional param count: кол-во найденных файлов
- :return: кол-во найденных файлов"""
- for name in os.listdir(folder):
- path = os.path.join(folder, name)
- if os.path.isdir(path) and not name.startswith('_') and path not in app.ignore_path:
- count = count_files(app, path, count)
- elif re.match(r"^[^_].*\.py$", name) and name not in app.ignore_files:
- count += 1
- return count
- def load_plugins(app, folder='', main_folder=True, count=0, files=0):
- """Рекурсивно загружает все пакеты.
- :optional param folder: путь до требуемой папки
- :optional param main_foler: является ли папка точкой входа
- :optianal param count: кол-во загруженных плагинов
- :optional param files: кол-во модулей в папке
- :returns: number of loaded plugins"""
- u = Ui()
- if main_folder:
- folder = 'plugins/' + folder
- fcheck(folder, True)
- files = count_files(app, folder)
- app.l.log(f'загрузка {files} плагинов из: {folder}', "i")
- for name in os.listdir(folder):
- path = os.path.join(folder, name)
- if os.path.isdir(path) and not name.startswith('_') and path not in app.ignore_path:
- count = load_plugins(app, path, False, count, files)
- elif re.match(r"^[^_].*\.py$", name) and name not in app.ignore_files:
- count += 1
- load_plugins_from_file(app, path)
- if app.__dict__.get('progress_bar'):
- u.progress_bar(count, files, printEnd='\r')
- if count == files and main_folder:
- print()
- return count
|