runner.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. #!/usr/bin/env python
  2. # License: GPL v3 Copyright: 2018, Kovid Goyal <kovid at kovidgoyal.net>
  3. import importlib
  4. import os
  5. import sys
  6. from contextlib import contextmanager
  7. from functools import partial
  8. from typing import TYPE_CHECKING, Any, Callable, Dict, FrozenSet, Generator, List, NamedTuple, Optional, Union, cast
  9. from kitty.constants import list_kitty_resources
  10. from kitty.types import run_once
  11. from kitty.typing import BossType, WindowType
  12. from kitty.utils import resolve_abs_or_config_path
  13. aliases = {'url_hints': 'hints'}
  14. if TYPE_CHECKING:
  15. from kitty.conf.types import Definition
  16. else:
  17. Definition = object
  18. def resolved_kitten(k: str) -> str:
  19. ans = aliases.get(k, k)
  20. head, tail = os.path.split(ans)
  21. tail = tail.replace('-', '_')
  22. return os.path.join(head, tail)
  23. def path_to_custom_kitten(config_dir: str, kitten: str) -> str:
  24. path = resolve_abs_or_config_path(kitten, conf_dir=config_dir)
  25. return os.path.abspath(path)
  26. @contextmanager
  27. def preserve_sys_path() -> Generator[None, None, None]:
  28. orig = sys.path[:]
  29. try:
  30. yield
  31. finally:
  32. if sys.path != orig:
  33. del sys.path[:]
  34. sys.path.extend(orig)
  35. class CLIOnlyKitten(TypeError):
  36. def __init__(self, kitten: str):
  37. super().__init__(f'The {kitten} kitten must be run only at the commandline, as: kitten {kitten}')
  38. def import_kitten_main_module(config_dir: str, kitten: str) -> Dict[str, Any]:
  39. if kitten.endswith('.py'):
  40. with preserve_sys_path():
  41. path = path_to_custom_kitten(config_dir, kitten)
  42. if os.path.dirname(path):
  43. sys.path.insert(0, os.path.dirname(path))
  44. with open(path) as f:
  45. src = f.read()
  46. code = compile(src, path, 'exec')
  47. g = {'__name__': 'kitten'}
  48. exec(code, g)
  49. hr = g.get('handle_result', lambda *a, **kw: None)
  50. return {'start': g['main'], 'end': hr}
  51. kitten = resolved_kitten(kitten)
  52. m = importlib.import_module(f'kittens.{kitten}.main')
  53. if not hasattr(m, 'main'):
  54. raise CLIOnlyKitten(kitten)
  55. return {
  56. 'start': getattr(m, 'main'),
  57. 'end': getattr(m, 'handle_result', lambda *a, **k: None),
  58. }
  59. class KittenMetadata(NamedTuple):
  60. handle_result: Callable[[Any, int, BossType], None] = lambda *a: None
  61. type_of_input: Optional[str] = None
  62. no_ui: bool = False
  63. has_ready_notification: bool = False
  64. open_url_handler: Optional[Callable[[BossType, WindowType, str, int, str], bool]] = None
  65. allow_remote_control: bool = False
  66. remote_control_password: Union[str, bool] = False
  67. def create_kitten_handler(kitten: str, orig_args: List[str]) -> KittenMetadata:
  68. from kitty.constants import config_dir
  69. kitten = resolved_kitten(kitten)
  70. m = import_kitten_main_module(config_dir, kitten)
  71. main = m['start']
  72. handle_result = m['end']
  73. return KittenMetadata(
  74. handle_result=partial(handle_result, [kitten] + orig_args),
  75. type_of_input=getattr(handle_result, 'type_of_input', None),
  76. no_ui=getattr(handle_result, 'no_ui', False),
  77. allow_remote_control=getattr(main, 'allow_remote_control', False),
  78. remote_control_password=getattr(main, 'remote_control_password', True),
  79. has_ready_notification=getattr(handle_result, 'has_ready_notification', False),
  80. open_url_handler=getattr(handle_result, 'open_url_handler', None))
  81. def set_debug(kitten: str) -> None:
  82. import builtins
  83. from kittens.tui.loop import debug
  84. setattr(builtins, 'debug', debug)
  85. def launch(args: List[str]) -> None:
  86. config_dir, kitten = args[:2]
  87. kitten = resolved_kitten(kitten)
  88. del args[:2]
  89. args = [kitten] + args
  90. os.environ['KITTY_CONFIG_DIRECTORY'] = config_dir
  91. set_debug(kitten)
  92. m = import_kitten_main_module(config_dir, kitten)
  93. try:
  94. result = m['start'](args)
  95. finally:
  96. sys.stdin = sys.__stdin__
  97. if result is not None:
  98. import base64
  99. import json
  100. data = base64.b85encode(json.dumps(result).encode('utf-8'))
  101. sys.stdout.buffer.write(b'\x1bP@kitty-kitten-result|')
  102. sys.stdout.buffer.write(data)
  103. sys.stdout.buffer.write(b'\x1b\\')
  104. sys.stderr.flush()
  105. sys.stdout.flush()
  106. def run_kitten(kitten: str, run_name: str = '__main__') -> None:
  107. import runpy
  108. original_kitten_name = kitten
  109. kitten = resolved_kitten(kitten)
  110. set_debug(kitten)
  111. if kitten in all_kitten_names():
  112. runpy.run_module(f'kittens.{kitten}.main', run_name=run_name)
  113. return
  114. # Look for a custom kitten
  115. if not kitten.endswith('.py'):
  116. kitten += '.py'
  117. from kitty.constants import config_dir
  118. path = path_to_custom_kitten(config_dir, kitten)
  119. if not os.path.exists(path):
  120. print('Available builtin kittens:', file=sys.stderr)
  121. for kitten in all_kitten_names():
  122. print(kitten, file=sys.stderr)
  123. raise SystemExit(f'No kitten named {original_kitten_name}')
  124. m = runpy.run_path(path, init_globals={'sys': sys, 'os': os}, run_name='__run_kitten__')
  125. from kitty.fast_data_types import set_options
  126. try:
  127. m['main'](sys.argv)
  128. finally:
  129. set_options(None)
  130. @run_once
  131. def all_kitten_names() -> FrozenSet[str]:
  132. ans = []
  133. for name in list_kitty_resources('kittens'):
  134. if '__' not in name and '.' not in name and name != 'tui':
  135. ans.append(name)
  136. return frozenset(ans)
  137. def list_kittens() -> None:
  138. print('You must specify the name of a kitten to run')
  139. print('Choose from:')
  140. print()
  141. for kitten in all_kitten_names():
  142. print(kitten)
  143. def get_kitten_cli_docs(kitten: str) -> Any:
  144. setattr(sys, 'cli_docs', {})
  145. run_kitten(kitten, run_name='__doc__')
  146. ans = getattr(sys, 'cli_docs')
  147. delattr(sys, 'cli_docs')
  148. if 'help_text' in ans and 'usage' in ans and 'options' in ans:
  149. return ans
  150. def get_kitten_wrapper_of(kitten: str) -> str:
  151. setattr(sys, 'cli_docs', {})
  152. run_kitten(kitten, run_name='__wrapper_of__')
  153. ans = getattr(sys, 'cli_docs')
  154. delattr(sys, 'cli_docs')
  155. return ans.get('wrapper_of') or ''
  156. def get_kitten_completer(kitten: str) -> Any:
  157. run_kitten(kitten, run_name='__completer__')
  158. ans = getattr(sys, 'kitten_completer', None)
  159. if ans is not None:
  160. delattr(sys, 'kitten_completer')
  161. return ans
  162. def get_kitten_conf_docs(kitten: str) -> Optional[Definition]:
  163. setattr(sys, 'options_definition', None)
  164. run_kitten(kitten, run_name='__conf__')
  165. ans = getattr(sys, 'options_definition')
  166. delattr(sys, 'options_definition')
  167. return cast(Definition, ans)
  168. def get_kitten_extra_cli_parsers(kitten: str) -> Dict[str,str]:
  169. setattr(sys, 'extra_cli_parsers', {})
  170. run_kitten(kitten, run_name='__extra_cli_parsers__')
  171. ans = getattr(sys, 'extra_cli_parsers')
  172. delattr(sys, 'extra_cli_parsers')
  173. return cast(Dict[str, str], ans)
  174. def main() -> None:
  175. try:
  176. args = sys.argv[1:]
  177. launch(args)
  178. except Exception:
  179. print('Unhandled exception running kitten:')
  180. import traceback
  181. traceback.print_exc()
  182. input('Press Enter to quit')