123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469 |
- #!/usr/bin/env python
- # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
- import asyncio
- import codecs
- import io
- import os
- import re
- import selectors
- import signal
- import sys
- import termios
- from contextlib import contextmanager, suppress
- from enum import Enum, IntFlag, auto
- from functools import partial
- from typing import Any, Callable, Dict, Generator, List, NamedTuple, Optional
- from kitty.constants import is_macos
- from kitty.fast_data_types import FILE_TRANSFER_CODE, close_tty, normal_tty, open_tty, parse_input_from_terminal, raw_tty
- from kitty.key_encoding import ALT, CTRL, SHIFT, backspace_key, decode_key_event, enter_key
- from kitty.typing import ImageManagerType, KeyEventType, Protocol
- from kitty.utils import ScreenSize, ScreenSizeGetter, screen_size_function, write_all
- from .handler import Handler
- from .operations import MouseTracking, init_state, reset_state
class BinaryWrite(Protocol):
    """Structural type for a binary sink: any object offering ``write(bytes)`` and ``flush()``."""

    def write(self, data: bytes) -> None:
        """Accept a chunk of bytes. The protocol stub itself does nothing."""
        ...

    def flush(self) -> None:
        """Flush pending output. The protocol stub itself does nothing."""
        ...
def debug_write(*a: Any, **kw: Any) -> None:
    """Print to a binary stream using kitty-print DCS escape codes.

    Takes the same arguments as :func:`print`, except that ``file`` must be a
    *binary* stream with ``write()`` and ``flush()``; it defaults to
    ``sys.stderr.buffer``. The formatted text is split into chunks of at most
    256 characters. Each chunk is UTF-8 encoded, base64 encoded and wrapped
    in an ``ESC P @kitty-print|`` prefix and an ``ESC backslash`` terminator.
    The stream is flushed once after all chunks have been written.
    """
    from base64 import standard_b64encode
    # Resolve the default sink lazily: sys.stderr may have been replaced by an
    # object that has no ``buffer`` attribute, and that must not break callers
    # that supply their own ``file``.
    fobj = kw.pop('file', None)
    if fobj is None:
        fobj = sys.stderr.buffer
    # Let print() do the formatting (sep, end, ...) into an in-memory buffer.
    buf = io.StringIO()
    kw['file'] = buf
    print(*a, **kw)
    stext = buf.getvalue()
    for i in range(0, len(stext), 256):
        chunk = stext[i:i + 256]
        text = b'\x1bP@kitty-print|' + standard_b64encode(chunk.encode('utf-8')) + b'\x1b\\'
        fobj.write(text)
    fobj.flush()
class Debug:
    """Callable wrapper around :func:`debug_write` with a configurable sink.

    Output goes to ``fobj`` when that is set to a truthy object and to
    ``sys.stdout.buffer`` otherwise. A ``file`` keyword passed by the caller
    is always overridden.
    """

    # Optional binary sink; when unset, sys.stdout.buffer is used.
    fobj: Optional[BinaryWrite] = None

    def __call__(self, *a: Any, **kw: Any) -> None:
        """Forward print-style arguments to :func:`debug_write`."""
        sink = self.fobj or sys.stdout.buffer
        kw['file'] = sink
        debug_write(*a, **kw)
