loop.py 17 KB


  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
  3. import asyncio
  4. import codecs
  5. import io
  6. import os
  7. import re
  8. import selectors
  9. import signal
  10. import sys
  11. import termios
  12. from contextlib import contextmanager, suppress
  13. from enum import Enum, IntFlag, auto
  14. from functools import partial
  15. from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional
  16. from kitty.constants import is_macos
  17. from kitty.fast_data_types import FILE_TRANSFER_CODE, close_tty, normal_tty, open_tty, parse_input_from_terminal, raw_tty
  18. from kitty.key_encoding import ALT, CTRL, SHIFT, backspace_key, decode_key_event, enter_key
  19. from kitty.typing import ImageManagerType, KeyEventType, Protocol
  20. from kitty.utils import ScreenSize, ScreenSizeGetter, screen_size_function, write_all
  21. from .handler import Handler
  22. from .operations import MouseTracking, init_state, reset_state
  23. class BinaryWrite(Protocol):
  24. def write(self, data: bytes) -> None:
  25. pass
  26. def flush(self) -> None:
  27. pass
  28. def debug_write(*a: Any, **kw: Any) -> None:
  29. from base64 import standard_b64encode
  30. fobj = kw.pop('file', sys.stderr.buffer)
  31. buf = io.StringIO()
  32. kw['file'] = buf
  33. print(*a, **kw)
  34. stext = buf.getvalue()
  35. for i in range(0, len(stext), 256):
  36. chunk = stext[i:i + 256]
  37. text = b'\x1bP@kitty-print|' + standard_b64encode(chunk.encode('utf-8')) + b'\x1b\\'
  38. fobj.write(text)
  39. fobj.flush()
  40. class Debug:
  41. fobj: Optional[BinaryWrite] = None
  42. def __call__(self, *a: Any, **kw: Any) -> None:
  43. kw['file'] = self.fobj or sys.stdout.buffer
  44. debug_write(*a, **kw)
# Shared Debug instance; Loop.loop_impl() hands it to the handler on
# initialization.
debug = Debug()
# String form of FILE_TRANSFER_CODE, compared against the numeric prefix of
# incoming OSC sequences in Loop._on_osc().
ftc_code = str(FILE_TRANSFER_CODE)
  47. class TermManager:
  48. def __init__(
  49. self, optional_actions: int = termios.TCSANOW, use_alternate_screen: bool = True,
  50. mouse_tracking: MouseTracking = MouseTracking.none
  51. ) -> None:
  52. self.extra_finalize: Optional[str] = None
  53. self.optional_actions = optional_actions
  54. self.use_alternate_screen = use_alternate_screen
  55. self.mouse_tracking = mouse_tracking
  56. def set_state_for_loop(self, set_raw: bool = True) -> None:
  57. if set_raw:
  58. raw_tty(self.tty_fd, self.original_termios)
  59. write_all(self.tty_fd, init_state(self.use_alternate_screen, self.mouse_tracking))
  60. def reset_state_to_original(self) -> None:
  61. normal_tty(self.tty_fd, self.original_termios)
  62. if self.extra_finalize:
  63. write_all(self.tty_fd, self.extra_finalize)
  64. write_all(self.tty_fd, reset_state(self.use_alternate_screen))
  65. @contextmanager
  66. def suspend(self) -> Generator['TermManager', None, None]:
  67. self.reset_state_to_original()
  68. yield self
  69. self.set_state_for_loop()
  70. def __enter__(self) -> 'TermManager':
  71. self.tty_fd, self.original_termios = open_tty(False, self.optional_actions)
  72. self.set_state_for_loop(set_raw=False)
  73. return self
  74. def __exit__(self, *a: object) -> None:
  75. with suppress(Exception):
  76. self.reset_state_to_original()
  77. close_tty(self.tty_fd, self.original_termios)
  78. del self.tty_fd, self.original_termios
  79. class MouseButton(IntFlag):
  80. NONE, LEFT, MIDDLE, RIGHT, FOURTH, FIFTH, SIXTH, SEVENTH = 0, 1, 2, 4, 8, 16, 32, 64
  81. WHEEL_UP, WHEEL_DOWN, WHEEL_LEFT, WHEEL_RIGHT = -1, -2, -4, -8
  82. bmap = MouseButton.LEFT, MouseButton.MIDDLE, MouseButton.RIGHT
  83. ebmap = MouseButton.FOURTH, MouseButton.FIFTH, MouseButton.SIXTH, MouseButton.SEVENTH
  84. wbmap = MouseButton.WHEEL_UP, MouseButton.WHEEL_DOWN, MouseButton.WHEEL_LEFT, MouseButton.WHEEL_RIGHT
  85. SHIFT_INDICATOR = 1 << 2
  86. ALT_INDICATOR = 1 << 3
  87. CTRL_INDICATOR = 1 << 4
  88. MOTION_INDICATOR = 1 << 5
