handler.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
  3. import os
  4. from collections import deque
  5. from contextlib import suppress
  6. from types import TracebackType
  7. from typing import TYPE_CHECKING, Any, Callable, ContextManager, Deque, Dict, NamedTuple, Optional, Sequence, Type, Union, cast
  8. from kitty.constants import kitten_exe, running_in_kitty
  9. from kitty.fast_data_types import monotonic, safe_pipe
  10. from kitty.types import DecoratedFunc, ParsedShortcut
  11. from kitty.typing import (
  12. AbstractEventLoop,
  13. BossType,
  14. Debug,
  15. ImageManagerType,
  16. KeyActionType,
  17. KeyEventType,
  18. LoopType,
  19. MouseButton,
  20. MouseEvent,
  21. ScreenSize,
  22. TermManagerType,
  23. WindowType,
  24. )
  25. from .operations import MouseTracking, pending_update
  26. if TYPE_CHECKING:
  27. from kitty.file_transmission import FileTransmissionCommand
# Type of the optional ``open_url_handler`` callback accepted by result_handler()
# and stored on HandleResult; None means no callback was supplied.
# NOTE(review): the meaning of the trailing (str, int, str) arguments is not
# visible in this file -- confirm against the code that invokes the callback.
OpenUrlHandler = Optional[Callable[[BossType, WindowType, str, int, str], bool]]
class ButtonEvent(NamedTuple):
    ' A mouse button event paired with the time at which the handler recorded it '
    # The mouse event exactly as it was delivered to Handler.on_mouse_event()
    mouse_event: MouseEvent
    # Value of monotonic() taken when the event was recorded
    timestamp: float
  32. def is_click(a: ButtonEvent, b: ButtonEvent) -> bool:
  33. from .loop import EventType
  34. if a.mouse_event.type is not EventType.PRESS or b.mouse_event.type is not EventType.RELEASE:
  35. return False
  36. x = a.mouse_event.cell_x - b.mouse_event.cell_x
  37. y = a.mouse_event.cell_y - b.mouse_event.cell_y
  38. return x*x + y*y <= 4
  39. class KittenUI:
  40. allow_remote_control: bool = False
  41. remote_control_password: Union[bool, str] = False
  42. def __init__(self, func: Callable[[list[str]], str], allow_remote_control: bool, remote_control_password: Union[bool, str]):
  43. self.func = func
  44. self.allow_remote_control = allow_remote_control
  45. self.remote_control_password = remote_control_password
  46. self.password = self.to = ''
  47. self.rc_fd = -1
  48. self.initialized = False
  49. def initialize(self) -> None:
  50. if self.initialized:
  51. return
  52. self.initialized = True
  53. if running_in_kitty():
  54. return
  55. if self.allow_remote_control:
  56. self.to = os.environ.get('KITTY_LISTEN_ON', '')
  57. self.rc_fd = int(self.to.partition(':')[-1])
  58. os.set_inheritable(self.rc_fd, False)
  59. if (self.remote_control_password or self.remote_control_password == '') and not self.password:
  60. import socket
  61. with socket.fromfd(self.rc_fd, socket.AF_UNIX, socket.SOCK_STREAM) as s:
  62. data = s.recv(256)
  63. if not data.endswith(b'\n'):
  64. raise Exception(f'The remote control password was invalid: {data!r}')
  65. self.password = data.strip().decode()
  66. def __call__(self, args: list[str]) -> str:
  67. self.initialize()
  68. return self.func(args)
  69. def allow_indiscriminate_remote_control(self, enable: bool = True) -> None:
  70. if self.rc_fd > -1:
  71. if enable:
  72. os.set_inheritable(self.rc_fd, True)
  73. if self.password:
  74. os.environ['KITTY_RC_PASSWORD'] = self.password
  75. else:
  76. os.set_inheritable(self.rc_fd, False)
  77. if self.password:
  78. os.environ.pop('KITTY_RC_PASSWORD', None)
  79. def remote_control(self, cmd: Union[str, Sequence[str]], **kw: Any) -> Any:
  80. if not self.allow_remote_control:
  81. raise ValueError('Remote control is not enabled, remember to use allow_remote_control=True')
  82. prefix = [kitten_exe(), '@']
  83. r = -1
  84. pass_fds = list(kw.get('pass_fds') or ())
  85. try:
  86. if self.rc_fd > -1:
  87. pass_fds.append(self.rc_fd)
  88. if self.password and self.rc_fd > -1:
  89. r, w = safe_pipe(False)
  90. os.write(w, self.password.encode())
  91. os.close(w)
  92. prefix += ['--password-file', f'fd:{r}', '--use-password', 'always']
  93. pass_fds.append(r)
  94. if pass_fds:
  95. kw['pass_fds'] = tuple(pass_fds)
  96. if isinstance(cmd, str):
  97. cmd = ' '.join(prefix)
  98. else:
  99. cmd = prefix + list(cmd)
  100. import subprocess
  101. if self.rc_fd > -1:
  102. is_inheritable = os.get_inheritable(self.rc_fd)
  103. if not is_inheritable:
  104. os.set_inheritable(self.rc_fd, True)
  105. try:
  106. return subprocess.run(cmd, **kw)
  107. finally:
  108. if self.rc_fd > -1 and not is_inheritable:
  109. os.set_inheritable(self.rc_fd, False)
  110. finally:
  111. if r > -1:
  112. os.close(r)
  113. def kitten_ui(
  114. allow_remote_control: bool = KittenUI.allow_remote_control,
  115. remote_control_password: Union[bool, str] = KittenUI.allow_remote_control,
  116. ) -> Callable[[Callable[[list[str]], str]], KittenUI]:
  117. def wrapper(impl: Callable[..., Any]) -> KittenUI:
  118. return KittenUI(impl, allow_remote_control, remote_control_password)
  119. return wrapper
  120. class Handler:
  121. image_manager_class: Optional[Type[ImageManagerType]] = None
  122. use_alternate_screen = True
  123. mouse_tracking = MouseTracking.none
  124. terminal_io_ended = False
  125. overlay_ready_report_needed = False
  126. def _initialize(
  127. self,
  128. screen_size: ScreenSize,
  129. term_manager: TermManagerType,
  130. schedule_write: Callable[[bytes], None],
  131. tui_loop: LoopType,
  132. debug: Debug,
  133. image_manager: Optional[ImageManagerType] = None
  134. ) -> None:
  135. from .operations import commander
  136. self.screen_size = screen_size
  137. self._term_manager = term_manager
  138. self._tui_loop = tui_loop
  139. self._schedule_write = schedule_write
  140. self.debug = debug
  141. self.cmd = commander(self)
  142. self._image_manager = image_manager
  143. self._button_events: Dict[MouseButton, Deque[ButtonEvent]] = {}
  144. @property
  145. def image_manager(self) -> ImageManagerType:
  146. assert self._image_manager is not None
  147. return self._image_manager
  148. @property
  149. def asyncio_loop(self) -> AbstractEventLoop:
  150. return self._tui_loop.asyncio_loop
  151. def add_shortcut(self, action: KeyActionType, spec: Union[str, ParsedShortcut]) -> None:
  152. if not hasattr(self, '_key_shortcuts'):
  153. self._key_shortcuts: Dict[ParsedShortcut, KeyActionType] = {}
  154. if isinstance(spec, str):
  155. from kitty.key_encoding import parse_shortcut
  156. spec = parse_shortcut(spec)
  157. self._key_shortcuts[spec] = action
  158. def shortcut_action(self, key_event: KeyEventType) -> Optional[KeyActionType]:
  159. for sc, action in self._key_shortcuts.items():
  160. if key_event.matches(sc):
  161. return action
  162. return None
  163. def __enter__(self) -> None:
  164. if self._image_manager is not None:
  165. self._image_manager.__enter__()
  166. self.debug.fobj = self
  167. self.initialize()
  168. def __exit__(self, etype: type, value: Exception, tb: TracebackType) -> None:
  169. del self.debug.fobj
  170. with suppress(Exception):
  171. self.finalize()
  172. if self._image_manager is not None:
  173. self._image_manager.__exit__(etype, value, tb)
  174. def initialize(self) -> None:
  175. pass
  176. def finalize(self) -> None:
  177. pass
  178. def on_resize(self, screen_size: ScreenSize) -> None:
  179. self.screen_size = screen_size
  180. def quit_loop(self, return_code: Optional[int] = None) -> None:
  181. self._tui_loop.quit(return_code)
  182. def on_term(self) -> None:
  183. self._tui_loop.quit(1)
  184. def on_hup(self) -> None:
  185. self.terminal_io_ended = True
  186. self._tui_loop.quit(1)
  187. def on_key_event(self, key_event: KeyEventType, in_bracketed_paste: bool = False) -> None:
  188. ' Override this method and perform_default_key_action() to handle all key events '
  189. if key_event.text:
  190. self.on_text(key_event.text, in_bracketed_paste)
  191. else:
  192. self.on_key(key_event)
  193. def perform_default_key_action(self, key_event: KeyEventType) -> bool:
  194. ' Override in sub-class if you want to handle these key events yourself '
  195. if key_event.matches('ctrl+c'):
  196. self.on_interrupt()
  197. return True
  198. if key_event.matches('ctrl+d'):
  199. self.on_eot()
  200. return True
  201. return False
  202. def on_text(self, text: str, in_bracketed_paste: bool = False) -> None:
  203. pass
  204. def on_key(self, key_event: KeyEventType) -> None:
  205. pass
  206. def on_mouse_event(self, mouse_event: MouseEvent) -> None:
  207. from .loop import EventType
  208. if mouse_event.type is EventType.MOVE:
  209. self.on_mouse_move(mouse_event)
  210. elif mouse_event.type is EventType.PRESS:
  211. q = self._button_events.setdefault(mouse_event.buttons, deque())
  212. q.append(ButtonEvent(mouse_event, monotonic()))
  213. if len(q) > 5:
  214. q.popleft()
  215. elif mouse_event.type is EventType.RELEASE:
  216. q = self._button_events.setdefault(mouse_event.buttons, deque())
  217. q.append(ButtonEvent(mouse_event, monotonic()))
  218. if len(q) > 5:
  219. q.popleft()
  220. if len(q) > 1 and is_click(q[-2], q[-1]):
  221. self.on_click(mouse_event)
  222. def on_mouse_move(self, mouse_event: MouseEvent) -> None:
  223. pass
  224. def on_click(self, mouse_event: MouseEvent) -> None:
  225. pass
  226. def on_interrupt(self) -> None:
  227. pass
  228. def on_eot(self) -> None:
  229. pass
  230. def on_writing_finished(self) -> None:
  231. pass
  232. def on_kitty_cmd_response(self, response: Dict[str, Any]) -> None:
  233. pass
  234. def on_clipboard_response(self, text: str, from_primary: bool = False) -> None:
  235. pass
  236. def on_file_transfer_response(self, ftc: 'FileTransmissionCommand') -> None:
  237. pass
  238. def on_capability_response(self, name: str, val: str) -> None:
  239. pass
  240. def write(self, data: Union[bytes, str]) -> None:
  241. if isinstance(data, str):
  242. data = data.encode('utf-8')
  243. self._schedule_write(data)
  244. def flush(self) -> None:
  245. pass
  246. def print(self, *args: object, sep: str = ' ', end: str = '\r\n') -> None:
  247. data = sep.join(map(str, args)) + end
  248. self.write(data)
  249. def suspend(self) -> ContextManager[TermManagerType]:
  250. return self._term_manager.suspend()
  251. @classmethod
  252. def atomic_update(cls, func: DecoratedFunc) -> DecoratedFunc:
  253. from functools import wraps
  254. @wraps(func)
  255. def f(*a: Any, **kw: Any) -> Any:
  256. with pending_update(a[0].write):
  257. return func(*a, **kw)
  258. return cast(DecoratedFunc, f)
  259. class HandleResult:
  260. type_of_input: Optional[str] = None
  261. no_ui: bool = False
  262. def __init__(self, impl: Callable[..., Any], type_of_input: Optional[str], no_ui: bool, has_ready_notification: bool, open_url_handler: OpenUrlHandler):
  263. self.impl = impl
  264. self.no_ui = no_ui
  265. self.type_of_input = type_of_input
  266. self.has_ready_notification = has_ready_notification
  267. self.open_url_handler = open_url_handler
  268. def __call__(self, args: Sequence[str], data: Any, target_window_id: int, boss: BossType) -> Any:
  269. return self.impl(args, data, target_window_id, boss)
  270. def result_handler(
  271. type_of_input: Optional[str] = None,
  272. no_ui: bool = False,
  273. has_ready_notification: bool = Handler.overlay_ready_report_needed,
  274. open_url_handler: OpenUrlHandler = None,
  275. ) -> Callable[[Callable[..., Any]], HandleResult]:
  276. def wrapper(impl: Callable[..., Any]) -> HandleResult:
  277. return HandleResult(impl, type_of_input, no_ui, has_ready_notification, open_url_handler)
  278. return wrapper