123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- """
- Главный файл для обрабоки ввода/вывода через движки.
- Author: Milinuri Nirvalen
- """
- from .files import Config
- from .files import fcheck
- from .modules import Context
- from .ui import Logger
- import asyncio
- import shlex
- import time
- class Chio:
- """Главный класс бота."""
- def __init__(self, backend=None):
- # главные атрибуты Чио
- self.ver = "2.7.3.2 (70)"
- self.l = Logger('Core')
- self.c = Config('core', model={'name':'Rin', 'ver':self.ver,
- 'names':['/', 'Rin', 'Chio', 'Рин', 'Чио'],
- 'ignore_files':[], 'ignore_pid':[], 'ignore_path':[],
- 'secure_call':True})
- self.__dict__.update(self.c.group_data)
- if backend:
- self.backend = backend(self)
- else:
- self.backend = None
- self.plugins = []
- self.commands = 0
- self.start_time = time.time()
- # атрибуты обработчиков
- self.routers = {}
- self.handlers = {'init':[], 'before':[], 'after':[], 'except':[]}
- self.sp = {}
- self.triggers = {}
- self.check_funcs = {}
- def start(self, asy=True, debug=False):
- '''метод запуска бота
- :optional param async: использовать ли async/await
- :optional param debug: вызов startEvent без запуска движка'''
- if debug:
- self.l.info('запуск Чио: РЕЖИМ ОТЛАДКИ')
- asyncio.run(self.startEvent(debug=True))
- try:
- if asy:
- self.l.info('запуск Чио')
- asyncio.run(self.backend.start())
- else:
- self.l.log('запуск чио: СИНХРОННЫЙ РЕЖИМ')
- self.backend.start()
- except KeyboardInterrupt:
- asyncio.run(self.stopEvent())
- # events: Поведение Чио в определённых ситуациях
- # ===============================================
- async def startEvent(self, debug=False):
- '''метод щапуска Чио
- :optional param debug: выключать ли Чио после успешного запуска'''
- # проверки дирректорий
- self.l.info('проверка директорий Чио')
- for x in ['backends', 'data', 'plugins']:
- fcheck(x, True)
- # инициализирующая прозвонка.
- self.l.info('запуск начальной прозвонки')
- await self.initCall()
- if debug:
- self.l.log('проверка запуска завершена.', 'StartEvent')
- await self.stopEvent()
- else:
- self.l.log('Утречка, семпай.', 'StartEvent')
- # статус безопасного режима
- if self.secure_call:
- status = f'{self.l.lgreen}ВКЛЮЧЁН'
- else:
- status = f'{self.l.lyellow}ОТКЛЮЧЁН'
- self.l.log(f'Безопасный режим: {status}.', 'config: secure_call')
- async def stopEvent(self):
- # поведение Чио при остановке
- self.l.log('Спокойной ночи, семпай.', 'StopEvent')
- exit()
- async def messageEvent(self, event, only_cmd=False):
- '''поведение Чио при получении сообщения
- :param event: экземпляр события
- :optional param only_cmd: запустить только обработчик команд'''
- # пропускаем пустые события
- if not event:
- return True
- prefix = None # префикс команд
- cmd_prefix = None # дополнительные префикс плагина
- handler = {} # функция, которую необходимо выполнить
- cmd = '' # название обработчика, отвечающего за функцию
- args = [] # список аргументов после команды
- sargs = '' # часть текста после команды
- utime = time.time()
- event = self.backend.setLevel(event,
- Config(self.backend.name).group_data)
- # перебор преффиксов
- for p in self.names:
- if event.get('text', '').lower().startswith(p.lower()):
- # удаляем префикс и пробелы после него
- event.set('text', event.get('text')[len(p):].strip(', '))
- prefix = p
- text = event.get('text')
- break
- # если нету префикса или текста - вызываем before и after обработчики
- if not prefix or not text:
- return await self.eventCall(event)
- # перебор преффиков плагинов
- for sp in self.sp:
- if text.lower().split()[0] == sp:
- cmd_prefix = sp
- text = text[len(sp):].strip()
- break
- # формирования словаря обработчиков
- router = {}
- if cmd_prefix:
- target = self.sp[cmd_prefix]
- else:
- target = self.routers
- # если есть уровень, в ином случае не будет ни одной комманды
- if event.get('level'):
- for k, v in target.items():
- # если команда для администраторов бота
- if v["admins"] and event.get("level") == 10:
- for x in v["cmds"]:
- router[x] = v
- # команды для обычных пользователей
- elif not v["admins"]:
- for x in v["cmds"]:
- router[x] = v
- # заменяем все пропуски строки на пробелы + пропущенные строки
- # разделяем сообщение по пробелам
- words = text.replace('\n', ' \n').split(' ')
- # получаем команду из всего текста
- while words:
- # если слово есть в словаре обработчиков
- if words[0].lower().replace('-', '') in router:
- # задаём переменные и останавливаем цыкл
- handler = router[words[0].lower().replace('-', '')]
- cmd = words[0].lower().replace('-', '')
- words.pop(0)
- break
- # если нет --> удаляем это слово
- else:
- words.pop(0)
- # если есть команда и есть words после цыкла --> получаем аргументы
- if handler and words:
- # shlex считаем аргумент в кавычках как 1 "1 2 3"
- try:
- args = shlex.split(' '.join(words).strip())
- except Exception as e:
- self.l.log(e, 'shlex <e>')
- args = ' '.join(words).strip()
- sargs = ' '.join(words).strip('\n')
- # вызываем прозвонку
- return await self.eventCall(
- event, args, prefix, sargs, handler, cmd, only_cmd, utime=utime)
- # ###########################
- # handlers:
- # добавление новых обработчиков.
- # ###########################
- def add_handler(self, name, func, admins=False):
- '''добаляет команду и её функцию в саисок
- :param name: имя команды
- :param func: выполняемая командой функция
- :optional param admins: является ли команда для администраторов'''
- self.router[name] = {'func':func, 'level':admins}
- def add_event_handler(self, func, types='init'):
- '''добавить обработчик события
- :param func: выполняемая функция обработчика
- :param types: список типов событий,
- для которых необходимо запускать обработчик'''
- if types in self.handlers:
- self.handlers[types].append(func)
- else:
- self.l.log(f'неизвестный тип события: {types}', '<w> addHandler')
- # ###########################
- # call func:
- # функции вызова обработчиков.
- # ###########################
- async def call(self, func, *args):
- """вызов функции с определёнными аргументами
- :param func: вызываемая функция
- :*param args: передаваемые аргументы в функию"""
- if self.secure_call:
- try:
- return await func(*args)
- except Exception as e:
- self.l.log(e, 'e')
- await self.exceptionCall(e, *args)
- else:
- return await func(*args)
- async def commandCall(self, handler, event, ctx):
- """вызов команды
- :param handler: набор функций обработчика
- :param event: экземпляр события
- :param ctx: экземпляр контекста"""
- # если есть проверяющая функция
- res = True
- for k, v in handler["check_func"]:
- if k in self.check_funcs:
- res = await self.call(self.check_funcs[k], event, ctx, v)
- self.l.log(f'{k} ({v}) -> {res}', 'ChF')
- else:
- self.l.error(f'ChF "{k}" не найдена')
- res = False
- if not res:
- break
- if res:
- return await self.call(handler["func"], event, ctx)
- else:
- for k, v in handler["else_func"]:
- if k in self.check_funcs:
- await self.call(self.check_funcs[k], event, ctx, v)
- self.l.log(f'{k} ({v}) -> {res}', 'else F')
- else:
- self.l.error(f'else F "{k}" не найдена')
- async def initCall(self):
- # вызов init обработчиков.
- for x in self.handlers["init"]:
- await self.call(x, self)
- async def beforeCommandsCall(self, event, ctx):
- """вызов before обработчиков
- :param event: экземпляр события
- :param ctx: экземпляр контекста"""
- resp = []
- for x in self.handlers["before"]:
- r = await self.call(x, event, ctx)
- if r:
- resp.append(r)
- return resp
- async def afterCommandsCall(self, event, ctx):
- """вызов after обработчиков
- :param event: экземпляр события
- :param ctx: экземпляр контекста"""
- resp = []
- for x in self.handlers["after"]:
- r = await self.call(x, event, ctx)
- if r:
- resp.append(r)
- return resp
- async def exceptionCall(self, e, event, ctx):
- """вызов except обработчиков при возниконовении исключении
- :param e: экхемпляр Exception
- :param event: экземпляр события
- :param ctx: экземпляр контекста"""
- for x in self.handlers["except"]:
- return await self.call(x, e, event, ctx)
- async def eventCall(self, event, args=[], prefix='', sargs='', handler=None,
- cmd_name='', only_cmd=False, router={}, utime=0):
- ''' событие вызова прозвонки
- :param event: экземпляр события
- :optional param args: список аргументов команды
- :optional param prefix: префикс команды
- :optional param sargs: текст после команды
- :optional param handler : функции обработчика
- :optional param cmd_name : имя комсанды
- :optional param only_cmd : вызывать ли команду без обработчиков'''
- ctx = Context(self, self.backend, event)
- ctx.cmd = cmd_name
- ctx.args = args
- ctx.prefix = prefix
- ctx.sargs = sargs
- ctx.handler = handler
- ctx.utime = utime
- resp = []
- if not only_cmd:
- resp.append(await self.beforeCommandsCall(event, ctx))
- if handler:
- resp.append(await self.commandCall(handler, event, ctx))
- if not only_cmd:
- resp.append(await self.afterCommandsCall(event, ctx))
- return resp
|