123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- #!/usr/bin/env python
- # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
- import os
- from collections import deque
- from contextlib import suppress
- from types import TracebackType
- from typing import TYPE_CHECKING, Any, Callable, ContextManager, Deque, Dict, NamedTuple, Optional, Sequence, Type, Union, cast
- from kitty.constants import kitten_exe, running_in_kitty
- from kitty.fast_data_types import monotonic, safe_pipe
- from kitty.types import DecoratedFunc, ParsedShortcut
- from kitty.typing import (
- AbstractEventLoop,
- BossType,
- Debug,
- ImageManagerType,
- KeyActionType,
- KeyEventType,
- LoopType,
- MouseButton,
- MouseEvent,
- ScreenSize,
- TermManagerType,
- WindowType,
- )
- from .operations import MouseTracking, pending_update
- if TYPE_CHECKING:
- from kitty.file_transmission import FileTransmissionCommand
# Type of an optional callback taking (BossType, WindowType, str, int, str) and
# returning bool; None means no callback is supplied.
# NOTE(review): the name suggests a URL-opening hook, but the meaning of the
# str/int/str arguments is not visible in this file — confirm against callers.
OpenUrlHandler = Optional[Callable[[BossType, WindowType, str, int, str], bool]]
class ButtonEvent(NamedTuple):
    """A mouse event paired with the clock reading taken when it was recorded."""

    # The mouse event exactly as it was received.
    mouse_event: MouseEvent
    # Time at which the event was queued; Handler.on_mouse_event stores
    # monotonic() here.
    timestamp: float
def is_click(a: ButtonEvent, b: ButtonEvent) -> bool:
    """Return True when ``a`` is a press and ``b`` a release at nearly the same cell.

    "Nearly the same" means the squared distance between the two cell
    positions is at most 4. Timestamps are not taken into account.
    """
    from .loop import EventType

    press, release = a.mouse_event, b.mouse_event
    is_press_then_release = press.type is EventType.PRESS and release.type is EventType.RELEASE
    if not is_press_then_release:
        return False

    dx = press.cell_x - release.cell_x
    dy = press.cell_y - release.cell_y
    # Work with the squared distance so no square root is required.
    return dx * dx + dy * dy <= 4
