graphics.py 53 KB


  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
  3. import os
  4. import random
  5. import tempfile
  6. import time
  7. import unittest
  8. import zlib
  9. from contextlib import suppress
  10. from dataclasses import dataclass
  11. from io import BytesIO
  12. from kitty.fast_data_types import base64_decode, base64_encode, has_avx2, has_sse4_2, load_png_data, shm_unlink, shm_write, test_xor64
  13. from . import BaseTest, parse_bytes
  14. try:
  15. from PIL import Image
  16. except ImportError:
  17. Image = None
  18. def send_command(screen, cmd, payload=b''):
  19. cmd = '\033_G' + cmd
  20. if payload:
  21. if isinstance(payload, str):
  22. payload = payload.encode('utf-8')
  23. payload = base64_encode(payload).decode('ascii')
  24. cmd += ';' + payload
  25. cmd += '\033\\'
  26. c = screen.callbacks
  27. c.clear()
  28. parse_bytes(screen, cmd.encode('ascii'))
  29. return c.wtcbuf
  30. def parse_response(res):
  31. if not res:
  32. return
  33. return res.decode('ascii').partition(';')[2].partition('\033')[0]
  34. def parse_response_with_ids(res):
  35. if not res:
  36. return
  37. a, b = res.decode('ascii').split(';', 1)
  38. code = b.partition('\033')[0].split(':', 1)[0]
  39. a = a.split('G', 1)[1]
  40. return code, a
  41. @dataclass(frozen=True)
  42. class Response:
  43. code: str = 'OK'
  44. msg: str = ''
  45. image_id: int = 0
  46. image_number: int = 0
  47. frame_number: int = 0
  48. def parse_full_response(res):
  49. if not res:
  50. return
  51. a, b = res.decode('ascii').split(';', 1)
  52. code = b.partition('\033')[0].split(':', 1)
  53. if len(code) == 1:
  54. code = code[0]
  55. msg = ''
  56. else:
  57. code, msg = code
  58. a = a.split('G', 1)[1]
  59. ans = {'code': code, 'msg': msg}
  60. for x in a.split(','):
  61. k, _, v = x.partition('=')
  62. ans[{'i': 'image_id', 'I': 'image_number', 'r': 'frame_number'}[k]] = int(v)
  63. return Response(**ans)
  64. all_bytes = bytes(bytearray(range(256)))
  65. def byte_block(sz):
  66. d, m = divmod(sz, len(all_bytes))
  67. return (all_bytes * d) + all_bytes[:m]
  68. def load_helpers(self):
  69. s = self.create_screen()
  70. g = s.grman
  71. def pl(payload, **kw):
  72. kw.setdefault('i', 1)
  73. cmd = ','.join(f'{k}={v}' for k, v in kw.items())
  74. res = send_command(s, cmd, payload)
  75. return parse_response(res)
  76. def sl(payload, **kw):
  77. if isinstance(payload, str):
  78. payload = payload.encode('utf-8')
  79. data = kw.pop('expecting_data', payload)
  80. cid = kw.setdefault('i', 1)
  81. self.ae('OK', pl(payload, **kw))
  82. img = g.image_for_client_id(cid)
  83. self.assertIsNotNone(img, f'No image with id {cid} found')
  84. self.ae(img['client_id'], cid)
  85. self.ae(img['data'], data)
  86. if 's' in kw:
  87. self.ae((kw['s'], kw['v']), (img['width'], img['height']))
  88. self.ae(img['is_4byte_aligned'], kw.get('f') != 24)
  89. return img
  90. return s, g, pl, sl
  91. def put_helpers(self, cw, ch, cols=10, lines=5):
  92. iid = 0
  93. def create_screen():
  94. s = self.create_screen(cols, lines, cell_width=cw, cell_height=ch)
  95. return s, 2 / s.columns, 2 / s.lines
  96. def put_cmd(
  97. z=0, num_cols=0, num_lines=0, x_off=0, y_off=0, width=0, height=0, cell_x_off=0,
  98. cell_y_off=0, placement_id=0, cursor_movement=0, unicode_placeholder=0, parent_id=0,
  99. parent_placement_id=0, offset_from_parent_x=0, offset_from_parent_y=0,
  100. ):
  101. return (
  102. f'z={z},c={num_cols},r={num_lines},x={x_off},y={y_off},w={width},h={height},'
  103. f'X={cell_x_off},Y={cell_y_off},p={placement_id},C={cursor_movement},'
  104. f'U={unicode_placeholder},P={parent_id},Q={parent_placement_id},'
  105. f'H={offset_from_parent_x},V={offset_from_parent_y}'
  106. )
  107. def put_image(screen, w, h, **kw):
  108. nonlocal iid
  109. iid += 1
  110. imgid = kw.pop('id', None) or iid
  111. no_id = kw.pop('no_id', False)
  112. a = kw.pop('a', 'T')
  113. if no_id:
  114. cmd = f'a={a},f=24,s=%d,v=%d,%s' % (w, h, put_cmd(**kw))
  115. else:
  116. cmd = f'a={a},f=24,i=%d,s=%d,v=%d,%s' % (imgid, w, h, put_cmd(**kw))
  117. data = b'x' * w * h * 3
  118. res = send_command(screen, cmd, data)
  119. return imgid, parse_response(res)
  120. def put_ref(screen, **kw):
  121. imgid = kw.pop('id', None) or iid
  122. cmd = 'a=p,i=%d,%s' % (imgid, put_cmd(**kw))
  123. return imgid, parse_response_with_ids(send_command(screen, cmd))
  124. def layers(screen, scrolled_by=0, xstart=-1, ystart=1):
  125. return screen.grman.update_layers(scrolled_by, xstart, ystart, dx, dy, screen.columns, screen.lines, cw, ch)
  126. def rect_eq(r, left, top, right, bottom):
  127. for side in 'left top right bottom'.split():
  128. a, b = r[side], locals()[side]
  129. if abs(a - b) > 0.0001:
  130. self.ae(a, b, 'the %s side is not equal' % side)
  131. s, dx, dy = create_screen()
  132. return s, dx, dy, put_image, put_ref, layers, rect_eq
  133. def make_send_command(screen):
  134. def li(payload='abcdefghijkl'*3, s=4, v=3, f=24, a='f', i=1, **kw):
  135. if s:
  136. kw['s'] = s
  137. if v:
  138. kw['v'] = v
  139. if f:
  140. kw['f'] = f
  141. if i:
  142. kw['i'] = i
  143. kw['a'] = a
  144. cmd = ','.join(f'{k}={v}' for k, v in kw.items())
  145. res = send_command(screen, cmd, payload)
  146. return parse_full_response(res)
  147. return li
  148. class TestGraphics(BaseTest):
  149. def test_xor_data(self):
  150. base_data = b'\x01' * 64
  151. key = b'\x02' * 64
  152. sizes = []
  153. if has_sse4_2:
  154. sizes.append(2)
  155. if has_avx2:
  156. sizes.append(3)
  157. sizes.append(0)
  158. def t(key, data, align_offset=0):
  159. expected = test_xor64(key, data, 1, 0)
  160. for which_function in sizes:
  161. actual = test_xor64(key, data, which_function, align_offset)
  162. self.ae(expected, actual, f'{align_offset=} {len(data)=}')
  163. t(key, b'')
  164. for base in (b'abc', base_data):
  165. for extra in range(len(base_data)):
  166. for align_offset in range(64):
  167. data = base + base_data[:extra]
  168. t(key, data, align_offset)
  169. def test_disk_cache(self):
  170. s = self.create_screen()
  171. dc = s.grman.disk_cache
  172. dc.small_hole_threshold = 0
  173. data = {}
  174. def key_as_bytes(key):
  175. if isinstance(key, int):
  176. key = str(key)
  177. if isinstance(key, str):
  178. key = key.encode('utf-8')
  179. return bytes(key)
  180. def add(key, val):
  181. bkey = key_as_bytes(key)
  182. data[key] = key_as_bytes(val)
  183. dc.add(bkey, data[key])
  184. def remove(key):
  185. bkey = key_as_bytes(key)
  186. data.pop(key, None)
  187. return dc.remove(bkey)
  188. def check_data():
  189. for key, val in data.items():
  190. self.ae(dc.get(key_as_bytes(key)), val)
  191. def reset(small_hole_threshold=0, defrag_factor=2):
  192. nonlocal dc, data, s
  193. s = self.create_screen()
  194. dc = s.grman.disk_cache
  195. dc.small_hole_threshold = small_hole_threshold
  196. dc.defrag_factor = defrag_factor
  197. data = {}
  198. holes_to_create = 2, 4, 6, 8
  199. for i in range(25):
  200. self.assertIsNone(add(i, f'{i}' * i))
  201. if i <= max(holes_to_create):
  202. # We wait here to ensure data is written in order, otherwise the
  203. # holes test below can fail
  204. self.assertTrue(dc.wait_for_write())
  205. self.assertEqual(dc.total_size, sum(map(len, data.values())))
  206. self.assertTrue(dc.wait_for_write())
  207. check_data()
  208. sz = dc.size_on_disk()
  209. self.assertEqual(sz, sum(map(len, data.values())))
  210. self.assertFalse(dc.holes())
  211. holes = set()
  212. for x in holes_to_create:
  213. remove(x)
  214. holes.add(x)
  215. check_data()
  216. self.assertRaises(KeyError, dc.get, key_as_bytes(x))
  217. self.assertEqual(sz, dc.size_on_disk())
  218. self.assertEqual(holes, {x[1] for x in dc.holes()})
  219. self.assertEqual(sz, dc.size_on_disk())
  220. # fill holes largest first to ensure small one doesn't go into large accidentally causing fragmentation
  221. for i, x in enumerate(sorted(holes, reverse=True)):
  222. x = 'ABCDEFGH'[i] * x
  223. add(x, x)
  224. self.assertTrue(dc.wait_for_write())
  225. check_data()
  226. holes.discard(len(x))
  227. self.assertEqual(holes, {x[1] for x in dc.holes()})
  228. self.assertEqual(sz, dc.size_on_disk(), f'Disk cache has unexpectedly grown from {sz} to {dc.size_on_disk} with data: {x!r}')
  229. check_data()
  230. dc.clear()
  231. st = time.monotonic()
  232. while dc.size_on_disk() and time.monotonic() - st < 2:
  233. time.sleep(0.001)
  234. self.assertEqual(dc.size_on_disk(), 0)
  235. data.clear()
  236. for i in range(25):
  237. self.assertIsNone(add(i, f'{i}' * i))
  238. dc.wait_for_write()
  239. check_data()
  240. before = dc.size_on_disk()
  241. while dc.total_size > before // 3:
  242. key = random.choice(tuple(data))
  243. self.assertTrue(remove(key))
  244. check_data()
  245. add('trigger defrag', 'XXX')
  246. dc.wait_for_write()
  247. self.assertLess(dc.size_on_disk(), before)
  248. check_data()
  249. dc.clear()
  250. st = time.monotonic()
  251. while dc.size_on_disk() and time.monotonic() - st < 20:
  252. time.sleep(0.01)
  253. self.assertEqual(dc.size_on_disk(), 0)
  254. for frame in range(32):
  255. add(f'1:{frame}', f'{frame:02d}' * 8)
  256. dc.wait_for_write()
  257. self.assertEqual(dc.size_on_disk(), 32 * 16)
  258. self.assertEqual(dc.num_cached_in_ram(), 0)
  259. num_in_ram = 0
  260. for frame in range(32):
  261. dc.get(key_as_bytes(f'1:{frame}'))
  262. self.assertEqual(dc.num_cached_in_ram(), num_in_ram)
  263. for frame in range(32):
  264. dc.get(key_as_bytes(f'1:{frame}'), True)
  265. num_in_ram += 1
  266. self.assertEqual(dc.num_cached_in_ram(), num_in_ram)
  267. def clear_predicate(key):
  268. return key.startswith(b'1:')
  269. dc.remove_from_ram(clear_predicate)
  270. self.assertEqual(dc.num_cached_in_ram(), 0)
  271. reset(small_hole_threshold=512, defrag_factor=20)
  272. self.assertIsNone(add(1, '1' * 1024))
  273. self.assertIsNone(add(2, '2' * 1024))
  274. dc.wait_for_write()
  275. sz = dc.size_on_disk()
  276. remove(1)
  277. self.ae(sz, dc.size_on_disk())
  278. self.ae({x[1] for x in dc.holes()}, {1024})
  279. self.assertIsNone(add(3, '3' * 800))
  280. dc.wait_for_write()
  281. self.assertFalse(dc.holes())
  282. self.ae(sz, dc.size_on_disk())
  283. self.assertIsNone(add(4, '4' * 100))
  284. sz += 100
  285. dc.wait_for_write()
  286. self.ae(sz, dc.size_on_disk())
  287. check_data()
  288. self.assertFalse(dc.holes())
  289. remove(4)
  290. self.assertFalse(dc.holes())
  291. self.assertIsNone(add(5, '5' * 10))
  292. sz += 10
  293. dc.wait_for_write()
  294. self.ae(sz, dc.size_on_disk())
  295. # test hole coalescing
  296. reset(defrag_factor=20)
  297. for i in range(1, 6):
  298. self.assertIsNone(add(i, str(i)*i))
  299. dc.wait_for_write()
  300. remove(2)
  301. remove(4)
  302. self.assertEqual(dc.holes(), {(1, 2), (6, 4)})
  303. remove(3)
  304. self.assertEqual(dc.holes(), {(1, 9)})
  305. def test_suppressing_gr_command_responses(self):
  306. s, g, pl, sl = load_helpers(self)
  307. self.ae(pl('abcd', s=10, v=10, q=1), 'ENODATA:Insufficient image data: 4 < 400')
  308. self.ae(pl('abcd', s=10, v=10, q=2), None)
  309. self.assertIsNone(pl('abcd', s=1, v=1, a='q', q=1))
  310. # Test chunked load
  311. self.assertIsNone(pl('abcd', s=2, v=2, m=1, q=1))
  312. self.assertIsNone(pl('efgh', m=1))
  313. self.assertIsNone(pl('ijkl', m=1))
  314. self.assertIsNone(pl('mnop', m=0))
  315. # errors
  316. self.assertIsNone(pl('abcd', s=2, v=2, m=1, q=1))
  317. self.ae(pl('mnop', m=0), 'ENODATA:Insufficient image data: 8 < 16')
  318. self.assertIsNone(pl('abcd', s=2, v=2, m=1, q=2))
  319. self.assertIsNone(pl('mnop', m=0))
  320. # frames
  321. s = self.create_screen()
  322. li = make_send_command(s)
  323. self.assertEqual(li().code, 'ENOENT')
  324. self.assertIsNone(li(q=2))
  325. self.assertIsNone(li(a='t', q=1))
  326. self.assertIsNone(li(payload='2' * 12, z=77, m=1, q=1))
  327. self.assertIsNone(li(payload='2' * 12, m=1))
  328. self.assertIsNone(li(payload='2' * 12))
  329. self.assertIsNone(li(payload='2' * 12, z=77, m=1, q=1))
  330. self.ae(li(payload='2' * 12).code, 'ENODATA')
  331. self.assertIsNone(li(payload='2' * 12, z=77, m=1, q=2))
  332. self.assertIsNone(li(payload='2' * 12))
  333. def test_load_images(self):
  334. s, g, pl, sl = load_helpers(self)
  335. self.assertEqual(g.disk_cache.total_size, 0)
  336. # Test load query
  337. self.ae(pl('abcd', s=1, v=1, a='q'), 'OK')
  338. self.ae(g.image_count, 0)
  339. # Test simple load
  340. for f in 32, 24:
  341. p = 'abc' + ('d' if f == 32 else '')
  342. img = sl(p, s=1, v=1, f=f)
  343. self.ae(bool(img['is_4byte_aligned']), f == 32)
  344. # Test chunked load
  345. self.assertIsNone(pl('abcd', s=2, v=2, m=1))
  346. self.assertIsNone(pl('efgh', m=1))
  347. self.assertIsNone(pl('ijkl', m=1))
  348. self.ae(pl('mnop', m=0), 'OK')
  349. img = g.image_for_client_id(1)
  350. self.ae(img['data'], b'abcdefghijklmnop')
  351. # Test interrupted and retried chunked load
  352. self.assertIsNone(pl('abcd', s=2, v=2, m=1))
  353. self.assertIsNone(pl('efgh', m=1))
  354. send_command(s, 'a=d') # delete command should clear partial transfer
  355. self.assertIsNone(pl('abcd', s=2, v=2, m=1))
  356. self.assertIsNone(pl('efgh', m=1))
  357. self.assertIsNone(pl('ijkl', m=1))
  358. self.ae(pl('1234', m=0), 'OK')
  359. img = g.image_for_client_id(1)
  360. self.ae(img['data'], b'abcdefghijkl1234')
  361. random_data = byte_block(32 * 1024)
  362. sl(
  363. random_data,
  364. s=1024,
  365. v=8,
  366. expecting_data=random_data
  367. )
  368. # Test compression
  369. compressed_random_data = zlib.compress(random_data)
  370. sl(
  371. compressed_random_data,
  372. s=1024,
  373. v=8,
  374. o='z',
  375. expecting_data=random_data
  376. )
  377. # Test chunked + compressed
  378. b = len(compressed_random_data) // 2
  379. self.assertIsNone(pl(compressed_random_data[:b], s=1024, v=8, o='z', m=1))
  380. self.ae(pl(compressed_random_data[b:], m=0), 'OK')
  381. img = g.image_for_client_id(1)
  382. self.ae(img['data'], random_data)
  383. # Test loading from file
  384. def load_temp(prefix='tty-graphics-protocol-'):
  385. f = tempfile.NamedTemporaryFile(prefix=prefix)
  386. f.write(random_data), f.flush()
  387. sl(f.name, s=1024, v=8, t='f', expecting_data=random_data)
  388. self.assertTrue(os.path.exists(f.name))
  389. f.seek(0), f.truncate(), f.write(compressed_random_data), f.flush()
  390. sl(f.name, s=1024, v=8, t='t', o='z', expecting_data=random_data)
  391. return f
  392. f = load_temp()
  393. self.assertFalse(os.path.exists(f.name), f'Temp file at {f.name} was not deleted')
  394. with suppress(FileNotFoundError):
  395. f.close()
  396. f = load_temp('')
  397. self.assertTrue(os.path.exists(f.name), f'Temp file at {f.name} was deleted')
  398. f.close()
  399. # Test loading from POSIX SHM
  400. name = '/kitty-test-shm'
  401. shm_write(name, random_data)
  402. sl(name, s=1024, v=8, t='s', expecting_data=random_data)
  403. self.assertRaises(
  404. FileNotFoundError, shm_unlink, name
  405. ) # check that file was deleted
  406. s.reset()
  407. self.assertEqual(g.disk_cache.total_size, 0)
  408. @unittest.skipIf(Image is None, 'PIL not available, skipping PNG tests')
  409. def test_load_png(self):
  410. s, g, pl, sl = load_helpers(self)
  411. w, h = 5, 3
  412. rgba_data = byte_block(w * h * 4)
  413. img = Image.frombytes('RGBA', (w, h), rgba_data)
  414. rgb_data = img.convert('RGB').convert('RGBA').tobytes()
  415. self.assertEqual(g.disk_cache.total_size, 0)
  416. def png(mode='RGBA'):
  417. buf = BytesIO()
  418. i = img
  419. if mode != i.mode:
  420. i = img.convert(mode)
  421. i.save(buf, 'PNG')
  422. return buf.getvalue()
  423. for mode in 'RGBA RGB'.split():
  424. data = png(mode)
  425. sl(data, f=100, expecting_data=rgb_data if mode == 'RGB' else rgba_data)
  426. for m in 'LP':
  427. img = img.convert(m)
  428. rgba_data = img.convert('RGBA').tobytes()
  429. data = png(m)
  430. sl(data, f=100, expecting_data=rgba_data)
  431. self.ae(pl(b'a' * 20, f=100, S=20).partition(':')[0], 'EBADPNG')
  432. s.reset()
  433. self.assertEqual(g.disk_cache.total_size, 0)
  434. def test_load_png_simple(self):
  435. # 1x1 transparent PNG
  436. png_data = base64_decode('iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+P+/HgAFhAJ/wlseKgAAAABJRU5ErkJggg==')
  437. expected = b'\x00\xff\xff\x7f'
  438. self.ae(load_png_data(png_data), (expected, 1, 1))
  439. s, g, pl, sl = load_helpers(self)
  440. sl(png_data, f=100, expecting_data=expected)
  441. # test error handling for loading bad png data
  442. self.assertRaisesRegex(ValueError, '[EBADPNG]', load_png_data, b'dsfsdfsfsfd')
  443. def test_gr_operations_with_numbers(self):
  444. s = self.create_screen()
  445. g = s.grman
  446. self.assertEqual(g.disk_cache.total_size, 0)
  447. def li(payload, **kw):
  448. cmd = ','.join(f'{k}={v}' for k, v in kw.items())
  449. res = send_command(s, cmd, payload)
  450. return parse_response_with_ids(res)
  451. code, ids = li('abc', s=1, v=1, f=24, I=1, i=3)
  452. self.ae(code, 'EINVAL')
  453. code, ids = li('abc', s=1, v=1, f=24, I=1)
  454. self.ae((code, ids), ('OK', 'i=1,I=1'))
  455. img = g.image_for_client_number(1)
  456. self.ae(img['client_number'], 1)
  457. self.ae(img['client_id'], 1)
  458. code, ids = li('abc', s=1, v=1, f=24, I=1)
  459. self.ae((code, ids), ('OK', 'i=2,I=1'))
  460. img = g.image_for_client_number(1)
  461. self.ae(img['client_number'], 1)
  462. self.ae(img['client_id'], 2)
  463. code, ids = li('abc', s=1, v=1, f=24, I=1)
  464. self.ae((code, ids), ('OK', 'i=3,I=1'))
  465. code, ids = li('abc', s=1, v=1, f=24, i=5)
  466. self.ae((code, ids), ('OK', 'i=5'))
  467. code, ids = li('abc', s=1, v=1, f=24, I=3)
  468. self.ae((code, ids), ('OK', 'i=4,I=3'))
  469. # Test chunked load with number
  470. self.assertIsNone(li('abcd', s=2, v=2, m=1, I=93))
  471. self.assertIsNone(li('efgh', m=1))
  472. self.assertIsNone(li('ijkx', m=1))
  473. self.ae(li('mnop', m=0), ('OK', 'i=6,I=93'))
  474. img = g.image_for_client_number(93)
  475. self.ae(img['data'], b'abcdefghijkxmnop')
  476. self.ae(img['client_id'], 6)
  477. # test put with number
  478. def put(**kw):
  479. cmd = ','.join(f'{k}={v}' for k, v in kw.items())
  480. cmd = 'a=p,' + cmd
  481. return parse_response_with_ids(send_command(s, cmd))
  482. code, idstr = put(c=2, r=2, I=93)
  483. self.ae((code, idstr), ('OK', 'i=6,I=93'))
  484. code, idstr = put(c=2, r=2, I=94)
  485. self.ae(code, 'ENOENT')
  486. # test delete with number
  487. def delete(ac='N', **kw):
  488. cmd = 'a=d'
  489. if ac:
  490. cmd += f',d={ac}'
  491. if kw:
  492. cmd += ',' + ','.join(f'{k}={v}' for k, v in kw.items())
  493. send_command(s, cmd)
  494. count = s.grman.image_count
  495. put(i=1), put(i=2), put(i=3), put(i=4), put(i=5)
  496. delete(I=94)
  497. self.ae(s.grman.image_count, count)
  498. delete(I=93)
  499. self.ae(s.grman.image_count, count - 1)
  500. delete(I=1)
  501. self.ae(s.grman.image_count, count - 2)
  502. cn = 1117
  503. li('abc', s=1, v=1, f=24, I=cn)
  504. first_id = g.image_for_client_number(cn)['internal_id']
  505. li('abc', s=1, v=1, f=24, I=cn)
  506. second_id = g.image_for_client_number(cn)['internal_id']
  507. self.assertNotEqual(first_id, second_id)
  508. count = s.grman.image_count
  509. delete(I=cn)
  510. self.ae(g.image_for_client_number(cn)['internal_id'], first_id)
  511. self.ae(s.grman.image_count, count - 1)
  512. s.reset()
  513. self.assertEqual(g.disk_cache.total_size, 0)
  514. def test_image_put(self):
  515. cw, ch = 10, 20
  516. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  517. self.ae(put_image(s, cw, ch)[1], 'OK')
  518. l0 = layers(s)
  519. self.ae(len(l0), 1)
  520. rect_eq(l0[0]['src_rect'], 0, 0, 1, 1)
  521. rect_eq(l0[0]['dest_rect'], -1, 1, -1 + dx, 1 - dy)
  522. self.ae(l0[0]['group_count'], 1)
  523. self.ae(s.cursor.x, 1), self.ae(s.cursor.y, 0)
  524. src_width, src_height = 3, 5
  525. iid, (code, idstr) = put_ref(s, num_cols=s.columns, num_lines=1, x_off=2, y_off=1, width=src_width, height=src_height,
  526. cell_x_off=3, cell_y_off=1, z=-1, placement_id=17)
  527. self.ae(idstr, f'i={iid},p=17')
  528. l2 = layers(s)
  529. self.ae(len(l2), 2)
  530. self.ae(l2[1], l0[0])
  531. rect_eq(l2[0]['src_rect'], 2 / 10, 1 / 20, (2 + 3) / 10, (1 + 5)/20)
  532. self.ae(l2[0]['group_count'], 2)
  533. left, top = -1 + dx + 3 * dx / cw, 1 - 1 * dy / ch
  534. right = -1 + (1 + s.columns) * dx
  535. bottom = 1 - dy
  536. rect_eq(l2[0]['dest_rect'], left, top, right, bottom)
  537. self.ae(s.cursor.x, 0), self.ae(s.cursor.y, 1)
  538. self.ae(put_image(s, 10, 20, cursor_movement=1)[1], 'OK')
  539. self.ae(s.cursor.x, 0), self.ae(s.cursor.y, 1)
  540. s.reset()
  541. self.assertEqual(s.grman.disk_cache.total_size, 0)
  542. self.ae(put_image(s, 2*cw, 2*ch, num_cols=3)[1], 'OK')
  543. self.ae((s.cursor.x, s.cursor.y), (3, 2))
  544. rect_eq(layers(s)[0]['dest_rect'], -1, 1, -1 + 3 * dx, 1 - 3*dy)
  545. def test_image_layer_grouping(self):
  546. cw, ch = 10, 20
  547. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  548. def group_counts():
  549. return tuple(x['group_count'] for x in layers(s))
  550. self.ae(put_image(s, 10, 20, id=1)[1], 'OK')
  551. self.ae(group_counts(), (1,))
  552. put_ref(s, id=1, num_cols=2, num_lines=1, placement_id=2)
  553. put_ref(s, id=1, num_cols=2, num_lines=1, placement_id=3, z=-2)
  554. put_ref(s, id=1, num_cols=2, num_lines=1, placement_id=4, z=-2)
  555. self.ae(group_counts(), (4, 3, 2, 1))
  556. self.ae(put_image(s, 8, 16, id=2, z=-1)[1], 'OK')
  557. self.ae(group_counts(), (2, 1, 1, 2, 1))
  558. def test_image_parents(self):
  559. cw, ch = 10, 20
  560. iw, ih = 10, 20
  561. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  562. def positions():
  563. ans = {}
  564. def x(x):
  565. return round(((x + 1)/2) * s.columns)
  566. def y(y):
  567. return int(((-y + 1)/2) * s.lines)
  568. for i in layers(s):
  569. d = i['dest_rect']
  570. ans[(i['image_id'], i['ref_id'])] = {'x': x(d['left']), 'y': y(d['top'])}
  571. return ans
  572. def p(x, y=0):
  573. return {'x':x, 'y': y}
  574. self.ae(put_image(s, iw, ih, id=1)[1], 'OK')
  575. self.ae(put_ref(s, id=1, placement_id=1), (1, ('OK', 'i=1,p=1')))
  576. pos = {(1, 1): p(0), (1, 2): p(1)}
  577. self.ae(positions(), pos)
  578. # check that adding a reference to a non-existent parent fails
  579. self.ae(put_ref(s, id=1, placement_id=33, parent_id=1, parent_placement_id=2), (1, ('ENOPARENT', 'i=1,p=33')))
  580. self.ae(put_ref(s, id=1, placement_id=33, parent_id=33), (1, ('ENOPARENT', 'i=1,p=33')))
  581. # check that we cannot add a reference that is its own parent
  582. self.ae(put_ref(s, id=1, placement_id=1, parent_id=1, parent_placement_id=1), (1, ('EINVAL', 'i=1,p=1')))
  583. self.ae(put_image(s, iw, ih, id=2)[1], 'OK')
  584. pos[(2,1)] = p(2)
  585. self.ae(positions(), pos)
  586. # Add two children to the first placement of img2
  587. before = s.cursor.x, s.cursor.y
  588. self.ae(put_ref(s, id=1, placement_id=2, parent_id=2, offset_from_parent_y=3), (1, ('OK', 'i=1,p=2')))
  589. self.ae(before, (s.cursor.x, s.cursor.y), 'Cursor must not move for child image')
  590. pos[(1,3)] = p(2, 3)
  591. self.ae(positions(), pos)
  592. self.ae(put_ref(s, id=2, placement_id=3, parent_id=2, offset_from_parent_y=4), (2, ('OK', 'i=2,p=3')))
  593. pos[(2,2)] = p(2, 4)
  594. self.ae(positions(), pos)
  595. # Add a grand child to the second child of img2
  596. self.ae(put_ref(s, id=2, placement_id=4, parent_id=2, parent_placement_id=3, offset_from_parent_x=-1), (2, ('OK', 'i=2,p=4')))
  597. pos[(2,3)] = p(pos[(2,2)]['x']-1, pos[(2,2)]['y'])
  598. self.ae(positions(), pos)
  599. # Check that creating a cycle is prevented
  600. self.ae(put_ref(s, id=2, placement_id=3, parent_id=2, parent_placement_id=4), (2, ('ECYCLE', 'i=2,p=3')))
  601. self.ae(positions(), pos)
  602. # Check that depth is limited
  603. for i in range(5, 12):
  604. q = put_ref(s, id=2, placement_id=i, parent_id=2, parent_placement_id=i-1, offset_from_parent_x=-1)[1][0]
  605. if q == 'ETOODEEP':
  606. break
  607. self.ae(q, 'OK')
  608. else:
  609. self.assertTrue(False, 'Failed to limit reference chain depth')
  610. # Check that deleting a parent removes all descendants
  611. send_command(s, 'a=d,d=i,i=2,p=3')
  612. pos.pop((2,3)), pos.pop((2,2))
  613. self.ae(positions(), pos)
  614. # Check that deleting a parent deletes all descendants and also removes
  615. # images with no remaining placements
  616. self.ae(put_ref(s, id=2, placement_id=3, parent_id=2, offset_from_parent_y=4), (2, ('OK', 'i=2,p=3')))
  617. pos[(2,11)] = p(2, 4)
  618. self.ae(positions(), pos)
  619. self.ae(put_image(s, iw, ih, id=3, placement_id=97, parent_id=2, parent_placement_id=3)[1], 'OK')
  620. pos[(3,1)] = p(2, 4)
  621. self.ae(positions(), pos)
  622. send_command(s, 'a=d,d=i,i=2')
  623. pos.pop((3,1)), pos.pop((2,11)), pos.pop((2,1)), pos.pop((1,3))
  624. self.ae(positions(), pos)
  625. # Check that virtual placements that try to be relative are rejected
  626. self.ae(put_ref(s, id=1, placement_id=11, parent_id=1, unicode_placeholder=1), (1, ('EINVAL', 'i=1,p=11')))
  627. # Check creation of children of a unicode placeholder based image
  628. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  629. put_image(s, 20, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=42)
  630. s.update_only_line_graphics_data()
  631. self.assertFalse(positions()) # the reference is virtual
  632. self.ae(put_ref(s, id=42, placement_id=11, parent_id=42, offset_from_parent_y=2, offset_from_parent_x=1), (42, ('OK', 'i=42,p=11')))
  633. self.assertFalse(positions()) # the reference is virtual without any cell images so the child is invisible
  634. s.apply_sgr("38;5;42")
  635. # These two characters will become one 2x1 ref.
  636. s.cursor.x = s.cursor.y = 1
  637. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D")
  638. s.cursor.x = s.cursor.y = 0
  639. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D")
  640. s.update_only_line_graphics_data()
  641. pos = {(1, 2): p(1, 2), (1, 3): p(0), (1, 4): p(1)}
  642. self.ae(positions(), pos)
  643. s.cursor.x = s.cursor.y = 0
  644. s.erase_in_display(0, False)
  645. s.update_only_line_graphics_data()
  646. self.assertFalse(positions()) # the reference is virtual without any cell images so the child is invisible
  647. s.cursor.x = s.cursor.y = 2
  648. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D")
  649. s.update_only_line_graphics_data()
  650. self.ae(positions(), {(1, 5): {'x': 2, 'y': 2}, (1, 2): {'x': 3, 'y': 4}})
  651. def test_unicode_placeholders(self):
  652. # This test tests basic image placement using using unicode placeholders
  653. cw, ch = 10, 20
  654. s, dx, dy, put_image, put_ref, layers, rect_eq = put_helpers(self, cw, ch)
  655. # Upload two images.
  656. put_image(s, 20, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=42)
  657. put_image(s, 10, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=(42<<16) + (43<<8) + 44)
  658. # The references are virtual, so no visible refs yet.
  659. s.update_only_line_graphics_data()
  660. refs = layers(s)
  661. self.ae(len(refs), 0)
  662. # A reminder of row/column diacritics meaning (assuming 0-based):
  663. # \u0305 -> 0
  664. # \u030D -> 1
  665. # \u030E -> 2
  666. # \u0310 -> 3
  667. # Now print the placeholders for the first image.
  668. # Encode the id as an 8-bit color.
  669. s.apply_sgr("38;5;42")
  670. # These two characters will become one 2x1 ref.
  671. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030D")
  672. # These two characters will be two separate refs (not contiguous).
  673. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\u030E")
  674. s.cursor_back(4)
  675. s.update_only_line_graphics_data()
  676. refs = layers(s)
  677. self.ae(len(refs), 3)
  678. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.5, 'bottom': 0.5})
  679. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.25, 'bottom': 0.5})
  680. self.ae(refs[2]['src_rect'], {'left': 0.5, 'top': 0.0, 'right': 0.75, 'bottom': 0.5})
  681. # Erase the line.
  682. s.erase_in_line(2)
  683. # There must be 0 refs after the line is erased.
  684. s.update_only_line_graphics_data()
  685. refs = layers(s)
  686. self.ae(len(refs), 0)
  687. # Now test encoding IDs with the 24-bit color.
  688. # The first image, 1x1
  689. s.apply_sgr("38;2;0;0;42")
  690. s.draw("\U0010EEEE\u0305\u0305")
  691. # The second image, 2x1
  692. s.apply_sgr("38;2;42;43;44")
  693. s.draw("\U0010EEEE\u0305\u030D\U0010EEEE\u0305\u030E")
  694. s.cursor_back(2)
  695. s.update_only_line_graphics_data()
  696. refs = layers(s)
  697. self.ae(len(refs), 2)
  698. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.25, 'bottom': 0.5})
  699. # The second ref spans the whole widths of the second image because it's
  700. # fit to height and centered in a 4x2 box (specified in put_image).
  701. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5})
  702. # Erase the line.
  703. s.erase_in_line(2)
  704. # Now test implicit column numbers.
  705. # We will mix implicit and explicit column/row specifications, but they
  706. # will be combine into just two references.
  707. s.apply_sgr("38;5;42")
  708. # full row 0 of the first image
  709. s.draw("\U0010EEEE\u0305\u0305\U0010EEEE\u0305\U0010EEEE\U0010EEEE\u0305")
  710. # full row 1 of the first image
  711. s.draw("\U0010EEEE\u030D\U0010EEEE\U0010EEEE\U0010EEEE\u030D\u0310")
  712. s.cursor_back(8)
  713. s.update_only_line_graphics_data()
  714. refs = layers(s)
  715. self.ae(len(refs), 2)
  716. self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5})
  717. self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.5, 'right': 1.0, 'bottom': 1.0})
  718. # Now reset the screen, the images should be erased.
  719. s.reset()
  720. refs = layers(s)
  721. self.ae(len(refs), 0)
  def test_unicode_placeholders_3rd_combining_char(self):
      """Check that a third diacritic can supply the most significant id byte.

      Two images are uploaded with ids 42 and (42 << 24) + 43. Placeholder
      cells are drawn with 24-bit and then 8-bit foreground colors, and the
      visible references are inspected after each group of draws.
      """
      cell_w, cell_h = 10, 20
      screen, _dx, _dy, put_image, _put_ref, layers, _rect_eq = put_helpers(self, cell_w, cell_h)

      def visible_refs():
          # Refresh the per-line graphics data, then collect the layers.
          screen.update_only_line_graphics_data()
          return layers(screen)

      def assert_both_images_shown():
          shown = visible_refs()
          self.ae(len(shown), 2)
          self.ae(shown[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 0.5, 'bottom': 0.5})
          self.ae(shown[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})

      # Upload two images.
      put_image(screen, 20, 20, num_cols=4, num_lines=2, unicode_placeholder=1, id=42)
      put_image(screen, 20, 10, num_cols=4, num_lines=1, unicode_placeholder=1, id=(42 << 24) + 43)

      # This one will have id=43, which does not exist.
      screen.apply_sgr("38;2;0;0;43")
      screen.draw("\U0010EEEE\u0305\U0010EEEE\U0010EEEE\U0010EEEE")
      screen.cursor_back(4)
      self.ae(len(visible_refs()), 0)
      screen.erase_in_line(2)

      # This one will have id=42. The most significant byte is explicitly
      # given as 0 (the third \u0305). Spelling out the zero byte is not
      # necessary, but it is correct.
      screen.apply_sgr("38;2;0;0;42")
      screen.draw("\U0010EEEE\u0305\u0305\u0305\U0010EEEE\u0305\u030D\u0305")
      # This is the second image.
      # \u059C -> 42
      screen.apply_sgr("38;2;0;0;43")
      screen.draw("\U0010EEEE\u0305\u0305\u059C\U0010EEEE\u0305\u030D\u059C")
      # Check that we can continue by using implicit row/column specification.
      screen.draw("\U0010EEEE\u0305\U0010EEEE")
      screen.cursor_back(6)
      assert_both_images_shown()
      screen.erase_in_line(2)

      # Now test the 8-bit color mode. Using the third diacritic, we can
      # specify 16 bits: the most significant byte and the least significant
      # byte.
      screen.apply_sgr("38;5;42")
      screen.draw("\U0010EEEE\u0305\u0305\u0305\U0010EEEE")
      screen.apply_sgr("38;5;43")
      screen.draw("\U0010EEEE\u0305\u0305\u059C\U0010EEEE\U0010EEEE\u0305\U0010EEEE")
      screen.cursor_back(6)
      assert_both_images_shown()
  def test_unicode_placeholders_multiple_placements(self):
      """Check placement selection via the underline color.

      A single 20x20 image (id=42) is given three virtual placements with
      ids 1, 22 and 44, sized 1x1, 2x1 and 4x2 cells respectively. The first
      row of each placement is drawn with the matching underline color and
      the source rectangle of every resulting reference is verified.
      """
      cw, ch = 10, 20
      s, _dx, _dy, put_image, put_ref, layers, _rect_eq = put_helpers(self, cw, ch)
      put_image(s, 20, 20, num_cols=1, num_lines=1, placement_id=1, unicode_placeholder=1, id=42)
      put_ref(s, id=42, num_cols=2, num_lines=1, placement_id=22, unicode_placeholder=1)
      put_ref(s, id=42, num_cols=4, num_lines=2, placement_id=44, unicode_placeholder=1)
      # The references are virtual, so no visible refs yet.
      s.update_only_line_graphics_data()
      refs = layers(s)
      self.ae(len(refs), 0)
      # Draw the first row of each placement. The foreground color picks the
      # image, the underline color (SGR 58) picks the placement.
      s.apply_sgr("38;5;42")
      s.apply_sgr("58;5;1")
      s.draw("\U0010EEEE\u0305")
      s.apply_sgr("58;5;22")
      s.draw("\U0010EEEE\u0305\U0010EEEE\u0305")
      s.apply_sgr("58;5;44")
      s.draw("\U0010EEEE\u0305\U0010EEEE\u0305\U0010EEEE\u0305\U0010EEEE\u0305")
      s.update_only_line_graphics_data()
      refs = layers(s)
      self.ae(len(refs), 3)
      # Placements 1 and 22 are one line tall, so their first row spans the
      # whole image height and the expected bottom edge is 1.0. The other
      # src_rect expectations in this file are all fractions of the image in
      # the range 0..1, so the earlier expectation of 1.5 for placement 1
      # could never match.
      self.ae(refs[0]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})
      self.ae(refs[1]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})
      # Placement 44 is two lines tall, so its first row is the top half.
      self.ae(refs[2]['src_rect'], {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5})
  def test_unicode_placeholders_scroll(self):
      """Check placeholder references while scrolling a margin-limited region.

      An image spanning 8 rows is drawn, one reference per row. Only the
      middle part (lines 3..6) is then scrolled, first with index() and
      then with reverse_index(), and the surviving references are checked.
      """
      cell_w, cell_h = 5, 10
      screen, _dx, _dy, put_image, _put_ref, layers, _rect_eq = put_helpers(self, cell_w, cell_h, lines=8)
      put_image(screen, 5, 80, num_cols=1, num_lines=8, unicode_placeholder=1, id=42)
      screen.apply_sgr("38;5;42")

      # One placeholder per screen line; the diacritic encodes the image row.
      row_texts = (
          "\U0010EEEE\u0305\n",
          "\U0010EEEE\u030D\n",
          "\U0010EEEE\u030E\n",
          "\U0010EEEE\u0310\n",
          "\U0010EEEE\u0312\n",
          "\U0010EEEE\u033D\n",
          "\U0010EEEE\u033E\n",
          "\U0010EEEE\u033F",
      )
      for line_number, text in enumerate(row_texts, start=1):
          screen.cursor_position(line_number, 0)
          screen.draw(text)

      def refs_sorted_by_source_top():
          screen.update_only_line_graphics_data()
          return sorted(layers(screen), key=lambda ref: ref['src_rect']['top'])

      def check_rows(refs, expected):
          # expected holds (image_row, screen_row) pairs: which eighth of the
          # image a reference shows and on which screen row it is placed.
          self.ae(len(refs), len(expected))
          for ref, (image_row, screen_row) in zip(refs, expected):
              self.ae(ref['src_rect'], {'left': 0.0, 'top': 0.125*image_row, 'right': 1.0, 'bottom': 0.125*(image_row + 1)})
              self.ae(ref['dest_rect']['top'], 1.0 - 0.25*screen_row)

      # Each line will contain a part of the image.
      check_rows(refs_sorted_by_source_top(), [(row, row) for row in range(8)])

      # Now set margins to lines 3 and 6.
      screen.set_margins(3, 6)  # 1-based indexing
      # Scroll two lines down (i.e. move lines 3..6 up).
      # Lines 3 and 4 will be erased.
      screen.cursor_position(6, 0)
      screen.index()
      screen.index()
      check_rows(refs_sorted_by_source_top(), [
          # Lines 1 and 2 are outside of the region, not scrolled.
          (0, 0), (1, 1),
          # Lines 3 and 4 are erased.
          # Lines 5 and 6 are now higher.
          (4, 2), (5, 3),
          # Lines 7 and 8 are outside of the region.
          (6, 6), (7, 7),
      ])

      # Now scroll three lines up (i.e. move lines 5..6 down).
      # Line 6 will be erased.
      screen.cursor_position(3, 0)
      screen.reverse_index()
      screen.reverse_index()
      screen.reverse_index()
      check_rows(refs_sorted_by_source_top(), [
          # Lines 1 and 2 are outside of the region, not scrolled.
          (0, 0), (1, 1),
          # Lines 3, 4 and 6 are erased.
          # Line 5 is now lower.
          (4, 5),
          # Lines 7 and 8 are outside of the region.
          (6, 6), (7, 7),
      ])
  def test_gr_scroll(self):
      """Check image references and image counts while the screen scrolls.

      Covers plain scrolling through the history buffer, images lying
      outside a margin-limited region, and clipping of an image that is
      scrolled across a margin by index() and reverse_index().
      """
      cell_w, cell_h = 10, 20
      screen, _dx, _dy, put_image, _put_ref, layers, _rect_eq = put_helpers(self, cell_w, cell_h)

      def index(times):
          for _ in range(times):
              screen.index()

      def reverse_index(times):
          for _ in range(times):
              screen.reverse_index()

      def first_src_rect():
          return layers(screen)[0]['src_rect']

      put_image(screen, 10, 20, no_id=True)  # a one cell image at (0, 0)
      self.ae(len(layers(screen)), 1)
      index(screen.lines)
      self.ae(len(layers(screen)), 0)
      self.ae(screen.grman.image_count, 1)
      index(screen.historybuf.ynum - 1)
      self.ae(len(layers(screen)), 0)
      self.ae(screen.grman.image_count, 1)
      screen.index()
      self.ae(screen.grman.image_count, 0)

      # Now test with margins
      screen.reset()
      # Test images outside page area untouched
      put_image(screen, cell_w, cell_h)  # a one cell image at (0, 0)
      index(screen.lines - 1)
      put_image(screen, cell_w, cell_h)  # a one cell image at (0, bottom)
      screen.set_margins(2, 4)  # 1-based indexing
      self.ae(screen.grman.image_count, 2)
      index(screen.lines + screen.historybuf.ynum)
      self.ae(screen.grman.image_count, 2)
      reverse_index(screen.lines)  # ensure cursor is at top margin

      # Test clipped scrolling during index
      put_image(screen, cell_w, 2*cell_h, z=-1, no_id=True)  # 1x2 cell image
      self.ae(screen.grman.image_count, 3)
      self.ae(first_src_rect(), {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})
      screen.index()
      screen.index()
      self.ae(len(layers(screen)), 3)
      self.ae(first_src_rect(), {'left': 0.0, 'top': 0.5, 'right': 1.0, 'bottom': 1.0})
      screen.index()
      self.ae(screen.grman.image_count, 2)

      # Test clipped scrolling during reverse_index
      reverse_index(screen.lines)
      put_image(screen, cell_w, 2*cell_h, z=-1, no_id=True)  # 1x2 cell image
      self.ae(screen.grman.image_count, 3)
      self.ae(first_src_rect(), {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 1.0})
      while screen.cursor.y != 1:
          screen.reverse_index()
      screen.reverse_index()
      screen.reverse_index()
      self.ae(first_src_rect(), {'left': 0.0, 'top': 0.0, 'right': 1.0, 'bottom': 0.5})
      screen.reverse_index()
      self.ae(screen.grman.image_count, 2)
      screen.reset()
      self.assertEqual(screen.grman.disk_cache.total_size, 0)
  def test_gr_reset(self):
      """Check how many images remain after the screen is reset."""
      cell_w, cell_h = 10, 20
      screen, _dx, _dy, put_image, _put_ref, layers, _rect_eq = put_helpers(self, cell_w, cell_h)
      grman = screen.grman

      # An image that is still on screen is dropped by reset().
      put_image(screen, cell_w, cell_h)  # a one cell image at (0, 0)
      self.ae(len(layers(screen)), 1)
      screen.reset()
      self.ae(grman.image_count, 0)

      # After index() has been called once per screen line, the image is
      # expected to survive reset() (presumably because it has scrolled off
      # the visible screen -- confirm against the screen implementation).
      put_image(screen, cell_w, cell_h)  # a one cell image at (0, 0)
      self.ae(grman.image_count, 1)
      for _ in range(screen.lines):
          screen.index()
      screen.reset()
      self.ae(grman.image_count, 1)
  def test_gr_delete(self):
      """Exercise the a=d delete command with its various d= selectors."""
      cell_w, cell_h = 10, 20
      screen, _dx, _dy, put_image, put_ref, layers, _rect_eq = put_helpers(self, cell_w, cell_h)

      def delete(ac=None, **kw):
          # Build "a=d[,d=<ac>][,key=value...]" and send it to the screen.
          parts = ['a=d']
          if ac:
              parts.append(f'd={ac}')
          parts.extend(f'{k}={v}' for k, v in kw.items())
          send_command(screen, ','.join(parts))

      def expect_count(expected):
          self.ae(screen.grman.image_count, expected)

      def expect_empty_cache():
          self.assertEqual(screen.grman.disk_cache.total_size, 0)

      # Delete by image id, then by id range, for transmit-only images.
      image_id = put_image(screen, cell_w, cell_h, a='t')[0]
      expect_count(1)
      delete('I', i=image_id)
      expect_count(0)
      first_id = put_image(screen, cell_w, cell_h, a='t')[0]
      second_id = put_image(screen, cell_w, cell_h, a='t')[0]
      expect_count(2)
      delete('R', x=first_id, y=second_id)
      expect_count(0)

      # A bare delete removes the reference but keeps the image.
      put_image(screen, cell_w, cell_h)
      delete()
      expect_count(1)
      self.ae(len(layers(screen)), 0)
      delete('A')
      expect_count(1)
      screen.reset()
      expect_count(0)
      put_image(screen, cell_w, cell_h)
      expect_count(1)
      delete('A')
      expect_count(0)
      expect_empty_cache()

      # Delete by image id combined with a placement id.
      image_id = put_image(screen, cell_w, cell_h)[0]
      delete('I', i=image_id, p=7)
      expect_count(1)
      delete('I', i=image_id)
      expect_count(0)
      expect_empty_cache()
      image_id = put_image(screen, cell_w, cell_h, placement_id=9)[0]
      delete('I', i=image_id, p=9)
      expect_count(0)
      expect_empty_cache()

      # Delete at the cursor position and at an explicit cell.
      screen.reset()
      put_image(screen, cell_w, cell_h)
      put_image(screen, cell_w, cell_h)
      delete('C')
      expect_count(2)
      screen.cursor_position(1, 1)
      delete('C')
      expect_count(1)
      delete('P', x=2, y=1)
      expect_count(0)
      expect_empty_cache()

      # Delete by z-index.
      put_image(screen, cell_w, cell_h, z=9)
      delete('Z', z=9)
      expect_count(0)

      # Delete by id range.
      for new_id in (1, 2, 3):
          put_image(screen, cell_w, cell_h, id=new_id)
      delete('R', y=2)
      expect_count(1)
      delete('R', x=3, y=3)
      expect_count(0)
      expect_empty_cache()

      # test put + delete + put
      image_id = 999999
      self.ae(put_image(screen, cell_w, cell_h, id=image_id), (image_id, 'OK'))
      self.ae(put_ref(screen, id=image_id), (image_id, ('OK', f'i={image_id}')))
      delete('i', i=image_id)
      expect_count(1)
      self.ae(put_ref(screen, id=image_id), (image_id, ('OK', f'i={image_id}')))
      delete('I', i=image_id)
      self.ae(put_ref(screen, id=image_id), (image_id, ('ENOENT', f'i={image_id}')))
      expect_count(0)
      expect_empty_cache()

      # test delete but not free
      screen.reset()
      image_id = 9999999
      next_id = image_id + 1
      self.ae(put_image(screen, cell_w, cell_h, id=image_id), (image_id, 'OK'))
      self.ae(put_ref(screen, id=image_id), (image_id, ('OK', f'i={image_id}')))
      self.ae(put_image(screen, cell_w, cell_h, id=next_id), (next_id, 'OK'))
      self.ae(put_ref(screen, id=next_id), (next_id, ('OK', f'i={next_id}')))
      delete('i', i=image_id)
      expect_count(2)
      delete('I', i=next_id)
      expect_count(1)
  def test_animation_frame_loading(self):
      """Exercise loading, editing, re-timing, deleting and composing frames."""
      screen = self.create_screen()
      grman = screen.grman
      send = make_send_command(screen)

      def check(code='OK', image_id=1, frame_number=2, **kw):
          # Send a command and compare every response field that is not None.
          res = send(**kw)
          if code is not None:
              self.assertEqual(code, res.code, f'{code} != {res.code}: {res.msg}')
          if image_id is not None:
              self.assertEqual(image_id, res.image_id)
          if frame_number is not None:
              self.assertEqual(frame_number, res.frame_number)

      def image():
          return grman.image_for_client_id(1)

      def expand(*rows):
          # Repeat every digit of every row three times and encode as bytes.
          return ''.join(x * 3 for r in rows for x in str(r)).encode('ascii')

      threes = b'3' * 36
      fives = b'5' * 36
      composed = b'abcdefghijkl'*3
      frame_three = b'444444333333444444333333333333333333'

      # test error on send frame for non-existent image
      self.assertEqual(send().code, 'ENOENT')

      # create image
      self.assertEqual(send(a='t').code, 'OK')
      self.assertEqual(grman.disk_cache.total_size, 36)

      # simple new frame (width=4, height=3)
      for _ in range(2):
          self.assertIsNone(send(payload='2' * 12, z=77, m=1))
      check(payload='2' * 12, z=77)
      self.assertEqual(image()['extra_frames'], ({'gap': 77, 'id': 2, 'data': b'2' * 36},))

      # test editing a frame
      check(payload='3' * 36, r=2)
      self.assertEqual(image()['extra_frames'], ({'gap': 77, 'id': 2, 'data': threes},))

      # test editing part of a frame
      check(payload='4' * 12, r=2, s=2, v=2)
      self.assertEqual(image()['extra_frames'], ({'gap': 77, 'id': 2, 'data': expand(4433, 4433, 3333)},))
      check(payload='5' * 12, r=2, s=2, v=2, x=1, y=1)
      self.assertEqual(image()['extra_frames'], ({'gap': 77, 'id': 2, 'data': expand(4433, 4553, 3553)},))
      check(payload='3' * 36, r=2)
      self.assertEqual(image()['extra_frames'], ({'gap': 77, 'id': 2, 'data': threes},))

      # test loading from previous frame
      check(payload='4' * 12, c=2, s=2, v=2, z=101, frame_number=3)
      self.assertEqual(image()['extra_frames'], (
          {'gap': 77, 'id': 2, 'data': threes},
          {'gap': 101, 'id': 3, 'data': frame_three},
      ))

      # test changing gaps
      self.assertEqual(image()['root_frame_gap'], 0)
      self.assertIsNone(send(a='a', i=1, r=1, z=13))
      self.assertEqual(image()['root_frame_gap'], 13)
      self.assertIsNone(send(a='a', i=1, r=2, z=43))
      self.assertEqual(image()['extra_frames'][0]['gap'], 43)

      # test changing current frame
      self.assertEqual(image()['current_frame_index'], 0)
      self.assertIsNone(send(a='a', i=1, c=2))
      self.assertEqual(image()['current_frame_index'], 1)

      # test delete of frames
      check(payload='5' * 36, frame_number=4)
      img = image()
      self.assertEqual(img['extra_frames'], (
          {'gap': 43, 'id': 2, 'data': threes},
          {'gap': 101, 'id': 3, 'data': frame_three},
          {'gap': 40, 'id': 4, 'data': fives},
      ))
      self.assertEqual(img['current_frame_index'], 1)

      self.assertIsNone(send(a='d', d='f', i=1, r=1))
      img = image()
      self.assertEqual(img['current_frame_index'], 0)
      self.assertEqual(img['data'], threes)
      self.assertEqual(img['extra_frames'], (
          {'gap': 101, 'id': 3, 'data': frame_three},
          {'gap': 40, 'id': 4, 'data': fives},
      ))

      self.assertIsNone(send(a='a', i=1, c=3))
      self.assertEqual(image()['current_frame_index'], 2)

      self.assertIsNone(send(a='d', d='f', i=1, r=2))
      img = image()
      self.assertEqual(img['current_frame_index'], 1)
      self.assertEqual(img['data'], threes)
      self.assertEqual(img['extra_frames'], (
          {'gap': 40, 'id': 4, 'data': fives},
      ))

      self.assertIsNone(send(a='d', d='f', i=1))
      img = image()
      self.assertEqual(img['current_frame_index'], 0)
      self.assertEqual(img['data'], fives)
      self.assertFalse(img['extra_frames'])

      # Deleting a frame again leaves the remaining frame in place.
      self.assertIsNone(send(a='d', d='f', i=1))
      self.assertEqual(image()['data'], fives)
      self.ae(grman.image_count, 1)
      self.assertIsNone(send(a='d', d='F', i=1))
      self.ae(grman.image_count, 0)
      self.assertEqual(grman.disk_cache.total_size, 0)

      # test frame composition
      self.assertEqual(send(a='t').code, 'OK')
      self.assertEqual(grman.disk_cache.total_size, 36)
      check(payload='2' * 36)
      check(payload='3' * 36, frame_number=3)
      self.assertEqual(image()['extra_frames'], (
          {'gap': 40, 'id': 2, 'data': b'2' * 36},
          {'gap': 40, 'id': 3, 'data': threes},
      ))
      self.assertEqual(send(a='c', i=11).code, 'ENOENT')
      self.assertEqual(send(a='c', i=1, r=1, c=2).code, 'OK')
      self.assertEqual(image()['extra_frames'], (
          {'gap': 40, 'id': 2, 'data': composed},
          {'gap': 40, 'id': 3, 'data': threes},
      ))
      self.assertEqual(send(a='c', i=1, r=2, c=3, w=1, h=2, x=1, y=1).code, 'OK')
      self.assertEqual(image()['extra_frames'], (
          {'gap': 40, 'id': 2, 'data': composed},
          {'gap': 40, 'id': 3, 'data': b'3' * 12 + (b'333abc' + b'3' * 6) * 2},
      ))
  def test_graphics_quota_enforcement(self):
      """Check that the graphics storage limit is enforced for images and frames."""
      screen = self.create_screen()
      grman = screen.grman
      grman.storage_limit = 36*2
      send = make_send_command(screen)

      def expect_code(expected, **kw):
          self.assertEqual(send(**kw).code, expected)

      def expect_full_with_two_images():
          self.assertEqual(grman.disk_cache.total_size, grman.storage_limit)
          self.assertEqual(grman.image_count, 2)

      # test quota for simple images
      expect_code('OK', a='T')
      expect_code('OK', a='T', i=2)
      expect_full_with_two_images()
      # A third image is accepted, but the totals stay at the limit.
      expect_code('OK', a='T', i=3)
      expect_full_with_two_images()

      # test quota for frames
      for i in range(8):
          expect_code('OK', payload=f'{i}' * 36, i=2)
      expect_code('ENOSPC', payload='x' * 36, i=2)

      # test editing should not trigger quota
      expect_code('OK', payload='4' * 12, r=2, s=2, v=2, i=2)

      screen.reset()
      self.ae(grman.image_count, 0)
      self.assertEqual(grman.disk_cache.total_size, 0)
  @unittest.skipIf(Image is None, 'PIL not available, skipping PNG tests')
  def test_cached_rgba_conversion(self):
      """Render PNG files through the image render cache and check eviction and reuse."""
      from kitty.render_cache import ImageRenderCacheForTesting

      w, h = 5, 3
      rgba_data = byte_block(w * h * 4)
      png_buffer = BytesIO()
      Image.frombytes('RGBA', (w, h), rgba_data).save(png_buffer, 'PNG')
      png_data = png_buffer.getvalue()

      with tempfile.TemporaryDirectory() as cache_path:
          cache = ImageRenderCacheForTesting(cache_path)
          srcs, outputs = [], []
          # Render twice as many sources as the cache can hold.
          for i in range(2 * cache.max_entries):
              src_path = os.path.join(cache_path, f'{i}.png')
              with open(src_path, 'wb') as src_file:
                  src_file.write(png_data)
              srcs.append(src_path)
              outputs.append(cache.render(src_path))

          self.assertLessEqual(len(list(cache.entries())), cache.max_entries)
          self.ae(cache.num_of_renders, len(outputs))

          # Only the most recent max_entries outputs should still exist.
          remaining_outputs = outputs[-cache.max_entries:]
          for kept in remaining_outputs:
              self.assertTrue(os.path.exists(kept))
          for evicted in outputs[:-cache.max_entries]:
              self.assertFalse(os.path.exists(evicted))
          oldest, second_oldest = remaining_outputs[0], remaining_outputs[1]
          self.assertLess(os.path.getmtime(oldest), os.path.getmtime(second_oldest))

          # Rendering a cached source again reuses the output without a new
          # render, and is expected to make its mtime the newer of the two.
          remaining_srcs = srcs[-cache.max_entries:]
          self.ae(cache.render(remaining_srcs[0]), oldest)
          self.ae(cache.num_of_renders, len(outputs))
          self.assertGreater(os.path.getmtime(oldest), os.path.getmtime(second_oldest))

          # Calling the cache yields the image size and a file to read from;
          # the pixel data is compared after skipping the first 8 bytes.
          width, height, fd = cache(remaining_srcs[-1])
          with open(fd, 'rb') as rendered:
              self.ae((width, height), (w, h))
              rendered.seek(8)
              self.ae(rgba_data, rendered.read())