file_transmission.py 20 KB


  1. #!/usr/bin/env python
  2. # License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
  3. import os
  4. import shutil
  5. import stat
  6. import tempfile
  7. from collections import namedtuple
  8. from contextlib import contextmanager
  9. from pathlib import Path
  10. from kittens.transfer.rsync import Differ, Hasher, Patcher, parse_ftc
  11. from kittens.transfer.utils import set_paths
  12. from kitty.constants import kitten_exe
  13. from kitty.file_transmission import Action, Compression, FileTransmissionCommand, FileType, TransmissionType, ZlibDecompressor
  14. from kitty.file_transmission import TestFileTransmission as FileTransmission
  15. from . import PTY, BaseTest
  16. def response(id='test', msg='', file_id='', name='', action='status', status='', size=-1):
  17. ans = {'action': 'status'}
  18. if id:
  19. ans['id'] = id
  20. if file_id:
  21. ans['file_id'] = file_id
  22. if name:
  23. ans['name'] = name
  24. if status:
  25. ans['status'] = status
  26. if size > -1:
  27. ans['size'] = size
  28. return ans
  29. def names_in(path):
  30. for dirpath, dirnames, filenames in os.walk(path):
  31. for d in dirnames + filenames:
  32. yield os.path.relpath(os.path.join(dirpath, d), path)
  33. def serialized_cmd(**fields) -> str:
  34. if 'id' not in fields:
  35. fields['id'] = 'test'
  36. for k, A in (('action', Action), ('ftype', FileType), ('ttype', TransmissionType), ('compression', Compression)):
  37. if k in fields:
  38. fields[k] = A[fields[k]]
  39. if isinstance(fields.get('data'), str):
  40. fields['data'] = fields['data'].encode('utf-8')
  41. ans = FileTransmissionCommand(**fields)
  42. return ans.serialize()
  43. def generate_data(block_size, num_blocks, *extra) -> bytes:
  44. extra = ''.join(extra)
  45. b = b'_' * (block_size * num_blocks) + extra.encode()
  46. ans = bytearray(b)
  47. for i in range(num_blocks):
  48. offset = i * block_size
  49. p = str(i).encode()
  50. ans[offset:offset+len(p)] = p
  51. return bytes(ans)
  52. def patch_data(data, *patches):
  53. total_patch_size = 0
  54. ans = bytearray(data)
  55. for patch in patches:
  56. o, sep, r = patch.partition(':')
  57. r = r.encode()
  58. total_patch_size += len(r)
  59. offset = int(o)
  60. ans[offset:offset+len(r)] = r
  61. return bytes(ans), len(patches), total_patch_size
  62. def run_roundtrip_test(self: 'TestFileTransmission', src_data, changed, num_of_patches, total_patch_size):
  63. buf = memoryview(bytearray(30))
  64. signature = bytearray(0)
  65. p = Patcher(len(changed))
  66. n = p.signature_header(buf)
  67. signature.extend(buf[:n])
  68. src = memoryview(changed)
  69. bs = p.block_size
  70. while src:
  71. n = p.sign_block(src[:bs], buf)
  72. signature.extend(buf[:n])
  73. src = src[bs:]
  74. d = Differ()
  75. src = memoryview(signature)
  76. while src:
  77. d.add_signature_data(src[:13])
  78. src = src[13:]
  79. d.finish_signature_data()
  80. del src, signature
  81. src = memoryview(src_data)
  82. delta = bytearray(0)
  83. def read_into(b):
  84. nonlocal src
  85. n = min(len(b), len(src))
  86. if n > 0:
  87. b[:n] = src[:n]
  88. src = src[n:]
  89. return n
  90. def write_delta(b):
  91. delta.extend(b)
  92. while d.next_op(read_into, write_delta):
  93. pass
  94. delta = memoryview(delta)
  95. del src
  96. def read_at(pos, output) -> int:
  97. b = changed[pos:]
  98. amt = min(len(output), len(b))
  99. output[:amt] = b[:amt]
  100. return amt
  101. output = bytearray(0)
  102. def write_changes(b):
  103. output.extend(b)
  104. def debug_msg():
  105. return f'\n\nsrc:\n{src_data.decode()}\nchanged:\n{changed.decode()}\noutput:\n{output.decode()}'
  106. try:
  107. while delta:
  108. p.apply_delta_data(delta[:11], read_at, write_changes)
  109. delta = delta[11:]
  110. p.finish_delta_data()
  111. except Exception as err:
  112. self.fail(f'{err}\n{debug_msg()}')
  113. self.assertEqual(src_data, bytes(output), debug_msg())
  114. limit = 2 * (p.block_size * num_of_patches)
  115. if limit > -1:
  116. self.assertLessEqual(
  117. p.total_data_in_delta, limit, f'Unexpectedly poor delta performance: {total_patch_size=} {p.total_data_in_delta=} {limit=}')
  118. def test_rsync_roundtrip(self: 'TestFileTransmission') -> None:
  119. block_size = 16
  120. src_data = generate_data(block_size, 16)
  121. changed, num_of_patches, total_patch_size = patch_data(src_data, "3:patch1", "16:patch2", "130:ptch3", "176:patch4", "222:XXYY")
  122. run_roundtrip_test(self, src_data, src_data[block_size:], 1, block_size)
  123. run_roundtrip_test(self, src_data, changed, num_of_patches, total_patch_size)
  124. run_roundtrip_test(self, src_data, b'', -1, 0)
  125. run_roundtrip_test(self, src_data, src_data, 0, 0)
  126. run_roundtrip_test(self, src_data, changed[:len(changed)-3], num_of_patches, total_patch_size)
  127. run_roundtrip_test(self, src_data, changed[:37] + changed[81:], num_of_patches, total_patch_size)
  128. block_size = 13
  129. src_data = generate_data(block_size, 17, "trailer")
  130. changed, num_of_patches, total_patch_size = patch_data(src_data, "0:patch1", "19:patch2")
  131. run_roundtrip_test(self, src_data, changed, num_of_patches, total_patch_size)
  132. run_roundtrip_test(self, src_data, changed[:len(changed)-3], num_of_patches, total_patch_size)
  133. run_roundtrip_test(self, src_data, changed + b"xyz...", num_of_patches, total_patch_size)
  134. class PtyFileTransmission(FileTransmission):
  135. def __init__(self, pty, allow=True):
  136. self.pty = pty
  137. super().__init__(allow=allow)
  138. self.pty.callbacks.ftc = self
  139. def write_ftc_to_child(self, payload: FileTransmissionCommand, appendleft: bool = False, use_pending: bool = True) -> bool:
  140. # print('to kitten:', payload)
  141. self.pty.write_to_child('\x1b]' + payload.serialize(prefix_with_osc_code=True) + '\x1b\\', flush=False)
  142. return True
  143. class TransferPTY(PTY):
  144. def __init__(self, cmd, cwd, allow=True, env=None):
  145. super().__init__(cmd, cwd=cwd, env=env, rows=200, columns=120)
  146. self.fc = PtyFileTransmission(self, allow=allow)
