123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- """
- Описания класса плагина для расширения функционала Чио.
- Author: Milinuri Nirvalen
- """
- from .ui import Logger
- from .modules import Context
- import asyncio
- import importlib.util
- import os
- class Plugin:
- '''класс плагина
- :param **kwargs: доплнительные параметры плагина
- str: name - имя плагина
- bool: admins - является ли плагин для администраторов
- bool: disabled - отключён ли плагин
- list: backends - список поддерживаемых движков'''
- def __init__(self, name, desc='no desc', path='data', **kwargs):
- # добавляем параметры к атрибутам
- self.__dict__.update(kwargs)
- self.l = Logger()
- self.name = name
- self.desc = desc
- self.dpath = path
- if not os.path.exists(path):
- os.mkdir(path)
- self.l.complite(f'created folder: {path}')
- # локальное хранилище
- self.handlers = {'init':[], "before":[], "after":[], 'except':[]}
- self.routers = {}
- self.triggers = []
- self.commands = 0
- self.check_funcs = {}
- def eventHandler(self, *event_types):
- ''' декоратор обработчика событий
- :param *event_types: типы событий'''
- def wrapper(func):
- for t in event_types:
- if t in self.handlers:
- self.handlers[t].append(func)
- else:
- self.log(f'неизвестный тип события "{t}"', 'w')
- return func
- return wrapper
- def check_func(self, name=False):
- def wrapper(func):
- if func.__name__ in self.check_funcs:
- self.l.warn(f'ChF "{func.__name__}" будет перезаписана')
- self.check_funcs[func.__name__] = func
- return func
- return wrapper
- def addCheckFunc(self, func, name=False):
- if not name:
- name = func.__name__
- if name in self.check_funcs:
- self.l.warn(f'ChF "{name}" будет перезаписана')
- self.check_funcs[name] = func
- def cf(self, args):
- res = []
- if isinstance(args, str):
- res.append((args, None))
- elif isinstance(args, dict):
- for k, v in args.items():
- res.append((k, v))
- elif isinstance(args, list):
- for x in args:
- if isinstance(x, str):
- res.append((x, None))
- elif isinstance(x, dict):
- for k, v in x.items():
- res.append((k, v))
- elif isinstance(x, list):
- for a in self.cf(*x):
- res.append(a)
- return res
- def convert_names(self, names, use_origin=True):
- res = []
- for x in map(lambda x: x.strip().lower(), names):
- if x and x[-1] == ')' and x.count('(') == 1:
- origin = x.split('(')[0].replace('-', '')
- if use_origin:
- res.append(origin)
- for m in x.split('(')[1][:-1].split('/'):
- if m.count('-') == 1:
- res.append(m.strip().replace('-', origin))
- else:
- res.append(origin+m)
- else:
- res.append(x.replace('-', ''))
- return res
- def command(self, *args, **params):
- '''декоратор команды
- :param *args: перечень названий обработчиков
- :param **params: дополнительные параметры команды'''
- self.commands += 1
- commands = []
- for cmd in self.convert_names(list(args), False):
- if not cmd:
- continue
- if cmd and cmd[0] == '<' and cmd[-1] == '>':
- cmds = cmd[1:-1].split(' ')
- for r in self.gen_names(
- self.convert_names(cmds[0].split('/')),
- self.convert_names(cmds[1].split('/'))):
- commands.append(r)
- else:
- commands.append(cmd)
- # устанавливаем уровень полнимочий (локальный, глобальный)
- if params.get('admins'):
- admins = True
- elif params.get('users'):
- admins = False
- else:
- admins = self.__dict__.get('admins')
- def wrapper(func):
- names = list(args)
- if func.__name__.lower() not in commands:
- commands.append(func.__name__.lower())
- names.append(func.__name__.lower())
- for c in commands:
- self.triggers.append(c)
- self.routers[func.__name__] = {'func':func, 'admins':admins,
- "check_func":self.cf(params.get('check_func', [])),
- "else_func":self.cf(params.get('else_func', [])),
- "usage":params.get('usage'),
- "names":names, 'cmds':commands,
- 'id':self.__dict__.get('name', '')+' / '+func.__name__}
- return func
- return wrapper
- def gen_names(self, a, b):
- """принимает пару слов/списков и генерирует все возможные варианты
- :param a: первая список/строка
- :param b: вторая список/строка
- :return: список сгенерированных имён"""
- res = []
- if isinstance(a, str):
- a = [a]
- if isinstance(b, str):
- b = [b]
- for ax in a:
- for bx in b:
- res.append(f'{ax}{bx}')
- res.append(f'{bx}{ax}')
- return res
- def load(self, path):
- '''метод загрузки плагина
- пример:
- m = p.load("main.py") == import main
- :param path: путь до импортируемово файла относительно папки плагина
- :return: импортированный модуль'''
- filepath = str(self.path)+ '/' + path + '.py'
- spec = importlib.util.spec_from_file_location(filepath, os.path.abspath(filepath))
- module = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(module)
- return module
- def log(self, text, log_type='i'):
- self.l.log(text, log_type)
|