# Module-wide debug printer; assign ``debug.fobj`` to redirect its output.
debug = Debug()
# String form of the file transfer code, compared against the numeric prefix
# of incoming OSC sequences.
ftc_code = str(FILE_TRANSFER_CODE)
class TermManager:
    """Context manager owning the tty used by the TUI loop.

    Entering opens the tty and writes the initial terminal state; exiting
    restores the original state and closes the tty. ``tty_fd`` and
    ``original_termios`` exist only between ``__enter__`` and ``__exit__``.
    """

    def __init__(
        self, optional_actions: int = termios.TCSANOW, use_alternate_screen: bool = True,
        mouse_tracking: MouseTracking = MouseTracking.none
    ) -> None:
        """
        :param optional_actions: termios ``optional_actions`` value passed to ``open_tty``.
        :param use_alternate_screen: forwarded to ``init_state``/``reset_state``.
        :param mouse_tracking: mouse tracking mode forwarded to ``init_state``.
        """
        # Extra text written to the tty just before the terminal state is reset.
        self.extra_finalize: Optional[str] = None
        self.optional_actions = optional_actions
        self.use_alternate_screen = use_alternate_screen
        self.mouse_tracking = mouse_tracking

    def set_state_for_loop(self, set_raw: bool = True) -> None:
        """Put the terminal into the state the loop needs.

        :param set_raw: when true, switch the tty to raw mode first.
        """
        if set_raw:
            raw_tty(self.tty_fd, self.original_termios)
        write_all(self.tty_fd, init_state(self.use_alternate_screen, self.mouse_tracking))

    def reset_state_to_original(self) -> None:
        """Restore the tty mode, flush ``extra_finalize`` (if any) and write the reset sequence."""
        normal_tty(self.tty_fd, self.original_termios)
        if self.extra_finalize:
            write_all(self.tty_fd, self.extra_finalize)
        write_all(self.tty_fd, reset_state(self.use_alternate_screen))

    @contextmanager
    def suspend(self) -> Generator['TermManager', None, None]:
        """Temporarily restore the original terminal state for the duration of a ``with`` block."""
        self.reset_state_to_original()
        try:
            yield self
        finally:
            # Re-enter the loop state even when the suspended body raised,
            # otherwise whatever runs next would find the tty in the
            # original (non-loop) state.
            self.set_state_for_loop()

    def __enter__(self) -> 'TermManager':
        """Open the tty, remember its original termios settings and write the initial state."""
        self.tty_fd, self.original_termios = open_tty(False, self.optional_actions)
        # NOTE(review): raw mode is not requested here; presumably open_tty()
        # already leaves the tty in the required mode - confirm.
        self.set_state_for_loop(set_raw=False)
        return self

    def __exit__(self, *a: object) -> None:
        """Best-effort teardown: restore the terminal, then always close the tty."""
        try:
            with suppress(Exception):
                self.reset_state_to_original()
        finally:
            # A failure while restoring the terminal must not leak the tty.
            with suppress(Exception):
                close_tty(self.tty_fd, self.original_termios)
            with suppress(Exception):
                del self.tty_fd, self.original_termios
class MouseButton(IntFlag):
    """Flags identifying mouse buttons; the wheel directions use negative values."""

    NONE = 0
    LEFT = 1
    MIDDLE = 2
    RIGHT = 4
    FOURTH = 8
    FIFTH = 16
    SIXTH = 32
    SEVENTH = 64
    WHEEL_UP = -1
    WHEEL_DOWN = -2
    WHEEL_LEFT = -4
    WHEEL_RIGHT = -8
# Lookup tables indexed by the low two bits of an SGR mouse button code:
# plain buttons, extra buttons and wheel directions respectively.
bmap = (
    MouseButton.LEFT,
    MouseButton.MIDDLE,
    MouseButton.RIGHT,
)
ebmap = (
    MouseButton.FOURTH,
    MouseButton.FIFTH,
    MouseButton.SIXTH,
    MouseButton.SEVENTH,
)
wbmap = (
    MouseButton.WHEEL_UP,
    MouseButton.WHEEL_DOWN,
    MouseButton.WHEEL_LEFT,
    MouseButton.WHEEL_RIGHT,
)
# Bit masks tested against the SGR mouse button code.
SHIFT_INDICATOR = 0b000100
ALT_INDICATOR = 0b001000
CTRL_INDICATOR = 0b010000
MOTION_INDICATOR = 0b100000
class EventType(Enum):
    """Kind of mouse event: button press, button release or pointer motion."""

    PRESS = 1
    RELEASE = 2
    MOVE = 3
class MouseEvent(NamedTuple):
    """A decoded mouse report, as built by :func:`decode_sgr_mouse`."""

    cell_x: int  # zero-based cell column, derived from pixel_x via pixel_to_cell()
    cell_y: int  # zero-based cell row, derived from pixel_y via pixel_to_cell()
    pixel_x: int  # x coordinate exactly as reported by the terminal
    pixel_y: int  # y coordinate exactly as reported by the terminal
    type: EventType  # press, release or move
    buttons: MouseButton  # button involved; MouseButton.NONE when none was decoded
    mods: int  # bitwise OR of the SHIFT/ALT/CTRL key_encoding constants
def pixel_to_cell(px: int, length: int, cell_length: int) -> int:
    """Map a pixel coordinate onto a zero-based cell index.

    ``px`` is clamped into ``[0, length - 1]`` (never below zero) before being
    divided by ``cell_length`` with floor division.
    """
    clamped = min(px, length - 1)
    if clamped < 0:
        clamped = 0
    return clamped // cell_length