class KittenUI:
    """Callable wrapper around a kitten entry point with optional remote control.

    Calling the instance runs :meth:`initialize` once and then delegates to the
    wrapped function. When remote control is allowed, the file descriptor
    named in the ``KITTY_LISTEN_ON`` environment variable is used to run
    ``kitten @ ...`` commands via :meth:`remote_control`.
    """

    # Class-level defaults; also used as the defaults of the kitten_ui decorator.
    allow_remote_control: bool = False
    remote_control_password: Union[bool, str] = False

    def __init__(self, func: Callable[[list[str]], str], allow_remote_control: bool, remote_control_password: Union[bool, str]):
        """Store the wrapped function and the remote control settings.

        No environment or file descriptor access happens here; that is
        deferred to :meth:`initialize`.
        """
        self.func = func
        self.allow_remote_control = allow_remote_control
        self.remote_control_password = remote_control_password
        self.password = self.to = ''
        self.rc_fd = -1  # -1 means "no remote control descriptor available"
        self.initialized = False

    def initialize(self) -> None:
        """Read the remote control descriptor and password, at most once.

        Does nothing beyond marking the instance initialized when
        ``running_in_kitty()`` is true.

        Raises:
            ValueError: if ``KITTY_LISTEN_ON`` does not end in ``:<integer>``.
            Exception: if the password read from the descriptor is not
                newline terminated.
        """
        if self.initialized:
            return
        self.initialized = True
        if running_in_kitty():
            return
        if self.allow_remote_control:
            self.to = os.environ.get('KITTY_LISTEN_ON', '')
            # The descriptor number is whatever follows the first ':'.
            self.rc_fd = int(self.to.partition(':')[-1])
            # Keep the descriptor private until a command explicitly needs it.
            os.set_inheritable(self.rc_fd, False)
        # An empty string password setting also counts as "a password is used".
        if (self.remote_control_password or self.remote_control_password == '') and not self.password:
            import socket
            with socket.fromfd(self.rc_fd, socket.AF_UNIX, socket.SOCK_STREAM) as s:
                data = s.recv(256)
            if not data.endswith(b'\n'):
                raise Exception(f'The remote control password was invalid: {data!r}')
            self.password = data.strip().decode()

    def __call__(self, args: list[str]) -> str:
        """Initialize if needed, then run the wrapped function with ``args``."""
        self.initialize()
        return self.func(args)

    def allow_indiscriminate_remote_control(self, enable: bool = True) -> None:
        """Expose (or stop exposing) remote control to every child process.

        When enabled, the remote control descriptor is made inheritable and,
        if a password is known, it is exported as ``KITTY_RC_PASSWORD``.
        Disabling reverses both. Does nothing when there is no descriptor.
        """
        if self.rc_fd < 0:
            return
        os.set_inheritable(self.rc_fd, enable)
        if not self.password:
            return
        if enable:
            os.environ['KITTY_RC_PASSWORD'] = self.password
        else:
            os.environ.pop('KITTY_RC_PASSWORD', None)

    @staticmethod
    def _compose_command(prefix: Sequence[str], cmd: Union[str, Sequence[str]], use_shell: bool) -> Union[str, list[str]]:
        """Join the ``kitten @`` prefix with the caller's command.

        A string command is split into arguments with shell-like rules, unless
        the call is going through a shell, in which case a single command
        line is produced with the prefix safely quoted. A sequence command is
        appended to the prefix as-is.
        """
        import shlex
        if isinstance(cmd, str):
            if use_shell:
                return f'{shlex.join(prefix)} {cmd}'
            return list(prefix) + shlex.split(cmd)
        return list(prefix) + list(cmd)

    def remote_control(self, cmd: Union[str, Sequence[str]], **kw: Any) -> Any:
        """Run a ``kitten @`` remote control command and return its result.

        Args:
            cmd: The remote control command, either as a sequence of
                arguments or as a single string.
            **kw: Passed through to ``subprocess.run``. Any ``pass_fds``
                supplied are extended with the descriptors this method needs.

        Returns:
            Whatever ``subprocess.run`` returns.

        Raises:
            ValueError: if remote control was not enabled for this kitten.
        """
        if not self.allow_remote_control:
            raise ValueError('Remote control is not enabled, remember to use allow_remote_control=True')
        prefix = [kitten_exe(), '@']
        r = -1
        pass_fds = list(kw.get('pass_fds') or ())
        try:
            if self.rc_fd > -1:
                pass_fds.append(self.rc_fd)
            if self.password and self.rc_fd > -1:
                # Hand the password to the child through a pipe rather than
                # on the command line.
                r, w = safe_pipe(False)
                try:
                    os.write(w, self.password.encode())
                finally:
                    # Always close the write end, even if the write failed.
                    os.close(w)
                prefix += ['--password-file', f'fd:{r}', '--use-password', 'always']
                pass_fds.append(r)
            if pass_fds:
                kw['pass_fds'] = tuple(pass_fds)
            full_cmd = self._compose_command(prefix, cmd, bool(kw.get('shell')))
            import subprocess
            # Temporarily make the descriptor inheritable for this one child.
            restore_non_inheritable = False
            if self.rc_fd > -1 and not os.get_inheritable(self.rc_fd):
                os.set_inheritable(self.rc_fd, True)
                restore_non_inheritable = True
            try:
                return subprocess.run(full_cmd, **kw)
            finally:
                if restore_non_inheritable:
                    os.set_inheritable(self.rc_fd, False)
        finally:
            if r > -1:
                os.close(r)
def kitten_ui(
    allow_remote_control: bool = KittenUI.allow_remote_control,
    remote_control_password: Union[bool, str] = KittenUI.remote_control_password,
) -> Callable[[Callable[[list[str]], str]], KittenUI]:
    """Decorator factory that wraps a kitten entry point in a :class:`KittenUI`.

    Args:
        allow_remote_control: Whether the kitten may issue remote control
            commands.
        remote_control_password: Password setting forwarded to
            :class:`KittenUI`.

    Returns:
        A decorator that turns the decorated function into a ``KittenUI``
        instance configured with the given settings.
    """
    def wrapper(impl: Callable[[list[str]], str]) -> KittenUI:
        return KittenUI(impl, allow_remote_control, remote_control_password)
    return wrapper