class EventType(Enum):
    """Kind of mouse event, as classified by decode_sgr_mouse()."""

    # Neither a release nor a motion report.
    PRESS = auto()
    # The SGR report ended with 'm'.
    RELEASE = auto()
    # The motion bit was set in the button code.
    MOVE = auto()
class MouseEvent(NamedTuple):
    """A decoded mouse report, as built by decode_sgr_mouse()."""

    # Cell coordinates derived from the reported coordinates via pixel_to_cell().
    cell_x: int
    cell_y: int
    # Coordinates exactly as they appeared in the report.
    pixel_x: int
    pixel_y: int
    type: EventType
    buttons: MouseButton
    # Bitwise OR of the SHIFT/ALT/CTRL constants from kitty.key_encoding.
    mods: int
  101. def pixel_to_cell(px: int, length: int, cell_length: int) -> int:
  102. px = max(0, min(px, length - 1))
  103. return px // cell_length
  104. def decode_sgr_mouse(text: str, screen_size: ScreenSize) -> MouseEvent:
  105. cb_, x_, y_ = text.split(';')
  106. m, y_ = y_[-1], y_[:-1]
  107. cb, x, y = map(int, (cb_, x_, y_))
  108. typ = EventType.RELEASE if m == 'm' else (EventType.MOVE if cb & MOTION_INDICATOR else EventType.PRESS)
  109. buttons: MouseButton = MouseButton.NONE
  110. cb3 = cb & 3
  111. if cb >= 128:
  112. buttons |= ebmap[cb3]
  113. elif cb >= 64:
  114. buttons |= wbmap[cb3]
  115. elif cb3 < 3:
  116. buttons |= bmap[cb3]
  117. mods = 0
  118. if cb & SHIFT_INDICATOR:
  119. mods |= SHIFT
  120. if cb & ALT_INDICATOR:
  121. mods |= ALT
  122. if cb & CTRL_INDICATOR:
  123. mods |= CTRL
  124. return MouseEvent(
  125. pixel_to_cell(x, screen_size.width, screen_size.cell_width), pixel_to_cell(y, screen_size.height, screen_size.cell_height),
  126. x, y, typ, buttons, mods
  127. )
  128. class UnhandledException(Handler):
  129. def __init__(self, tb: str) -> None:
  130. self.tb = tb
  131. def initialize(self) -> None:
  132. self.cmd.clear_screen()
  133. self.cmd.set_scrolling_region()
  134. self.cmd.set_cursor_visible(True)
  135. self.cmd.set_default_colors()
  136. self.write(self.tb.replace('\n', '\r\n'))
  137. self.write('\r\n')
  138. self.write('Press Enter to quit')
  139. def on_key(self, key_event: KeyEventType) -> None:
  140. if key_event.key == 'ENTER':
  141. self.quit_loop(1)
  142. def on_interrupt(self) -> None:
  143. self.quit_loop(1)
  144. on_eot = on_term = on_interrupt
  145. class SignalManager:
  146. def __init__(
  147. self,
  148. loop: asyncio.AbstractEventLoop,
  149. on_winch: Callable[[], None],
  150. on_interrupt: Callable[[], None],
  151. on_term: Callable[[], None],
  152. on_hup: Callable[[], None],
  153. ) -> None:
  154. self.asyncio_loop = loop
  155. self.on_winch, self.on_interrupt, self.on_term = on_winch, on_interrupt, on_term
  156. self.on_hup = on_hup
  157. def __enter__(self) -> None:
  158. self.asyncio_loop.add_signal_handler(signal.SIGWINCH, self.on_winch)
  159. self.asyncio_loop.add_signal_handler(signal.SIGINT, self.on_interrupt)
  160. self.asyncio_loop.add_signal_handler(signal.SIGTERM, self.on_term)
  161. self.asyncio_loop.add_signal_handler(signal.SIGHUP, self.on_hup)
  162. def __exit__(self, *a: Any) -> None:
  163. tuple(map(self.asyncio_loop.remove_signal_handler, (
  164. signal.SIGWINCH, signal.SIGINT, signal.SIGTERM, signal.SIGHUP)))