def decode_sgr_mouse(text: str, screen_size: ScreenSize) -> MouseEvent:
    """Decode the body of an SGR mouse report into a :class:`MouseEvent`.

    ``text`` has the form ``<cb>;<x>;<y><final>``. A final character of ``m``
    means release; otherwise the event is a move when the motion bit is set
    in ``cb`` and a press when it is not. ``x`` and ``y`` are treated as pixel
    coordinates and converted to cells using ``screen_size``. Malformed input
    propagates the underlying exception (for example ``ValueError``).
    """
    code_field, x_field, y_field = text.split(';')
    # The last field carries the final character of the escape sequence.
    final_char = y_field[-1]
    y_field = y_field[:-1]
    cb, x, y = int(code_field), int(x_field), int(y_field)

    if final_char == 'm':
        typ = EventType.RELEASE
    elif cb & MOTION_INDICATOR:
        typ = EventType.MOVE
    else:
        typ = EventType.PRESS

    # Choose the button table from the magnitude of the code, then index it
    # with the low two bits. A low value of 3 in the plain range selects nothing.
    low_bits = cb & 3
    if cb >= 128:
        table = ebmap
    elif cb >= 64:
        table = wbmap
    elif low_bits < 3:
        table = bmap
    else:
        table = ()
    buttons: MouseButton = MouseButton.NONE
    if table:
        buttons |= table[low_bits]

    mods = 0
    for indicator, modifier in ((SHIFT_INDICATOR, SHIFT), (ALT_INDICATOR, ALT), (CTRL_INDICATOR, CTRL)):
        if cb & indicator:
            mods |= modifier

    cell_x = pixel_to_cell(x, screen_size.width, screen_size.cell_width)
    cell_y = pixel_to_cell(y, screen_size.height, screen_size.cell_height)
    return MouseEvent(cell_x, cell_y, x, y, typ, buttons, mods)
class UnhandledException(Handler):
    """Handler that displays a traceback and waits for the user to dismiss it."""

    def __init__(self, tb: str) -> None:
        # Traceback text shown by initialize().
        self.tb = tb

    def initialize(self) -> None:
        """Reset the screen and print the traceback followed by a prompt."""
        self.cmd.clear_screen()
        self.cmd.set_scrolling_region()
        self.cmd.set_cursor_visible(True)
        self.cmd.set_default_colors()
        # Bare newlines are expanded to CRLF before the traceback is written.
        for piece in (self.tb.replace('\n', '\r\n'), '\r\n', 'Press Enter to quit'):
            self.write(piece)

    def on_key(self, key_event: KeyEventType) -> None:
        """Call ``quit_loop(1)`` when the Enter key is received."""
        if key_event.key == 'ENTER':
            self.quit_loop(1)

    def on_interrupt(self) -> None:
        """Call ``quit_loop(1)``."""
        self.quit_loop(1)

    # End-of-transmission and termination are handled exactly like an interrupt.
    on_term = on_interrupt
    on_eot = on_interrupt