class Handler:
    """Base class offering overridable callbacks for terminal UI events.

    Subclasses override the ``on_*`` hooks (keys, text, mouse, resize,
    responses) as well as :meth:`initialize` and :meth:`finalize`. Output is
    produced with :meth:`write` and :meth:`print`, which go through the
    ``schedule_write`` callable given to :meth:`_initialize`.
    """

    image_manager_class: Optional[Type[ImageManagerType]] = None
    use_alternate_screen = True
    mouse_tracking = MouseTracking.none
    # Set to True by on_hup().
    terminal_io_ended = False
    overlay_ready_report_needed = False

    def _initialize(
        self,
        screen_size: ScreenSize,
        term_manager: TermManagerType,
        schedule_write: Callable[[bytes], None],
        tui_loop: LoopType,
        debug: Debug,
        image_manager: Optional[ImageManagerType] = None
    ) -> None:
        """Attach the handler to its loop, terminal manager and output channel."""
        from .operations import commander
        self.screen_size = screen_size
        self._term_manager = term_manager
        self._tui_loop = tui_loop
        self._schedule_write = schedule_write
        self.debug = debug
        self.cmd = commander(self)
        self._image_manager = image_manager
        # Recent press/release events, kept separately per button value.
        self._button_events: Dict[MouseButton, Deque[ButtonEvent]] = {}

    @property
    def image_manager(self) -> ImageManagerType:
        """The image manager; asserts that one was supplied to ``_initialize``."""
        assert self._image_manager is not None
        return self._image_manager

    @property
    def asyncio_loop(self) -> AbstractEventLoop:
        """The asyncio loop exposed by the TUI loop."""
        return self._tui_loop.asyncio_loop

    def add_shortcut(self, action: KeyActionType, spec: Union[str, ParsedShortcut]) -> None:
        """Register ``action`` for the shortcut ``spec``.

        A string ``spec`` is parsed with ``kitty.key_encoding.parse_shortcut``.
        Registering the same shortcut again replaces the earlier action.
        """
        if not hasattr(self, '_key_shortcuts'):
            # Created lazily so subclasses need not call a base __init__.
            self._key_shortcuts: Dict[ParsedShortcut, KeyActionType] = {}
        if isinstance(spec, str):
            from kitty.key_encoding import parse_shortcut
            spec = parse_shortcut(spec)
        self._key_shortcuts[spec] = action

    def shortcut_action(self, key_event: KeyEventType) -> Optional[KeyActionType]:
        """Return the action of the first registered shortcut matching ``key_event``.

        Returns None when nothing matches, including when no shortcut has
        been registered yet.
        """
        # The mapping only exists once add_shortcut() has been called.
        shortcuts = getattr(self, '_key_shortcuts', None)
        if not shortcuts:
            return None
        for sc, action in shortcuts.items():
            if key_event.matches(sc):
                return action
        return None

    def __enter__(self) -> None:
        """Enter the image manager (if any), hook up debug output and initialize."""
        if self._image_manager is not None:
            self._image_manager.__enter__()
        self.debug.fobj = self
        self.initialize()

    def __exit__(self, etype: type, value: Exception, tb: TracebackType) -> None:
        """Undo ``__enter__``; errors raised by :meth:`finalize` are suppressed."""
        del self.debug.fobj
        with suppress(Exception):
            self.finalize()
        if self._image_manager is not None:
            self._image_manager.__exit__(etype, value, tb)

    def initialize(self) -> None:
        """Hook called from ``__enter__``; does nothing by default."""
        pass

    def finalize(self) -> None:
        """Hook called from ``__exit__``; does nothing by default."""
        pass

    def on_resize(self, screen_size: ScreenSize) -> None:
        """Record the new screen size."""
        self.screen_size = screen_size

    def quit_loop(self, return_code: Optional[int] = None) -> None:
        """Ask the TUI loop to quit with ``return_code``."""
        self._tui_loop.quit(return_code)

    def on_term(self) -> None:
        """Quit the loop with return code 1."""
        self._tui_loop.quit(1)

    def on_hup(self) -> None:
        """Mark terminal I/O as ended and quit the loop with return code 1."""
        self.terminal_io_ended = True
        self._tui_loop.quit(1)

    def on_key_event(self, key_event: KeyEventType, in_bracketed_paste: bool = False) -> None:
        """Dispatch to :meth:`on_text` when the event carries text, else :meth:`on_key`.

        Override this method and perform_default_key_action() to handle all
        key events.
        """
        if key_event.text:
            self.on_text(key_event.text, in_bracketed_paste)
        else:
            self.on_key(key_event)

    def perform_default_key_action(self, key_event: KeyEventType) -> bool:
        """Handle ctrl+c and ctrl+d; return True when the event was consumed.

        Override in sub-class if you want to handle these key events yourself.
        """
        if key_event.matches('ctrl+c'):
            self.on_interrupt()
            return True
        if key_event.matches('ctrl+d'):
            self.on_eot()
            return True
        return False

    def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
        """Hook for key events that carry text; does nothing by default."""
        pass

    def on_key(self, key_event: KeyEventType) -> None:
        """Hook for key events without text; does nothing by default."""
        pass

    def _record_button_event(self, mouse_event: MouseEvent) -> Deque[ButtonEvent]:
        """Append ``mouse_event`` to its button's history and return that history.

        At most the five most recent events are kept per button value.
        """
        history = self._button_events.setdefault(mouse_event.buttons, deque())
        history.append(ButtonEvent(mouse_event, monotonic()))
        if len(history) > 5:
            history.popleft()
        return history

    def on_mouse_event(self, mouse_event: MouseEvent) -> None:
        """Route a mouse event to :meth:`on_mouse_move` or click detection.

        Press and release events are recorded per button. A release that,
        together with the event recorded just before it, satisfies
        ``is_click`` triggers :meth:`on_click`. Other event types are ignored.
        """
        from .loop import EventType
        if mouse_event.type is EventType.MOVE:
            self.on_mouse_move(mouse_event)
        elif mouse_event.type is EventType.PRESS:
            self._record_button_event(mouse_event)
        elif mouse_event.type is EventType.RELEASE:
            history = self._record_button_event(mouse_event)
            if len(history) > 1 and is_click(history[-2], history[-1]):
                self.on_click(mouse_event)

    def on_mouse_move(self, mouse_event: MouseEvent) -> None:
        """Hook for mouse movement; does nothing by default."""
        pass

    def on_click(self, mouse_event: MouseEvent) -> None:
        """Hook for a detected click; receives the release event."""
        pass

    def on_interrupt(self) -> None:
        """Hook invoked for ctrl+c by the default key action."""
        pass

    def on_eot(self) -> None:
        """Hook invoked for ctrl+d by the default key action."""
        pass

    def on_writing_finished(self) -> None:
        """Hook; does nothing by default."""
        pass

    def on_kitty_cmd_response(self, response: Dict[str, Any]) -> None:
        """Hook; does nothing by default."""
        pass

    def on_clipboard_response(self, text: str, from_primary: bool = False) -> None:
        """Hook; does nothing by default."""
        pass

    def on_file_transfer_response(self, ftc: 'FileTransmissionCommand') -> None:
        """Hook; does nothing by default."""
        pass

    def on_capability_response(self, name: str, val: str) -> None:
        """Hook; does nothing by default."""
        pass

    def write(self, data: Union[bytes, str]) -> None:
        """Schedule ``data`` for writing; text is encoded as UTF-8 first."""
        if isinstance(data, str):
            data = data.encode('utf-8')
        self._schedule_write(data)

    def flush(self) -> None:
        """No-op.

        NOTE(review): presumably present so the handler can act as a
        file-like object (``__enter__`` assigns it to ``debug.fobj``).
        """
        pass

    def print(self, *args: object, sep: str = ' ', end: str = '\r\n') -> None:
        """Write ``args`` converted with ``str`` and joined by ``sep``, followed by ``end``."""
        data = sep.join(map(str, args)) + end
        self.write(data)

    def suspend(self) -> ContextManager[TermManagerType]:
        """Return the context manager from the terminal manager's ``suspend()``."""
        return self._term_manager.suspend()

    @classmethod
    def atomic_update(cls, func: DecoratedFunc) -> DecoratedFunc:
        """Decorator that runs ``func`` inside ``pending_update``.

        The ``write`` attribute of the first positional argument of each call
        (``self`` for a decorated method) is what gets passed to
        ``pending_update``.
        """
        from functools import wraps

        @wraps(func)
        def f(*a: Any, **kw: Any) -> Any:
            with pending_update(a[0].write):
                return func(*a, **kw)
        return cast(DecoratedFunc, f)