# Default regular expression (a character class of control characters) used by
# Loop: every match is removed from text received during a bracketed paste.
sanitize_bracketed_paste: str = '[\x03\x04\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]'
  166. class Loop:
  167. def __init__(
  168. self,
  169. sanitize_bracketed_paste: str = sanitize_bracketed_paste,
  170. optional_actions: int = termios.TCSADRAIN
  171. ):
  172. if is_macos:
  173. # On macOS PTY devices are not supported by the KqueueSelector and
  174. # the PollSelector is broken, causes 100% CPU usage
  175. self.asyncio_loop: asyncio.AbstractEventLoop = asyncio.SelectorEventLoop(selectors.SelectSelector())
  176. asyncio.set_event_loop(self.asyncio_loop)
  177. else:
  178. self.asyncio_loop = asyncio.get_event_loop()
  179. self.return_code = 0
  180. self.overlay_ready_reported = False
  181. self.optional_actions = optional_actions
  182. self.read_buf = ''
  183. self.decoder = codecs.getincrementaldecoder('utf-8')('ignore')
  184. try:
  185. self.iov_limit = max(os.sysconf('SC_IOV_MAX') - 1, 255)
  186. except Exception:
  187. self.iov_limit = 255
  188. self.parse_input_from_terminal = partial(parse_input_from_terminal, self._on_text, self._on_dcs, self._on_csi, self._on_osc, self._on_pm, self._on_apc)
  189. self.ebs_pat = re.compile('([\177\r\x03\x04])')
  190. self.in_bracketed_paste = False
  191. self.sanitize_bracketed_paste = bool(sanitize_bracketed_paste)
  192. if self.sanitize_bracketed_paste:
  193. self.sanitize_ibp_pat = re.compile(sanitize_bracketed_paste)
  194. def _read_ready(self, handler: Handler, fd: int) -> None:
  195. try:
  196. bdata = os.read(fd, io.DEFAULT_BUFFER_SIZE)
  197. except BlockingIOError:
  198. return
  199. if not bdata:
  200. handler.terminal_io_ended = True
  201. self.quit(1)
  202. return
  203. data = self.decoder.decode(bdata)
  204. if self.read_buf:
  205. data = self.read_buf + data
  206. self.read_buf = data
  207. self.handler = handler
  208. try:
  209. self.read_buf = self.parse_input_from_terminal(self.read_buf, self.in_bracketed_paste)
  210. except Exception:
  211. self.read_buf = ''
  212. raise
  213. finally:
  214. del self.handler
  215. # terminal input callbacks {{{
  216. def _on_text(self, text: str) -> None:
  217. if self.in_bracketed_paste and self.sanitize_bracketed_paste:
  218. text = self.sanitize_ibp_pat.sub('', text)
  219. for chunk in self.ebs_pat.split(text):
  220. if len(chunk) == 1:
  221. if chunk == '\r':
  222. self.handler.on_key(enter_key)
  223. elif chunk == '\177':
  224. self.handler.on_key(backspace_key)
  225. elif chunk == '\x03':
  226. self.handler.on_interrupt()
  227. elif chunk == '\x04':
  228. self.handler.on_eot()
  229. else:
  230. self.handler.on_text(chunk, self.in_bracketed_paste)
  231. elif chunk:
  232. self.handler.on_text(chunk, self.in_bracketed_paste)
  233. def _on_dcs(self, dcs: str) -> None:
  234. if dcs.startswith('@kitty-cmd'):
  235. import json
  236. self.handler.on_kitty_cmd_response(json.loads(dcs[len('@kitty-cmd'):]))
  237. elif dcs.startswith('1+r'):
  238. from binascii import unhexlify
  239. vals = dcs[3:].split(';')
  240. for q in vals:
  241. parts = q.split('=', 1)
  242. try:
  243. name, val = parts[0], unhexlify(parts[1]).decode('utf-8', 'replace')
  244. except Exception:
  245. continue
  246. self.handler.on_capability_response(name, val)
  247. def _on_csi(self, csi: str) -> None:
  248. q = csi[-1]
  249. if q in 'mM':
  250. if csi.startswith('<'):
  251. # SGR mouse event
  252. try:
  253. ev = decode_sgr_mouse(csi[1:], self.handler.screen_size)
  254. except Exception:
  255. pass
  256. else:
  257. self.handler.on_mouse_event(ev)
  258. elif q in 'u~ABCDEHFPQRS':
  259. if csi == '200~':
  260. self.in_bracketed_paste = True
  261. return
  262. elif csi == '201~':
  263. self.in_bracketed_paste = False
  264. return
  265. try:
  266. k = decode_key_event(csi[:-1], q)
  267. except Exception:
  268. pass
  269. else:
  270. if not self.handler.perform_default_key_action(k):
  271. self.handler.on_key_event(k)
  272. def _on_pm(self, pm: str) -> None:
  273. pass
  274. def _on_osc(self, osc: str) -> None:
  275. idx = osc.find(';')
  276. if idx <= 0:
  277. return
  278. q = osc[:idx]
  279. if q == '52':
  280. widx = osc.find(';', idx + 1)
  281. if widx < idx:
  282. from_primary = osc.find('p', idx + 1) > -1
  283. payload = ''
  284. else:
  285. from base64 import standard_b64decode
  286. from_primary = osc.find('p', idx+1, widx) > -1
  287. data = memoryview(osc.encode('ascii'))
  288. payload = standard_b64decode(data[widx+1:]).decode('utf-8')
  289. self.handler.on_clipboard_response(payload, from_primary)
  290. elif q == ftc_code:
  291. from kitty.file_transmission import FileTransmissionCommand
  292. data = memoryview(osc.encode('ascii'))
  293. self.handler.on_file_transfer_response(FileTransmissionCommand.deserialize(data[idx+1:]))
  294. def _on_apc(self, apc: str) -> None:
  295. if apc.startswith('G'):
  296. if self.handler.image_manager is not None:
  297. self.handler.image_manager.handle_response(apc)
  298. # }}}
  299. @property
  300. def total_pending_bytes_to_write(self) -> int:
  301. return sum(map(len, self.write_buf))
  302. def _write_ready(self, handler: Handler, fd: int) -> None:
  303. if len(self.write_buf) > self.iov_limit:
  304. self.write_buf[self.iov_limit - 1] = b''.join(self.write_buf[self.iov_limit - 1:])
  305. del self.write_buf[self.iov_limit:]
  306. total_size = self.total_pending_bytes_to_write
  307. if total_size:
  308. try:
  309. written = os.writev(fd, self.write_buf)
  310. except BlockingIOError:
  311. return
  312. if not written:
  313. handler.terminal_io_ended = True
  314. self.quit(1)
  315. return
  316. else:
  317. written = 0
  318. if written >= total_size:
  319. self.write_buf: List[bytes] = []
  320. self.asyncio_loop.remove_writer(fd)
  321. self.waiting_for_writes = False
  322. handler.on_writing_finished()
  323. else:
  324. consumed = 0
  325. for i, buf in enumerate(self.write_buf):
  326. if not written:
  327. break
  328. if len(buf) <= written:
  329. written -= len(buf)
  330. consumed += 1
  331. continue
  332. self.write_buf[i] = buf[written:]
  333. break
  334. del self.write_buf[:consumed]
  335. def quit(self, return_code: Optional[int] = None) -> None:
  336. if return_code is not None:
  337. self.return_code = return_code
  338. self.asyncio_loop.stop()
  339. def loop_impl(self, handler: Handler, term_manager: TermManager, image_manager: Optional[ImageManagerType] = None) -> Optional[str]:
  340. self.write_buf = []
  341. tty_fd = term_manager.tty_fd
  342. tb = None
  343. self.waiting_for_writes = True
  344. def schedule_write(data: bytes) -> None:
  345. self.write_buf.append(data)
  346. if not self.waiting_for_writes:
  347. self.asyncio_loop.add_writer(tty_fd, self._write_ready, handler, tty_fd)
  348. self.waiting_for_writes = True
  349. def handle_exception(loop: asyncio.AbstractEventLoop, context: Dict[str, Any]) -> None:
  350. nonlocal tb
  351. loop.stop()
  352. tb = context['message']
  353. exc = context.get('exception')
  354. if exc is not None:
  355. import traceback
  356. tb += '\n' + ''.join(traceback.format_exception(exc.__class__, exc, exc.__traceback__))
  357. self.asyncio_loop.set_exception_handler(handle_exception)
  358. handler._initialize(self._get_screen_size(), term_manager, schedule_write, self, debug, image_manager)
  359. with handler:
  360. if handler.overlay_ready_report_needed:
  361. handler.cmd.overlay_ready()
  362. self.asyncio_loop.add_reader(
  363. tty_fd, self._read_ready, handler, tty_fd)
  364. self.asyncio_loop.add_writer(
  365. tty_fd, self._write_ready, handler, tty_fd)
  366. self.asyncio_loop.run_forever()
  367. self.asyncio_loop.remove_reader(tty_fd)
  368. if self.waiting_for_writes:
  369. self.asyncio_loop.remove_writer(tty_fd)
  370. return tb
  371. def loop(self, handler: Handler) -> None:
  372. tb: Optional[str] = None
  373. def _on_sigwinch() -> None:
  374. self._get_screen_size.changed = True
  375. handler.screen_size = self._get_screen_size()
  376. handler.on_resize(handler.screen_size)
  377. signal_manager = SignalManager(self.asyncio_loop, _on_sigwinch, handler.on_interrupt, handler.on_term, handler.on_hup)
  378. with TermManager(self.optional_actions, handler.use_alternate_screen, handler.mouse_tracking) as term_manager, signal_manager:
  379. self._get_screen_size: ScreenSizeGetter = screen_size_function(term_manager.tty_fd)
  380. image_manager = None
  381. if handler.image_manager_class is not None:
  382. image_manager = handler.image_manager_class(handler)
  383. try:
  384. tb = self.loop_impl(handler, term_manager, image_manager)
  385. except Exception:
  386. import traceback
  387. tb = traceback.format_exc()
  388. term_manager.extra_finalize = b''.join(self.write_buf).decode('utf-8')
  389. if tb is not None:
  390. report_overlay_ready = handler.overlay_ready_report_needed and not self.overlay_ready_reported
  391. self.return_code = 1
  392. if not handler.terminal_io_ended:
  393. self._report_error_loop(tb, term_manager, report_overlay_ready)
  394. def _report_error_loop(self, tb: str, term_manager: TermManager, overlay_ready_report_needed: bool) -> None:
  395. handler = UnhandledException(tb)
  396. handler.overlay_ready_report_needed = overlay_ready_report_needed
  397. self.loop_impl(handler, term_manager)