ssh.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. #!/usr/bin/env python
  2. # License: GPLv3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
  3. import glob
  4. import json
  5. import os
  6. import shutil
  7. import subprocess
  8. import tempfile
  9. from contextlib import suppress
  10. from functools import lru_cache
  11. from kittens.ssh.utils import get_connection_data
  12. from kitty.constants import is_macos, kitten_exe, runtime_dir
  13. from kitty.fast_data_types import CURSOR_BEAM, shm_unlink
  14. from kitty.utils import SSHConnectionData
  15. from . import BaseTest, retry_on_failure
  16. from .shell_integration import bash_ok, basic_shell_env
  17. def files_in(path):
  18. for record in os.walk(path):
  19. for f in record[-1]:
  20. yield os.path.relpath(os.path.join(record[0], f), path)
  21. class SSHKitten(BaseTest):
  22. @retry_on_failure()
  23. def test_basic_pty_operations(self):
  24. pty = self.create_pty('echo hello')
  25. pty.process_input_from_child()
  26. self.ae(pty.screen_contents(), 'hello')
  27. pty = self.create_pty(self.cmd_to_run_python_code('''\
  28. import array, fcntl, sys, termios
  29. buf = array.array('H', [0, 0, 0, 0])
  30. fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
  31. print(' '.join(map(str, buf)))'''), lines=13, cols=77)
  32. pty.process_input_from_child()
  33. self.ae(pty.screen_contents(), '13 77 770 260')
  34. @retry_on_failure()
  35. def test_ssh_connection_data(self):
  36. def t(cmdline, binary='ssh', host='main', port=None, identity_file='', extra_args=()):
  37. if identity_file:
  38. identity_file = os.path.abspath(identity_file)
  39. en = {f'{x[0]}' for x in extra_args}
  40. q = get_connection_data(cmdline.split(), extra_args=en)
  41. self.ae(q, SSHConnectionData(binary, host, port, identity_file, extra_args))
  42. t('ssh main')
  43. t('ssh un@ip -i ident -p34', host='un@ip', port=34, identity_file='ident')
  44. t('ssh un@ip -iident -p34', host='un@ip', port=34, identity_file='ident')
  45. t('ssh -p 33 main', port=33)
  46. t('ssh -p 34 ssh://un@ip:33/', host='un@ip', port=34)
  47. t('ssh --kitten=one -p 12 --kitten two -ix main', identity_file='x', port=12, extra_args=(('--kitten', 'one'), ('--kitten', 'two')))
  48. self.assertTrue(runtime_dir())
  49. @property
  50. @lru_cache
  51. def all_possible_sh(self):
  52. python = 'python3' if shutil.which('python3') else 'python'
  53. return tuple(filter(shutil.which, ('dash', 'zsh', 'bash', 'posh', 'sh', python)))
  54. @retry_on_failure()
  55. def test_ssh_copy(self):
  56. simple_data = 'rkjlhfwf9whoaa'
  57. def touch(p):
  58. with open(os.path.join(local_home, p), 'w') as f:
  59. f.write(simple_data)
  60. for sh in self.all_possible_sh:
  61. with self.subTest(sh=sh), tempfile.TemporaryDirectory() as remote_home, tempfile.TemporaryDirectory() as local_home:
  62. tuple(map(touch, 'simple-file g.1 g.2'.split()))
  63. os.makedirs(f'{local_home}/d1/d2/d3')
  64. touch('d1/d2/x')
  65. touch('d1/d2/w.exclude')
  66. os.mkdir(f'{local_home}/d1/r')
  67. touch('d1/r/noooo')
  68. os.symlink('d2/x', f'{local_home}/d1/y')
  69. os.symlink('simple-file', f'{local_home}/s1')
  70. os.symlink('simple-file', f'{local_home}/s2')
  71. conf = '''\
  72. copy simple-file
  73. copy s1
  74. copy --symlink-strategy=keep-path s2
  75. copy --dest=a/sfa simple-file
  76. copy --glob g.*
  77. copy --exclude **/w.* --exclude **/r d1
  78. '''
  79. self.check_bootstrap(
  80. sh, remote_home, test_script='env; exit 0', SHELL_INTEGRATION_VALUE='', conf=conf, home=local_home,
  81. )
  82. tname = '.terminfo'
  83. if os.path.exists('/usr/share/misc/terminfo.cdb'):
  84. tname += '.cdb'
  85. self.assertTrue(os.path.lexists(f'{remote_home}/{tname}/78'))
  86. self.assertTrue(os.path.exists(f'{remote_home}/{tname}/78/xterm-kitty'))
  87. self.assertTrue(os.path.exists(f'{remote_home}/{tname}/x/xterm-kitty'))
  88. for w in ('simple-file', 'a/sfa', 's2'):
  89. with open(os.path.join(remote_home, w)) as f:
  90. self.ae(f.read(), simple_data)
  91. self.assertFalse(os.path.islink(f.name))
  92. self.assertTrue(os.path.lexists(f'{remote_home}/d1/y'))
  93. self.assertTrue(os.path.exists(f'{remote_home}/d1/y'))
  94. self.ae(os.readlink(f'{remote_home}/d1/y'), 'd2/x')
  95. self.ae(os.readlink(f'{remote_home}/s1'), 'simple-file')
  96. contents = set(files_in(remote_home))
  97. contents.discard('.zshrc') # added by check_bootstrap()
  98. # depending on platform one of these is a symlink and hence
  99. # isn't in contents
  100. contents.discard(f'{tname}/x/xterm-kitty')
  101. contents.discard(f'{tname}/78/xterm-kitty')
  102. self.ae(contents, {
  103. 'g.1', 'g.2', f'{tname}/kitty.terminfo', 'simple-file', 'd1/d2/x', 'd1/y', 'a/sfa', 's1', 's2',
  104. '.local/share/kitty-ssh-kitten/kitty/version', '.local/share/kitty-ssh-kitten/kitty/bin/kitty',
  105. '.local/share/kitty-ssh-kitten/kitty/bin/kitten'
  106. })
  107. self.ae(len(glob.glob(f'{remote_home}/{tname}/*/xterm-kitty')), 2)
  108. @retry_on_failure()
  109. def test_ssh_env_vars(self):
  110. tset = '$A-$(echo no)-`echo no2` !Q5 "something else"'
  111. for sh in self.all_possible_sh:
  112. with self.subTest(sh=sh), tempfile.TemporaryDirectory() as tdir:
  113. os.mkdir(os.path.join(tdir, 'cwd'))
  114. conf = f'''
  115. cwd $HOME/cwd
  116. env A=AAA
  117. env TSET={tset}
  118. env COLORTERM
  119. '''
  120. pty = self.check_bootstrap(
  121. sh, tdir, test_script='env; pwd; exit 0', SHELL_INTEGRATION_VALUE='', conf=conf
  122. )
  123. pty.wait_till(lambda: 'TSET={}'.format(tset.replace('$A', 'AAA')) in pty.screen_contents())
  124. self.assertNotIn('COLORTERM', pty.screen_contents())
  125. pty.wait_till(lambda: '/cwd' in pty.screen_contents())
  126. self.assertTrue(pty.is_echo_on())
  127. @retry_on_failure()
  128. def test_ssh_bootstrap_with_different_launchers(self):
  129. for launcher in self.all_possible_sh:
  130. if 'python' in launcher:
  131. continue
  132. for sh in self.all_possible_sh:
  133. if sh == 'sh' or 'python' in sh:
  134. q = shutil.which(launcher)
  135. if q:
  136. with self.subTest(sh=sh, launcher=q), tempfile.TemporaryDirectory() as tdir:
  137. self.check_bootstrap(sh, tdir, test_script='env; exit 0', SHELL_INTEGRATION_VALUE='', launcher=q)
  138. @retry_on_failure()
  139. def test_ssh_leading_data(self):
  140. script = 'echo "ld:$leading_data"; exit 0'
  141. for sh in self.all_possible_sh:
  142. if 'python' in sh:
  143. script = 'print("ld:" + leading_data.decode("ascii")); raise SystemExit(0);'
  144. with self.subTest(sh=sh), tempfile.TemporaryDirectory() as tdir:
  145. pty = self.check_bootstrap(
  146. sh, tdir, test_script=script,
  147. SHELL_INTEGRATION_VALUE='', pre_data='before_tarfile')
  148. self.ae(pty.screen_contents(), 'UNTAR_DONE\nld:before_tarfile')
  149. @retry_on_failure()
  150. def test_ssh_login_shell_detection(self):
  151. methods = []
  152. if shutil.which('python') or shutil.which('python3') or shutil.which('python2'):
  153. methods.append('using_python')
  154. if is_macos:
  155. methods += ['using_id']
  156. else:
  157. if shutil.which('getent'):
  158. methods.append('using_getent')
  159. if os.access('/etc/passwd', os.R_OK):
  160. methods.append('using_passwd')
  161. self.assertTrue(methods)
  162. import pwd
  163. try:
  164. expected_login_shell = pwd.getpwuid(os.geteuid()).pw_shell
  165. except KeyError:
  166. self.skipTest('Skipping login shell detection as getpwuid() failed to read login shell')
  167. if os.path.basename(expected_login_shell) == 'nologin':
  168. self.skipTest('Skipping login shell detection as login shell is set to nologin')
  169. for m in methods:
  170. for sh in self.all_possible_sh:
  171. if 'python' in sh:
  172. continue
  173. with self.subTest(sh=sh, method=m), tempfile.TemporaryDirectory() as tdir:
  174. pty = self.check_bootstrap(sh, tdir, test_script=f'{m}; echo "$login_shell"; exit 0', SHELL_INTEGRATION_VALUE='')
  175. self.assertIn(expected_login_shell, pty.screen_contents())
  176. @retry_on_failure()
  177. def test_ssh_shell_integration(self):
  178. ok_login_shell = ''
  179. for sh in self.all_possible_sh:
  180. for login_shell in {'fish', 'zsh', 'bash'} & set(self.all_possible_sh):
  181. if login_shell == 'bash' and not bash_ok():
  182. continue
  183. ok_login_shell = login_shell
  184. with self.subTest(sh=sh, login_shell=login_shell), tempfile.TemporaryDirectory() as tdir:
  185. pty = self.check_bootstrap(sh, tdir, login_shell)
  186. if login_shell == 'bash':
  187. pty.send_cmd_to_child('echo $HISTFILE')
  188. pty.wait_till(lambda: '/.bash_history' in pty.screen_contents())
  189. elif login_shell == 'zsh':
  190. pty.send_cmd_to_child('echo "login_shell=$ZSH_NAME"')
  191. pty.wait_till(lambda: 'login_shell=zsh' in pty.screen_contents())
  192. self.assertIn(b'\x1b]133;', pty.received_bytes)
  193. # check that turning off shell integration works
  194. if ok_login_shell in ('bash', 'zsh'):
  195. for val in ('', 'no-rc', 'enabled no-rc'):
  196. for sh in self.all_possible_sh:
  197. with tempfile.TemporaryDirectory() as tdir:
  198. pty = self.check_bootstrap(sh, tdir, ok_login_shell, val)
  199. num_lines = len(pty.screen_contents().splitlines())
  200. pty.send_cmd_to_child('echo "$TERM=fruity"')
  201. pty.wait_till(lambda: 'kitty=fruity' in pty.screen_contents(), timeout=30)
  202. pty.wait_till(lambda: len(pty.screen_contents().splitlines()) >= num_lines + 2)
  203. self.assertEqual(pty.screen.cursor.shape, 0)
  204. self.assertNotIn(b'\x1b]133;', pty.received_bytes)
  205. def check_bootstrap(self, sh, home_dir, login_shell='', SHELL_INTEGRATION_VALUE='enabled', test_script='', pre_data='', conf='', launcher='sh', home=''):
  206. if login_shell:
  207. conf += f'\nlogin_shell {login_shell}'
  208. if 'python' in sh:
  209. if test_script.startswith('env;'):
  210. test_script = f'os.execlp("sh", "sh", "-c", {test_script!r})'
  211. test_script = f'print("UNTAR_DONE", flush=True); {test_script}'
  212. else:
  213. test_script = f'echo "UNTAR_DONE"; {test_script}'
  214. conf += '\nshell_integration ' + (SHELL_INTEGRATION_VALUE or 'disabled')
  215. conf += '\ninterpreter ' + sh
  216. env = os.environ.copy()
  217. if home:
  218. env['HOME'] = home
  219. cp = subprocess.run([kitten_exe(), '__pytest__', 'ssh', test_script], env=env, stdout=subprocess.PIPE, input=conf.encode('utf-8'))
  220. self.assertEqual(cp.returncode, 0)
  221. self.rdata = json.loads(cp.stdout)
  222. del cp
  223. try:
  224. env = basic_shell_env(home_dir)
  225. # Avoid generating unneeded completion scripts
  226. os.makedirs(os.path.join(home_dir, '.local', 'share', 'fish', 'generated_completions'), exist_ok=True)
  227. # prevent newuser-install from running
  228. open(os.path.join(home_dir, '.zshrc'), 'w').close()
  229. pty = self.create_pty([launcher, '-c', ' '.join(self.rdata['cmd'])], cwd=home_dir, env=env)
  230. pty.turn_off_echo()
  231. if pre_data:
  232. pty.write_buf = pre_data.encode('utf-8')
  233. def check_untar_or_fail():
  234. q = pty.screen_contents()
  235. if 'bzip2' in q:
  236. raise ValueError('Untarring failed with screen contents:\n' + q)
  237. return 'UNTAR_DONE' in q
  238. pty.wait_till(check_untar_or_fail, timeout=60)
  239. self.assertTrue(os.path.exists(os.path.join(home_dir, '.terminfo/kitty.terminfo')))
  240. if SHELL_INTEGRATION_VALUE != 'enabled':
  241. pty.wait_till(lambda: len(pty.screen_contents().splitlines()) > 1, timeout=60)
  242. self.assertEqual(pty.screen.cursor.shape, 0)
  243. else:
  244. pty.wait_till(lambda: pty.screen.cursor.shape == CURSOR_BEAM, timeout=60)
  245. return pty
  246. finally:
  247. with suppress(FileNotFoundError):
  248. shm_unlink(self.rdata['shm_name'])