class TestFileTransmission(BaseTest):
    # Tests for file transmission: command handling on a FileTransmission
    # object directly, the rsync and parsing helpers, and end-to-end transfers
    # that run the `transfer` kitten inside a PTY.

    def setUp(self):
        # Per-test state: transfer direction, the optional kitty/kitten home
        # and cwd overrides (empty means "not set"), a fresh temporary
        # directory, the list of expected responses and the original HOME.
        self.direction_receive = False
        self.kitty_home = self.kitty_cwd = self.kitten_home = self.kitten_cwd = ''
        super().setUp()
        # realpath so later path comparisons are not affected by symlinks in
        # the temporary directory location
        self.tdir = os.path.realpath(tempfile.mkdtemp())
        self.responses = []
        self.orig_home = os.environ.get('HOME')

    def tearDown(self):
        # Remove the temporary directory and put HOME back the way it was
        # (removing it again if it was not set before the test).
        self.rmtree_ignoring_errors(self.tdir)
        self.responses = []
        if self.orig_home is None:
            os.environ.pop('HOME', None)
        else:
            os.environ['HOME'] = self.orig_home
        super().tearDown()

    def clean_tdir(self):
        # Delete everything inside the temporary directory, keeping the
        # directory itself, and forget the expected responses.
        for x in os.listdir(self.tdir):
            x = os.path.join(self.tdir, x)
            if os.path.isdir(x):
                shutil.rmtree(x)
            else:
                os.remove(x)
        self.responses = []

    def cr(self, a, b):
        # Compare two sequences of response dicts, ignoring entries whose
        # status is PROGRESS and ignoring the size field.
        def f(r):
            # NOTE: this removes 'size' from the passed-in dict itself
            r.pop('size', None)
            return r
        a = tuple(f(r) for r in a if r.get('status') != 'PROGRESS')
        b = tuple(f(r) for r in b if r.get('status') != 'PROGRESS')
        self.ae(a, b)

    def assertResponses(self, ft, limit=1024, **kw):
        # Append response(**kw) to the expected responses, then compare the
        # first `limit` responses recorded by ft with the first `limit`
        # expected ones.
        self.responses.append(response(**kw))
        self.cr(ft.test_responses[:limit], self.responses[:limit])

    def assertPathEqual(self, a, b):
        # Compare two paths after resolving symlinks and making them absolute.
        a = os.path.abspath(os.path.realpath(a))
        b = os.path.abspath(os.path.realpath(b))
        self.ae(a, b)

    def test_rsync_roundtrip(self):
        # Delegates to the module level function of the same name.
        test_rsync_roundtrip(self)

    def test_file_get(self):
        # send refusal
        # (with quiet == 2 no response at all is expected)
        for quiet in (0, 1, 2):
            ft = FileTransmission(allow=False)
            ft.handle_serialized_command(serialized_cmd(action='receive', id='x', quiet=quiet))
            self.cr(ft.test_responses, [] if quiet == 2 else [response(id='x', status='EPERM:User refused the transfer')])
            self.assertFalse(ft.active_sends)
        # reading metadata for specs
        cwd = os.path.join(self.tdir, 'cwd')
        home = os.path.join(self.tdir, 'home')
        os.mkdir(cwd), os.mkdir(home)
        with set_paths(cwd=cwd, home=home):
            # a spec naming a file that does not exist
            ft = FileTransmission()
            self.responses = []
            ft.handle_serialized_command(serialized_cmd(action='receive', size=1))
            self.assertResponses(ft, status='OK')
            ft.handle_serialized_command(serialized_cmd(action='file', file_id='missing', name='XXX'))
            self.responses.append(response(status='ENOENT:Failed to read spec', file_id='missing'))
            self.assertResponses(ft, status='OK', name=home)
            # two specs: a regular file and a directory that contains a file,
            # a symlink to the regular file, a hard link to it and a symlink
            # whose target is 'XXX'
            ft = FileTransmission()
            self.responses = []
            ft.handle_serialized_command(serialized_cmd(action='receive', size=2))
            self.assertResponses(ft, status='OK')
            with open(os.path.join(home, 'a'), 'w') as f:
                f.write('a')
            os.mkdir(f.name + 'd')
            with open(os.path.join(f.name + 'd', 'b'), 'w') as f2:
                f2.write('bbb')
            os.symlink(f.name, f.name + 'd/s')
            os.link(f.name, f.name + 'd/h')
            os.symlink('XXX', f.name + 'd/q')
            ft.handle_serialized_command(serialized_cmd(action='file', file_id='a', name='a'))
            ft.handle_serialized_command(serialized_cmd(action='file', file_id='b', name='ad'))
            # index the recorded file entries by name; six are expected
            files = {r['name']: r for r in ft.test_responses if r['action'] == 'file'}
            self.ae(len(files), 6)
            q = files[f.name]
            # the status of the regular file's entry is what the entries for
            # the symlink and the hard link to it must carry as data
            tgt = q['status'].encode('ascii')
            self.ae(q['size'], 1), self.assertNotIn('ftype', q)
            q = files[f.name + 'd']
            self.ae(q['ftype'], 'directory')
            q = files[f.name + 'd/b']
            self.ae(q['size'], 3)
            q = files[f.name + 'd/s']
            self.ae(q['ftype'], 'symlink')
            self.ae(q['data'], tgt)
            q = files[f.name + 'd/h']
            self.ae(q['ftype'], 'link')
            self.ae(q['data'], tgt)
            q = files[f.name + 'd/q']
            self.ae(q['ftype'], 'symlink')
            self.assertNotIn('data', q)
        # reading file contents, without and with zlib compression
        base = os.path.join(self.tdir, 'base')
        os.mkdir(base)
        src = os.path.join(base, 'src.bin')
        data = os.urandom(16 * 1024)
        with open(src, 'wb') as f:
            f.write(data)
        sl = os.path.join(base, 'src.link')
        os.symlink(src, sl)
        for compress in ('none', 'zlib'):
            ft = FileTransmission()
            self.responses = []
            ft.handle_serialized_command(serialized_cmd(action='receive', size=1))
            self.assertResponses(ft, status='OK')
            ft.handle_serialized_command(serialized_cmd(action='file', file_id='src', name=src))
            # mark the metadata as sent and drop the responses recorded so far
            # before issuing the file commands whose data is checked
            ft.active_sends['test'].metadata_sent = True
            ft.test_responses = []
            ft.handle_serialized_command(serialized_cmd(action='file', file_id='src', name=src, compression=compress))
            received = b''.join(x['data'] for x in ft.test_responses)
            if compress == 'zlib':
                received = ZlibDecompressor()(received, True)
            self.ae(data, received)
            # for the symlink the received data must be the link target path
            ft.test_responses = []
            ft.handle_serialized_command(serialized_cmd(action='file', file_id='sl', name=sl, compression=compress))
            received = b''.join(x['data'] for x in ft.test_responses)
            self.ae(received.decode('utf-8'), src)

    def test_parse_ftc(self):
        # parse_ftc must call its callback once per key=value pair; the
        # expected flat key, value sequences are listed in the t() calls below.
        def t(raw, *expected):
            a = []

            def c(k, v):
                # keys and values arrive as bytes-like objects
                a.append(str(k, 'utf-8'))
                a.append(str(v, 'utf-8'))

            parse_ftc(raw, c)
            self.ae(tuple(a), expected)

        t('a=b', 'a', 'b')
        t('a=b;', 'a', 'b')
        t('a1=b1;c=d;;', 'a1', 'b1', 'c', 'd')
        t('a1=b1;c=d;;e', 'a1', 'b1', 'c', 'd')
        t('a1=b1;c=d;;;1=1', 'a1', 'b1', 'c', 'd', '1', '1')

    def test_rsync_hashers(self):
        # Known digests of b'abcd' for the xxh3-64 and xxh3-128 hashers.
        h = Hasher("xxh3-64")
        h.update(b'abcd')
        self.assertEqual(h.hexdigest(), '6497a96f53a89890')
        self.assertEqual(h.digest64(), 7248448420886124688)
        h128 = Hasher("xxh3-128")
        h128.update(b'abcd')
        self.assertEqual(h128.hexdigest(), '8d6b60383dfa90c21be79eecd1b1353d')

    @contextmanager
    def run_kitten(self, cmd, home_dir='', allow=True, cwd=''):
        # Context manager that starts the transfer kitten with the arguments
        # in cmd inside a TransferPTY and yields the PTY.
        # cwd and home_dir fall back to self.kitten_cwd / self.kitten_home and
        # then to self.tdir; allow is passed through to the TransferPTY.
        cwd = cwd or self.kitten_cwd or self.tdir
        cmd = [kitten_exe(), 'transfer'] + (['--direction=receive'] if self.direction_receive else []) + cmd
        env = {'PWD': cwd}
        env['HOME'] = home_dir or self.kitten_home or self.tdir
        # the set_paths context stays active while the caller's with-body runs
        with set_paths(home=self.kitty_home, cwd=self.kitty_cwd):
            pty = TransferPTY(cmd, cwd=cwd, allow=allow, env=env)
            # process child input up to 10 times, stopping early once the
            # screen shows some output
            i = 10
            while i > 0 and not pty.screen_contents().strip():
                pty.process_input_from_child()
                i -= 1
            yield pty

    def basic_transfer_tests(self):
        # Transfer checks shared by the send and receive tests: a refused
        # transfer, a single file with various options, and a directory tree.
        src = os.path.join(self.tdir, 'src')
        self.src_data = os.urandom(11113)
        with open(src, 'wb') as s:
            s.write(self.src_data)
        dest = os.path.join(self.tdir, 'dest')
        # a refused transfer must exit with code 1 and create nothing
        with self.run_kitten([src, dest], allow=False) as pty:
            pty.wait_till_child_exits(require_exit_code=1)
        self.assertFalse(os.path.exists(dest))

        def single_file(*cmd):
            # transfer src to dest with the extra options in cmd and check
            # that dest then holds the source data
            with self.run_kitten(list(cmd) + [src, dest]) as pty:
                pty.wait_till_child_exits(require_exit_code=0)
            with open(dest, 'rb') as f:
                self.assertEqual(self.src_data, f.read())

        single_file()
        single_file()
        single_file('--transmit-deltas')
        # delta transfer over a destination with unrelated content
        with open(dest, 'wb') as d:
            d.write(os.urandom(1023))
        single_file('--transmit-deltas')
        # delta transfer with no destination file present
        os.remove(dest)
        single_file('--transmit-deltas')
        single_file('--compress=never')
        single_file('--compress=always')
        single_file('--transmit-deltas', '--compress=never')

        def multiple_files(*cmd):
            # build a source tree, transfer it and compare the result
            src = os.path.join(self.tdir, 'msrc')
            dest = os.path.join(self.tdir, 'mdest')
            if os.path.exists(src):
                shutil.rmtree(src)
            os.mkdir(src)
            os.makedirs(dest, exist_ok=True)

            # relpath -> Entry for everything created in the source tree
            expected = {}
            Entry = namedtuple('Entry', 'relpath mtime mode nlink')

            def entry(path, base=src):
                # describe path (without following symlinks) relative to base
                st = os.stat(path, follow_symlinks=False)
                mtime = st.st_mtime_ns
                if stat.S_ISDIR(st.st_mode):
                    mtime = 0 # mtime is flaky for dirs on CI even empty ones
                return Entry(os.path.relpath(path, base), mtime, oct(st.st_mode), st.st_nlink)

            def se(path):
                # record a source entry as expected
                e = entry(path)
                expected[e.relpath] = e

            # source tree: a file with mode 0o766 plus a hard link to it, an
            # empty directory, a sub directory containing a file, an absolute
            # symlink and a relative symlink; files and symlinks get fixed
            # timestamps
            b = Path(src)
            with open(b / 'simple', 'wb') as f:
                f.write(os.urandom(1317))
                os.fchmod(f.fileno(), 0o766)
            os.link(f.name, b / 'hardlink')
            os.utime(f.name, (1.3, 1.3))
            se(f.name)
            se(str(b/'hardlink'))
            os.mkdir(b / 'empty')
            se(str(b/'empty'))
            s = b / 'sub'
            os.mkdir(s)
            with open(s / 'reg', 'wb') as f:
                f.write(os.urandom(113))
            os.utime(f.name, (1171.3, 1171.3))
            se(f.name)
            se(str(s))
            os.symlink('/', b/'abssym')
            os.utime(b/'abssym', (1234.5, 1234.5), follow_symlinks=False)
            se(b/'abssym')
            os.symlink('sub/reg', b/'sym')
            os.utime(b/'sym', (6789.1, 6789.1), follow_symlinks=False)
            se(b/'sym')

            with self.run_kitten(list(cmd) + [src, dest]) as pty:
                pty.wait_till_child_exits(require_exit_code=0)
            # relpath -> Entry for everything found in the destination
            actual = {}

            def de(path):
                # destination entries are taken relative to dest/<basename of
                # src>; that directory itself ('.') is not recorded
                e = entry(path, os.path.join(dest, os.path.basename(src)))
                if e.relpath != '.':
                    actual[e.relpath] = e

            for dirpath, dirnames, filenames in os.walk(dest):
                for x in dirnames:
                    de(os.path.join(dirpath, x))
                for x in filenames:
                    de(os.path.join(dirpath, x))

            self.assertEqual(expected, actual)

            # metadata matched; now compare symlink targets and file contents
            for key, e in expected.items():
                ex = os.path.join(src, key)
                ax = os.path.join(dest, os.path.basename(src), key)
                if os.path.islink(ex):
                    self.ae(os.readlink(ex), os.readlink(ax))
                elif os.path.isfile(ex):
                    with open(ex, 'rb') as ef, open(ax, 'rb') as af:
                        self.assertEqual(ef.read(), af.read())

        multiple_files()
        multiple_files('--compress=always')
        # start from an empty temporary directory for the delta transfers;
        # the second delta run finds the destination already populated
        self.clean_tdir()
        multiple_files('--transmit-deltas')
        multiple_files('--transmit-deltas')

    def setup_dirs(self):
        # Empty the temporary directory and create separate home and cwd
        # directories for the kitty side and the kitten side inside it.
        self.clean_tdir()
        self.kitty_home = os.path.join(self.tdir, 'kitty-home')
        self.kitty_cwd = os.path.join(self.tdir, 'kitty-cwd')
        self.kitten_home = os.path.join(self.tdir, 'kitten-home')
        self.kitten_cwd = os.path.join(self.tdir, 'kitten-cwd')
        tuple(map(os.mkdir, (self.kitty_home, self.kitty_cwd, self.kitten_home, self.kitten_cwd)))

    def create_src(self, base):
        # Write self.src_data to a file named 'src' in base and return its path.
        src = os.path.join(base, 'src')
        with open(src, 'wb') as s:
            s.write(self.src_data)
        return src

    def mirror_test(self, src, dest, prefix=''):
        # Create a file, a symlink and a directory holding a hard link in src,
        # transfer them with --mode=mirror and then remove the corresponding
        # entries from dest. The removals raise if an entry is missing.
        self.create_src(src)
        os.symlink('/', os.path.join(src, 'sym'))
        os.mkdir(os.path.join(src, 'sub'))
        os.link(os.path.join(src, 'src'), os.path.join(src, 'sub', 'hardlink'))
        with self.run_kitten(['--mode=mirror', f'{prefix}src', f'{prefix}sym', f'{prefix}sub']) as pty:
            pty.wait_till_child_exits(require_exit_code=0)
        os.remove(os.path.join(dest, 'src'))
        os.remove(os.path.join(dest, 'sym'))
        shutil.rmtree(os.path.join(dest, 'sub'))

    def test_transfer_receive(self):
        # Transfers run with --direction=receive: sources are created in the
        # kitty side directories and results are removed from the kitten side
        # directories.
        self.direction_receive = True
        self.basic_transfer_tests()
        self.setup_dirs()
        self.create_src(self.kitty_home)
        # dir expansion with single transfer
        with self.run_kitten(['~/src', '~/src']) as pty:
            pty.wait_till_child_exits(require_exit_code=0)
        os.remove(os.path.join(self.kitten_home, 'src'))
        with self.run_kitten(['src', 'src']) as pty:
            pty.wait_till_child_exits(require_exit_code=0)
        os.remove(os.path.join(self.kitten_cwd, 'src'))
        # dir expansion with multiple transfers
        os.symlink('/', os.path.join(self.kitty_home, 'sym'))
        with self.run_kitten(['~/src', '~/sym', '~']) as pty:
            pty.wait_till_child_exits(require_exit_code=0)
        os.remove(os.path.join(self.kitten_home, 'src'))
        os.remove(os.path.join(self.kitten_home, 'sym'))
        with self.run_kitten(['src', 'sym', '.']) as pty:
            pty.wait_till_child_exits(require_exit_code=0)
        os.remove(os.path.join(self.kitten_cwd, 'src'))
        os.remove(os.path.join(self.kitten_cwd, 'sym'))
        # mirroring
        self.setup_dirs()
        self.mirror_test(self.kitty_home, self.kitten_home)

    def test_transfer_send(self):
        # Transfers run in the default direction: sources are created in the
        # kitten side directories and results are removed from the kitty home
        # directory.
        self.basic_transfer_tests()
        src = os.path.join(self.tdir, 'src')
        with open(src, 'wb') as s:
            s.write(self.src_data)
        self.setup_dirs()
        self.create_src(self.kitten_home)
        # dir expansion with single transfer
        with self.run_kitten(['~/src', '~/src']) as pty:
            pty.wait_till_child_exits(require_exit_code=0)
        os.remove(os.path.join(self.kitty_home, 'src'))
        self.create_src(self.kitten_cwd)
        with self.run_kitten(['src', 'src']) as pty:
            pty.wait_till_child_exits(require_exit_code=0)
        os.remove(os.path.join(self.kitty_home, 'src'))
        # dir expansion with multiple transfers
        os.symlink('/', os.path.join(self.kitten_home, 'sym'))
        with self.run_kitten(['~/src', '~/sym', '~']) as pty:
            pty.wait_till_child_exits(require_exit_code=0)
        os.remove(os.path.join(self.kitty_home, 'src'))
        os.remove(os.path.join(self.kitty_home, 'sym'))
        os.symlink('/', os.path.join(self.kitten_cwd, 'sym'))
        with self.run_kitten(['src', 'sym', '.']) as pty:
            pty.wait_till_child_exits(require_exit_code=0)
        os.remove(os.path.join(self.kitty_home, 'src'))
        os.remove(os.path.join(self.kitty_home, 'sym'))
        # mirroring
        self.setup_dirs()
        self.mirror_test(self.kitten_home, self.kitty_home, prefix='~/')