benchmark.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. #!./kitty/launcher/kitty +launch
  2. # License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
  3. import fcntl
  4. import io
  5. import os
  6. import select
  7. import signal
  8. import struct
  9. import sys
  10. import termios
  11. import time
  12. from pty import CHILD, fork
  13. from kitty.constants import kitten_exe
  14. from kitty.fast_data_types import Screen
  15. from kitty.utils import read_screen_size
  16. def run_parsing_benchmark(cell_width: int = 10, cell_height: int = 20, scrollback: int = 20000) -> None:
  17. isatty = sys.stdout.isatty()
  18. if isatty:
  19. sz = read_screen_size()
  20. columns, rows = sz.cols, sz.rows
  21. else:
  22. columns, rows = 80, 25
  23. child_pid, master_fd = fork()
  24. is_child = child_pid == CHILD
  25. argv = [kitten_exe(), '__benchmark__', '--with-scrollback']
  26. if is_child:
  27. while read_screen_size().width != columns * cell_width:
  28. time.sleep(0.01)
  29. signal.pthread_sigmask(signal.SIG_SETMASK, ())
  30. os.execvp(argv[0], argv)
  31. # os.set_blocking(master_fd, False)
  32. x_pixels = columns * cell_width
  33. y_pixels = rows * cell_height
  34. s = struct.pack('HHHH', rows, columns, x_pixels, y_pixels)
  35. fcntl.ioctl(master_fd, termios.TIOCSWINSZ, s)
  36. write_buf = b''
  37. r_pipe, w_pipe = os.pipe2(os.O_CLOEXEC | os.O_NONBLOCK)
  38. class ToChild:
  39. def write(self, x: bytes | str) -> None:
  40. nonlocal write_buf
  41. if isinstance(x, str):
  42. x = x.encode()
  43. write_buf += x
  44. os.write(w_pipe, b'1')
  45. screen = Screen(None, rows, columns, scrollback, cell_width, cell_height, 0, ToChild())
  46. def parse_bytes(data: bytes) -> None:
  47. data = memoryview(data)
  48. while data:
  49. dest = screen.test_create_write_buffer()
  50. s = screen.test_commit_write_buffer(data, dest)
  51. data = data[s:]
  52. screen.test_parse_written_data()
  53. while True:
  54. rd, wd, _ = select.select([master_fd, r_pipe], [master_fd] if write_buf else [], [])
  55. if r_pipe in rd:
  56. os.read(r_pipe, 256)
  57. if master_fd in rd:
  58. try:
  59. data = os.read(master_fd, io.DEFAULT_BUFFER_SIZE)
  60. except OSError:
  61. data = b''
  62. if not data:
  63. break
  64. parse_bytes(data)
  65. if master_fd in wd:
  66. n = os.write(master_fd, write_buf)
  67. write_buf = write_buf[n:]
  68. if isatty:
  69. lines: list[str] = []
  70. screen.linebuf.as_ansi(lines.append)
  71. sys.stdout.write(''.join(lines))
  72. else:
  73. sys.stdout.write(str(screen.linebuf))
  74. def main() -> None:
  75. run_parsing_benchmark()
  76. if __name__ == '__main__':
  77. main()