class SignalManager:
    """Context manager that installs handlers for SIGWINCH, SIGINT, SIGTERM
    and SIGHUP on an asyncio loop on entry and removes them again on exit."""

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        on_winch: Callable[[], None],
        on_interrupt: Callable[[], None],
        on_term: Callable[[], None],
        on_hup: Callable[[], None],
    ) -> None:
        self.asyncio_loop = loop
        self.on_winch = on_winch
        self.on_interrupt = on_interrupt
        self.on_term = on_term
        self.on_hup = on_hup

    def __enter__(self) -> None:
        """Register the four callbacks with the loop."""
        handlers = (
            (signal.SIGWINCH, self.on_winch),
            (signal.SIGINT, self.on_interrupt),
            (signal.SIGTERM, self.on_term),
            (signal.SIGHUP, self.on_hup),
        )
        for signum, callback in handlers:
            self.asyncio_loop.add_signal_handler(signum, callback)

    def __exit__(self, *a: Any) -> None:
        """Remove the handlers in the order in which they were registered."""
        remove = self.asyncio_loop.remove_signal_handler
        for signum in (signal.SIGWINCH, signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
            remove(signum)
# Source of the default regex (a single character class of control
# characters) that Loop compiles and uses to delete matching characters from
# text received while a bracketed paste is in progress.
sanitize_bracketed_paste: str = '[\x03\x04\x0e\x0f\r\x07\x7f\x8d\x8e\x8f\x90\x9b\x9d\x9e\x9f]'
class Loop:
    """asyncio-driven event loop that connects a :class:`Handler` to the tty.

    Bytes read from the tty are decoded as UTF-8 and handed to
    ``parse_input_from_terminal``, which calls back into the ``_on_*`` methods;
    those translate the parsed input into handler callbacks. Output queued by
    the handler is written with ``os.writev`` whenever the tty is writable.
    """

    def __init__(
        self,
        sanitize_bracketed_paste: str = sanitize_bracketed_paste,
        optional_actions: int = termios.TCSADRAIN
    ):
        """
        :param sanitize_bracketed_paste: regex source; matches are removed from
            text received during a bracketed paste. An empty string disables
            the sanitization.
        :param optional_actions: termios ``optional_actions`` value passed on
            to :class:`TermManager` by :meth:`loop`.
        """
        if is_macos:
            # On macOS PTY devices are not supported by the KqueueSelector and
            # the PollSelector is broken, causes 100% CPU usage
            self.asyncio_loop: asyncio.AbstractEventLoop = asyncio.SelectorEventLoop(selectors.SelectSelector())
            asyncio.set_event_loop(self.asyncio_loop)
        else:
            try:
                self.asyncio_loop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions no longer create an event loop
                # implicitly when none is current; create and install one.
                self.asyncio_loop = asyncio.new_event_loop()
                asyncio.set_event_loop(self.asyncio_loop)
        self.return_code = 0
        self.overlay_ready_reported = False
        self.optional_actions = optional_actions
        # Input carried over between reads: whatever the parser hands back.
        self.read_buf = ''
        # Incremental decoder, so multi-byte characters split across reads are
        # reassembled; undecodable bytes are dropped.
        self.decoder = codecs.getincrementaldecoder('utf-8')('ignore')
        # Maximum number of buffers handed to a single os.writev() call.
        # NOTE(review): the max() makes 255 a floor; on a system whose
        # SC_IOV_MAX is below 256 this would exceed the real limit - confirm.
        try:
            self.iov_limit = max(os.sysconf('SC_IOV_MAX') - 1, 255)
        except Exception:
            self.iov_limit = 255
        self.parse_input_from_terminal = partial(parse_input_from_terminal, self._on_text, self._on_dcs, self._on_csi, self._on_osc, self._on_pm, self._on_apc)
        # Splits text around DEL, CR, Ctrl-C and Ctrl-D, keeping the separators.
        self.ebs_pat = re.compile('([\177\r\x03\x04])')
        self.in_bracketed_paste = False
        self.sanitize_bracketed_paste = bool(sanitize_bracketed_paste)
        if self.sanitize_bracketed_paste:
            self.sanitize_ibp_pat = re.compile(sanitize_bracketed_paste)

    def _read_ready(self, handler: Handler, fd: int) -> None:
        """Reader callback: read available bytes from ``fd``, decode and parse them.

        An empty read marks the terminal I/O as ended on the handler and quits
        the loop with return code 1.
        """
        try:
            bdata = os.read(fd, io.DEFAULT_BUFFER_SIZE)
        except BlockingIOError:
            return
        if not bdata:
            handler.terminal_io_ended = True
            self.quit(1)
            return
        data = self.decoder.decode(bdata)
        if self.read_buf:
            data = self.read_buf + data
        self.read_buf = data
        # The _on_* callbacks reach the handler through self.handler, which is
        # only set for the duration of the parse.
        self.handler = handler
        try:
            self.read_buf = self.parse_input_from_terminal(self.read_buf, self.in_bracketed_paste)
        except Exception:
            # Discard the pending input before propagating the error.
            self.read_buf = ''
            raise
        finally:
            del self.handler

    # terminal input callbacks {{{
    def _on_text(self, text: str) -> None:
        """Dispatch plain text, mapping CR, DEL, Ctrl-C and Ctrl-D to their handler callbacks."""
        if self.in_bracketed_paste and self.sanitize_bracketed_paste:
            text = self.sanitize_ibp_pat.sub('', text)

        for chunk in self.ebs_pat.split(text):
            if len(chunk) == 1:
                if chunk == '\r':
                    self.handler.on_key(enter_key)
                elif chunk == '\177':
                    self.handler.on_key(backspace_key)
                elif chunk == '\x03':
                    self.handler.on_interrupt()
                elif chunk == '\x04':
                    self.handler.on_eot()
                else:
                    self.handler.on_text(chunk, self.in_bracketed_paste)
            elif chunk:
                self.handler.on_text(chunk, self.in_bracketed_paste)

    def _on_dcs(self, dcs: str) -> None:
        """Handle DCS input: ``@kitty-cmd`` JSON responses and ``1+r`` capability responses."""
        if dcs.startswith('@kitty-cmd'):
            import json
            self.handler.on_kitty_cmd_response(json.loads(dcs[len('@kitty-cmd'):]))
        elif dcs.startswith('1+r'):
            from binascii import unhexlify
            vals = dcs[3:].split(';')
            for q in vals:
                # Each entry is name=<hex encoded value>; entries that cannot
                # be decoded are skipped.
                parts = q.split('=', 1)
                try:
                    name, val = parts[0], unhexlify(parts[1]).decode('utf-8', 'replace')
                except Exception:
                    continue
                self.handler.on_capability_response(name, val)

    def _on_csi(self, csi: str) -> None:
        """Handle CSI input: SGR mouse reports, bracketed paste markers and key events."""
        if not csi:
            # Nothing to dispatch on; csi[-1] below would raise IndexError.
            return
        q = csi[-1]
        if q in 'mM':
            if csi.startswith('<'):
                # SGR mouse event
                try:
                    ev = decode_sgr_mouse(csi[1:], self.handler.screen_size)
                except Exception:
                    # Malformed mouse reports are ignored.
                    pass
                else:
                    self.handler.on_mouse_event(ev)
        elif q in 'u~ABCDEHFPQRS':
            if csi == '200~':
                self.in_bracketed_paste = True
                return
            elif csi == '201~':
                self.in_bracketed_paste = False
                return
            try:
                k = decode_key_event(csi[:-1], q)
            except Exception:
                # Key events that cannot be decoded are ignored.
                pass
            else:
                if not self.handler.perform_default_key_action(k):
                    self.handler.on_key_event(k)

    def _on_pm(self, pm: str) -> None:
        """PM input is ignored."""
        pass

    def _on_osc(self, osc: str) -> None:
        """Handle OSC input: code 52 clipboard responses and file transfer responses."""
        idx = osc.find(';')
        if idx <= 0:
            return
        q = osc[:idx]
        if q == '52':
            widx = osc.find(';', idx + 1)
            if widx < idx:
                # No second separator (find() returned -1): empty payload.
                from_primary = osc.find('p', idx + 1) > -1
                payload = ''
            else:
                from base64 import standard_b64decode
                # A 'p' between the two separators marks the primary selection.
                from_primary = osc.find('p', idx+1, widx) > -1
                data = memoryview(osc.encode('ascii'))
                payload = standard_b64decode(data[widx+1:]).decode('utf-8')
            self.handler.on_clipboard_response(payload, from_primary)
        elif q == ftc_code:
            from kitty.file_transmission import FileTransmissionCommand
            data = memoryview(osc.encode('ascii'))
            self.handler.on_file_transfer_response(FileTransmissionCommand.deserialize(data[idx+1:]))

    def _on_apc(self, apc: str) -> None:
        """Forward APC input starting with ``G`` to the handler's image manager, if it has one."""
        if apc.startswith('G'):
            if self.handler.image_manager is not None:
                self.handler.image_manager.handle_response(apc)
    # }}}

    @property
    def total_pending_bytes_to_write(self) -> int:
        """Total number of bytes currently queued in ``write_buf``."""
        return sum(map(len, self.write_buf))

    def _write_ready(self, handler: Handler, fd: int) -> None:
        """Writer callback: write as much of ``write_buf`` to ``fd`` as it accepts.

        Once everything has been written the writer is removed from the asyncio
        loop and ``handler.on_writing_finished()`` is called. A write of zero
        bytes marks the terminal I/O as ended and quits with return code 1.
        """
        if len(self.write_buf) > self.iov_limit:
            # Merge the tail so that at most iov_limit buffers go to writev().
            self.write_buf[self.iov_limit - 1] = b''.join(self.write_buf[self.iov_limit - 1:])
            del self.write_buf[self.iov_limit:]
        total_size = self.total_pending_bytes_to_write
        if total_size:
            try:
                written = os.writev(fd, self.write_buf)
            except BlockingIOError:
                return
            if not written:
                handler.terminal_io_ended = True
                self.quit(1)
                return
        else:
            written = 0
        if written >= total_size:
            self.write_buf: List[bytes] = []
            self.asyncio_loop.remove_writer(fd)
            self.waiting_for_writes = False
            handler.on_writing_finished()
        else:
            # Partial write: drop the fully written buffers and trim the first
            # partially written one.
            consumed = 0
            for i, buf in enumerate(self.write_buf):
                if not written:
                    break
                if len(buf) <= written:
                    written -= len(buf)
                    consumed += 1
                    continue
                self.write_buf[i] = buf[written:]
                break
            del self.write_buf[:consumed]

    def quit(self, return_code: Optional[int] = None) -> None:
        """Stop the asyncio loop, recording ``return_code`` when one is given."""
        if return_code is not None:
            self.return_code = return_code
        self.asyncio_loop.stop()

    def loop_impl(self, handler: Handler, term_manager: TermManager, image_manager: Optional[ImageManagerType] = None) -> Optional[str]:
        """Run ``handler`` on the asyncio loop until the loop is stopped.

        :returns: text describing an exception reported to the asyncio
            exception handler while running, or ``None`` if there was none.
        """
        self.write_buf = []
        tty_fd = term_manager.tty_fd
        tb = None
        # Marked True up front so that writes scheduled before the writer is
        # registered below do not register it themselves.
        self.waiting_for_writes = True

        def schedule_write(data: bytes) -> None:
            self.write_buf.append(data)
            if not self.waiting_for_writes:
                self.asyncio_loop.add_writer(tty_fd, self._write_ready, handler, tty_fd)
                self.waiting_for_writes = True

        def handle_exception(loop: asyncio.AbstractEventLoop, context: Dict[str, Any]) -> None:
            # Stop the loop and keep the message plus formatted traceback.
            nonlocal tb
            loop.stop()
            tb = context['message']
            exc = context.get('exception')
            if exc is not None:
                import traceback
                tb += '\n' + ''.join(traceback.format_exception(exc.__class__, exc, exc.__traceback__))

        self.asyncio_loop.set_exception_handler(handle_exception)
        handler._initialize(self._get_screen_size(), term_manager, schedule_write, self, debug, image_manager)
        with handler:
            if handler.overlay_ready_report_needed:
                handler.cmd.overlay_ready()
            self.asyncio_loop.add_reader(
                tty_fd, self._read_ready, handler, tty_fd)
            self.asyncio_loop.add_writer(
                tty_fd, self._write_ready, handler, tty_fd)
            self.asyncio_loop.run_forever()
            self.asyncio_loop.remove_reader(tty_fd)
            if self.waiting_for_writes:
                self.asyncio_loop.remove_writer(tty_fd)
        return tb

    def loop(self, handler: Handler) -> None:
        """Run ``handler`` with the terminal and signal handlers set up.

        If the run ends with an error, ``return_code`` is set to 1 and, unless
        the terminal I/O has ended, the traceback is shown in a second loop.
        """
        tb: Optional[str] = None

        def _on_sigwinch() -> None:
            # Mark the size getter as changed, then re-query and notify.
            self._get_screen_size.changed = True
            handler.screen_size = self._get_screen_size()
            handler.on_resize(handler.screen_size)

        signal_manager = SignalManager(self.asyncio_loop, _on_sigwinch, handler.on_interrupt, handler.on_term, handler.on_hup)
        with TermManager(self.optional_actions, handler.use_alternate_screen, handler.mouse_tracking) as term_manager, signal_manager:
            self._get_screen_size: ScreenSizeGetter = screen_size_function(term_manager.tty_fd)
            image_manager = None
            if handler.image_manager_class is not None:
                image_manager = handler.image_manager_class(handler)
            try:
                tb = self.loop_impl(handler, term_manager, image_manager)
            except Exception:
                import traceback
                tb = traceback.format_exc()

            # Output that never got written is handed to the TermManager so it
            # is emitted before the terminal state is reset. After a partial
            # write the buffer can begin in the middle of a multi-byte
            # character, so undecodable bytes are dropped instead of raising.
            term_manager.extra_finalize = b''.join(self.write_buf).decode('utf-8', 'ignore')
            if tb is not None:
                report_overlay_ready = handler.overlay_ready_report_needed and not self.overlay_ready_reported
                self.return_code = 1
                if not handler.terminal_io_ended:
                    self._report_error_loop(tb, term_manager, report_overlay_ready)

    def _report_error_loop(self, tb: str, term_manager: TermManager, overlay_ready_report_needed: bool) -> None:
        """Display ``tb`` using an :class:`UnhandledException` handler on the same terminal."""
        handler = UnhandledException(tb)
        handler.overlay_ready_report_needed = overlay_ready_report_needed
        self.loop_impl(handler, term_manager)
|