class HandleResult:
    """Callable wrapper that bundles a result-handling function with its options."""

    # Class-level defaults, overridden per instance in __init__.
    type_of_input: Optional[str] = None
    no_ui: bool = False

    def __init__(self, impl: Callable[..., Any], type_of_input: Optional[str], no_ui: bool, has_ready_notification: bool, open_url_handler: OpenUrlHandler):
        """Remember the wrapped function and every option exactly as given."""
        self.impl, self.no_ui, self.type_of_input = impl, no_ui, type_of_input
        self.has_ready_notification, self.open_url_handler = has_ready_notification, open_url_handler

    def __call__(self, args: Sequence[str], data: Any, target_window_id: int, boss: BossType) -> Any:
        """Forward the call unchanged to the wrapped function and return its result."""
        handler = self.impl
        return handler(args, data, target_window_id, boss)
def result_handler(
    type_of_input: Optional[str] = None,
    no_ui: bool = False,
    has_ready_notification: bool = Handler.overlay_ready_report_needed,
    open_url_handler: OpenUrlHandler = None,
) -> Callable[[Callable[..., Any]], HandleResult]:
    """Decorator factory that turns a function into a :class:`HandleResult`.

    The given options are captured and stored on the ``HandleResult`` that
    replaces the decorated function.
    """
    def decorate(impl: Callable[..., Any]) -> HandleResult:
        return HandleResult(
            impl,
            type_of_input=type_of_input,
            no_ui=no_ui,
            has_ready_notification=has_ready_notification,
            open_url_handler=open_url_handler,
        )

    return decorate
|