__init__.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
  3. import fcntl
  4. import io
  5. import os
  6. import select
  7. import shlex
  8. import shutil
  9. import signal
  10. import struct
  11. import sys
  12. import termios
  13. import time
  14. from contextlib import contextmanager, suppress
  15. from functools import wraps
  16. from pty import CHILD, STDIN_FILENO, STDOUT_FILENO, fork
  17. from typing import Optional
  18. from unittest import TestCase
  19. from kitty.config import finalize_keys, finalize_mouse_mappings
  20. from kitty.fast_data_types import Cursor, HistoryBuf, LineBuf, Screen, get_options, monotonic, set_options
  21. from kitty.options.parse import merge_result_dicts
  22. from kitty.options.types import Options, defaults
  23. from kitty.rgb import to_color
  24. from kitty.types import MouseEvent
  25. from kitty.utils import read_screen_size
  26. from kitty.window import decode_cmdline, process_remote_print, process_title_from_child
  27. def parse_bytes(screen, data, dump_callback=None):
  28. data = memoryview(data)
  29. while data:
  30. dest = screen.test_create_write_buffer()
  31. s = screen.test_commit_write_buffer(data, dest)
  32. data = data[s:]
  33. screen.test_parse_written_data(dump_callback)
  34. class Callbacks:
  35. def __init__(self, pty=None) -> None:
  36. self.clear()
  37. self.pty = pty
  38. self.ftc = None
  39. self.set_pointer_shape = lambda data: None
  40. self.last_cmd_at = 0
  41. self.last_cmd_cmdline = ''
  42. self.last_cmd_exit_status = sys.maxsize
  43. def write(self, data) -> None:
  44. self.wtcbuf += bytes(data)
  45. def notify_child_of_resize(self):
  46. self.num_of_resize_events += 1
  47. def color_control(self, code, data) -> None:
  48. from kitty.window import color_control
  49. response = color_control(self.color_profile, code, data)
  50. if response:
  51. def p(x):
  52. if '@' in x:
  53. return (to_color(x.partition('@')[0]), int(255 * float(x.partition('@')[2])))
  54. ans = to_color(x)
  55. if ans is None:
  56. ans = x
  57. return ans
  58. parts = {x.partition('=')[0]:p(x.partition('=')[2]) for x in response.split(';')[1:]}
  59. self.color_control_responses.append(parts)
  60. def title_changed(self, data, is_base64=False) -> None:
  61. self.titlebuf.append(process_title_from_child(data, is_base64, ''))
  62. def icon_changed(self, data) -> None:
  63. self.iconbuf += str(data, 'utf-8')
  64. def set_dynamic_color(self, code, data='') -> None:
  65. if code == 22:
  66. self.set_pointer_shape(data)
  67. else:
  68. self.colorbuf += str(data or b'', 'utf-8')
  69. def set_color_table_color(self, code, data='') -> None:
  70. self.ctbuf += ''
  71. def color_profile_popped(self, x) -> None:
  72. pass
  73. def cmd_output_marking(self, is_start: Optional[bool], data: str = '') -> None:
  74. if is_start:
  75. self.last_cmd_at = monotonic()
  76. self.last_cmd_cmdline = decode_cmdline(data) if data else data
  77. else:
  78. if self.last_cmd_at != 0:
  79. self.last_cmd_at = 0
  80. with suppress(Exception):
  81. self.last_cmd_exit_status = int(data)
  82. def request_capabilities(self, q) -> None:
  83. from kitty.terminfo import get_capabilities
  84. for c in get_capabilities(q, None):
  85. self.write(c.encode('ascii'))
  86. def desktop_notify(self, osc_code: int, raw_data: memoryview) -> None:
  87. self.notifications.append((osc_code, str(raw_data, 'utf-8')))
  88. def open_url(self, url: str, hyperlink_id: int) -> None:
  89. self.open_urls.append((url, hyperlink_id))
  90. def clipboard_control(self, data: memoryview, is_partial: bool = False) -> None:
  91. self.cc_buf.append((str(data, 'utf-8'), is_partial))
  92. def clear(self) -> None:
  93. self.wtcbuf = b''
  94. self.iconbuf = self.colorbuf = self.ctbuf = ''
  95. self.titlebuf = []
  96. self.printbuf = []
  97. self.color_control_responses = []
  98. self.notifications = []
  99. self.open_urls = []
  100. self.cc_buf = []
  101. self.bell_count = 0
  102. self.clone_cmds = []
  103. self.current_clone_data = ''
  104. self.last_cmd_exit_status = sys.maxsize
  105. self.last_cmd_cmdline = ''
  106. self.last_cmd_at = 0
  107. self.num_of_resize_events = 0
  108. def on_bell(self) -> None:
  109. self.bell_count += 1
  110. def on_activity_since_last_focus(self) -> None:
  111. pass
  112. def on_mouse_event(self, event):
  113. ev = MouseEvent(**event)
  114. opts = get_options()
  115. action_def = opts.mousemap.get(ev)
  116. if not action_def:
  117. return False
  118. self.current_mouse_button = ev.button
  119. for action in opts.alias_map.resolve_aliases(action_def, 'mouse_map'):
  120. getattr(self, action.func)(*action.args)
  121. self.current_mouse_button = 0
  122. return True
  123. def handle_remote_print(self, msg):
  124. text = process_remote_print(msg)
  125. self.printbuf.append(text)
  126. def handle_remote_cmd(self, msg):
  127. pass
  128. def handle_remote_clone(self, msg):
  129. msg = str(msg, 'utf-8')
  130. if not msg:
  131. if self.current_clone_data:
  132. cdata, self.current_clone_data = self.current_clone_data, ''
  133. from kitty.launch import CloneCmd
  134. self.clone_cmds.append(CloneCmd(cdata))
  135. self.current_clone_data = ''
  136. return
  137. num, rest = msg.split(':', 1)
  138. if num == '0' or len(self.current_clone_data) > 1024 * 1024:
  139. self.current_clone_data = ''
  140. self.current_clone_data += rest
  141. def handle_remote_ssh(self, msg):
  142. from kittens.ssh.utils import get_ssh_data
  143. if self.pty:
  144. for line in get_ssh_data(msg, "testing"):
  145. self.pty.write_to_child(line)
  146. def handle_remote_echo(self, msg):
  147. from base64 import standard_b64decode
  148. if self.pty:
  149. data = standard_b64decode(msg)
  150. self.pty.write_to_child(data)
  151. def file_transmission(self, data):
  152. if self.ftc:
  153. self.ftc.handle_serialized_command(data)
  154. def filled_line_buf(ynum=5, xnum=5, cursor=Cursor()):
  155. ans = LineBuf(ynum, xnum)
  156. cursor.x = 0
  157. for i in range(ynum):
  158. t = (f'{i}') * xnum
  159. ans.line(i).set_text(t, 0, xnum, cursor)
  160. return ans
  161. def filled_cursor():
  162. ans = Cursor()
  163. ans.bold = ans.italic = ans.reverse = ans.strikethrough = ans.dim = True
  164. ans.fg = 0x101
  165. ans.bg = 0x201
  166. ans.decoration_fg = 0x301
  167. return ans
  168. def filled_history_buf(ynum=5, xnum=5, cursor=Cursor()):
  169. lb = filled_line_buf(ynum, xnum, cursor)
  170. ans = HistoryBuf(ynum, xnum)
  171. for i in range(ynum):
  172. ans.push(lb.line(i))
  173. return ans
  174. def retry_on_failure(max_attempts=2, sleep_duration=2):
  175. def decorator(func):
  176. @wraps(func)
  177. def wrapper(*args, **kwargs):
  178. for attempt in range(max_attempts):
  179. try:
  180. return func(*args, **kwargs)
  181. except Exception as e:
  182. if attempt < max_attempts - 1: # Don't sleep on the last attempt
  183. time.sleep(sleep_duration)
  184. else:
  185. raise e # Re-raise the last exception
  186. return wrapper
  187. return decorator
  188. class BaseTest(TestCase):
  189. ae = TestCase.assertEqual
  190. maxDiff = 2048
  191. is_ci = os.environ.get('CI') == 'true'
  192. def rmtree_ignoring_errors(self, tdir):
  193. try:
  194. shutil.rmtree(tdir)
  195. except FileNotFoundError as err:
  196. print('Failed to delete the directory:', tdir, 'with error:', err, file=sys.stderr)
  197. def tearDown(self):
  198. set_options(None)
  199. def set_options(self, options=None):
  200. final_options = {'scrollback_pager_history_size': 1024, 'click_interval': 0.5}
  201. if options:
  202. final_options.update(options)
  203. options = Options(merge_result_dicts(defaults._asdict(), final_options))
  204. finalize_keys(options, {})
  205. finalize_mouse_mappings(options, {})
  206. set_options(options)
  207. return options
  208. def cmd_to_run_python_code(self, code):
  209. from kitty.constants import kitty_exe
  210. return [kitty_exe(), '+runpy', code]
  211. def create_screen(self, cols=5, lines=5, scrollback=5, cell_width=10, cell_height=20, options=None):
  212. self.set_options(options)
  213. c = Callbacks()
  214. s = Screen(c, lines, cols, scrollback, cell_width, cell_height, 0, c)
  215. c.color_profile = s.color_profile
  216. return s
  217. def create_pty(
  218. self, argv=None, cols=80, lines=100, scrollback=100, cell_width=10, cell_height=20,
  219. options=None, cwd=None, env=None, stdin_fd=None, stdout_fd=None
  220. ):
  221. self.set_options(options)
  222. return PTY(argv, lines, cols, scrollback, cell_width, cell_height, cwd, env, stdin_fd=stdin_fd, stdout_fd=stdout_fd)
  223. def assertEqualAttributes(self, c1, c2):
  224. x1, y1, c1.x, c1.y = c1.x, c1.y, 0, 0
  225. x2, y2, c2.x, c2.y = c2.x, c2.y, 0, 0
  226. try:
  227. self.assertEqual(c1, c2)
  228. finally:
  229. c1.x, c1.y, c2.x, c2.y = x1, y1, x2, y2
  230. debug_stdout = debug_stderr = -1
  231. @contextmanager
  232. def forwardable_stdio():
  233. global debug_stderr, debug_stdout
  234. debug_stdout = fd = os.dup(sys.stdout.fileno())
  235. os.set_inheritable(fd, True)
  236. debug_stderr = fd = os.dup(sys.stderr.fileno())
  237. os.set_inheritable(fd, True)
  238. try:
  239. yield
  240. finally:
  241. os.close(debug_stderr)
  242. os.close(debug_stdout)
  243. debug_stderr = debug_stdout = -1
  244. class PTY:
  245. def __init__(
  246. self, argv=None, rows=25, columns=80, scrollback=100, cell_width=10, cell_height=20,
  247. cwd=None, env=None, stdin_fd=None, stdout_fd=None
  248. ):
  249. self.is_child = False
  250. if isinstance(argv, str):
  251. argv = shlex.split(argv)
  252. self.write_buf = b''
  253. if argv is None:
  254. from kitty.child import openpty
  255. self.master_fd, self.slave_fd = openpty()
  256. self.child_pid = 0
  257. else:
  258. self.child_pid, self.master_fd = fork()
  259. self.is_child = self.child_pid == CHILD
  260. self.child_waited_for = False
  261. if self.is_child:
  262. while read_screen_size().width != columns * cell_width:
  263. time.sleep(0.01)
  264. if cwd:
  265. os.chdir(cwd)
  266. if stdin_fd is not None:
  267. os.dup2(stdin_fd, STDIN_FILENO)
  268. os.close(stdin_fd)
  269. if stdout_fd is not None:
  270. os.dup2(stdout_fd, STDOUT_FILENO)
  271. os.close(stdout_fd)
  272. signal.pthread_sigmask(signal.SIG_SETMASK, ())
  273. env = os.environ if env is None else env
  274. if debug_stdout > -1:
  275. env['KITTY_STDIO_FORWARDED'] = str(debug_stdout)
  276. os.execvpe(argv[0], argv, env)
  277. if stdin_fd is not None:
  278. os.close(stdin_fd)
  279. if stdout_fd is not None:
  280. os.close(stdout_fd)
  281. os.set_blocking(self.master_fd, False)
  282. self.cell_width = cell_width
  283. self.cell_height = cell_height
  284. self.set_window_size(rows=rows, columns=columns)
  285. self.callbacks = Callbacks(self)
  286. self.screen = Screen(self.callbacks, rows, columns, scrollback, cell_width, cell_height, 0, self.callbacks)
  287. self.received_bytes = b''
  288. def turn_off_echo(self):
  289. s = termios.tcgetattr(self.master_fd)
  290. s[3] &= ~termios.ECHO
  291. termios.tcsetattr(self.master_fd, termios.TCSANOW, s)
  292. def is_echo_on(self):
  293. s = termios.tcgetattr(self.master_fd)
  294. return True if s[3] & termios.ECHO else False
  295. def __del__(self):
  296. if not self.is_child:
  297. if hasattr(self, 'master_fd'):
  298. os.close(self.master_fd)
  299. del self.master_fd
  300. if hasattr(self, 'slave_fd'):
  301. os.close(self.slave_fd)
  302. del self.slave_fd
  303. if self.child_pid > 0 and not self.child_waited_for:
  304. os.waitpid(self.child_pid, 0)
  305. self.child_waited_for = True
  306. def write_to_child(self, data, flush=False):
  307. if isinstance(data, str):
  308. data = data.encode('utf-8')
  309. self.write_buf += data
  310. if flush:
  311. self.process_input_from_child(0)
  312. def send_cmd_to_child(self, cmd, flush=False):
  313. self.callbacks.last_cmd_exit_status = sys.maxsize
  314. self.last_cmd = cmd
  315. self.write_to_child(cmd + '\r', flush=flush)
  316. def process_input_from_child(self, timeout=10):
  317. rd, wd, _ = select.select([self.master_fd], [self.master_fd] if self.write_buf else [], [], max(0, timeout))
  318. if wd:
  319. n = os.write(self.master_fd, self.write_buf)
  320. self.write_buf = self.write_buf[n:]
  321. bytes_read = 0
  322. if rd:
  323. data = os.read(self.master_fd, io.DEFAULT_BUFFER_SIZE)
  324. bytes_read += len(data)
  325. self.received_bytes += data
  326. parse_bytes(self.screen, data)
  327. return bytes_read
  328. def wait_till(self, q, timeout=10, timeout_msg=None):
  329. end_time = time.monotonic() + timeout
  330. while not q() and time.monotonic() <= end_time:
  331. try:
  332. self.process_input_from_child(timeout=end_time - time.monotonic())
  333. except OSError as e:
  334. if not q():
  335. raise Exception(f'Failed to read from pty with error: {e}. Screen contents: \n {repr(self.screen_contents())}') from e
  336. return
  337. if not q():
  338. msg = 'The condition was not met'
  339. if timeout_msg is not None:
  340. msg = timeout_msg()
  341. raise TimeoutError(f'Timed out: {msg}. Screen contents: \n {repr(self.screen_contents())}')
  342. def wait_till_child_exits(self, timeout=30 if BaseTest.is_ci else 10, require_exit_code=None):
  343. end_time = time.monotonic() + timeout
  344. while time.monotonic() <= end_time:
  345. si_pid, status = os.waitpid(self.child_pid, os.WNOHANG)
  346. if si_pid == self.child_pid and os.WIFEXITED(status):
  347. ec = os.waitstatus_to_exitcode(status) if hasattr(os, 'waitstatus_to_exitcode') else require_exit_code
  348. self.child_waited_for = True
  349. if require_exit_code is not None and ec != require_exit_code:
  350. raise AssertionError(
  351. f'Child exited with exit status: {status} code: {ec} != {require_exit_code}.'
  352. f' Screen contents:\n{self.screen_contents()}')
  353. return status
  354. with suppress(OSError):
  355. self.process_input_from_child(timeout=0.02)
  356. raise AssertionError(f'Child did not exit in {timeout} seconds. Screen contents:\n{self.screen_contents()}')
  357. def set_window_size(self, rows=25, columns=80, send_signal=True):
  358. if hasattr(self, 'screen'):
  359. self.screen.resize(rows, columns)
  360. if send_signal:
  361. x_pixels = columns * self.cell_width
  362. y_pixels = rows * self.cell_height
  363. s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels)
  364. fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, s)
  365. def screen_contents(self):
  366. lines = []
  367. for i in range(self.screen.lines):
  368. x = str(self.screen.line(i))
  369. if x:
  370. lines.append(x)
  371. return '\n'.join(lines)
  372. def last_cmd_output(self, as_ansi=False, add_wrap_markers=False):
  373. from kitty.window import cmd_output
  374. return cmd_output(self.screen, as_ansi=as_ansi, add_wrap_markers=add_wrap_markers)