notifications.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. #!/usr/bin/env python
  2. # License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>
  3. import os
  4. import re
  5. import tempfile
  6. from base64 import standard_b64encode
  7. from kitty.notifications import Channel, DesktopIntegration, IconDataCache, NotificationManager, UIState, Urgency
  8. from . import BaseTest
  9. def n(
  10. title='title', body='', urgency=Urgency.Normal, desktop_notification_id=1, icon_names=(), icon_path='',
  11. application_name='', notification_types=(), timeout=-1, sound='system',
  12. ):
  13. return {
  14. 'title': title, 'body': body, 'urgency': urgency, 'id': desktop_notification_id, 'icon_names': icon_names, 'icon_path': icon_path,
  15. 'application_name': application_name, 'notification_types': notification_types, 'timeout': timeout, 'sound': sound,
  16. }
  17. class DesktopIntegration(DesktopIntegration):
  18. def initialize(self):
  19. self.reset()
  20. def reset(self):
  21. self.notifications = []
  22. self.close_events = []
  23. self.new_version_activated = False
  24. self.close_succeeds = True
  25. self.counter = 0
  26. def query_live_notifications(self, channel_id, client_id):
  27. ids = [n['id'] for n in self.notifications]
  28. self.notification_manager.send_live_response(channel_id, client_id, tuple(ids))
  29. def on_new_version_notification_activation(self, cmd, which) -> None:
  30. self.new_version_activated = which + 1
  31. def close_notification(self, desktop_notification_id: int) -> bool:
  32. self.close_events.append(desktop_notification_id)
  33. if self.close_succeeds:
  34. self.notification_manager.notification_closed(desktop_notification_id)
  35. return self.close_succeeds
  36. def notify(self, cmd, existing_desktop_notification_id) -> int:
  37. if existing_desktop_notification_id:
  38. did = existing_desktop_notification_id
  39. else:
  40. self.counter += 1
  41. did = self.counter
  42. title, body, urgency = cmd.title, cmd.body, (Urgency.Normal if cmd.urgency is None else cmd.urgency)
  43. ans = n(title, body, urgency, did, cmd.icon_names, os.path.basename(cmd.icon_path), cmd.application_name,
  44. cmd.notification_types, timeout=cmd.timeout, sound=cmd.sound_name)
  45. self.notifications.append(ans)
  46. return self.counter
  47. class Channel(Channel):
  48. focused = visible = True
  49. def __init__(self, *a):
  50. super().__init__(*a)
  51. self.reset()
  52. def reset(self):
  53. self.responses = []
  54. self.focus_events = []
  55. def ui_state(self, channel_id):
  56. return UIState(self.focused, self.visible)
  57. def focus(self, channel_id: int, activation_token: str) -> None:
  58. self.focus_events.append(activation_token)
  59. def send(self, channel_id: int, osc_escape_code: str) -> bool:
  60. self.responses.append(osc_escape_code)
  61. class NotificationManager(NotificationManager):
  62. @property
  63. def filter_rules(self):
  64. yield from ('title:filterme',)
  65. def do_test(self: 'TestNotifications', tdir: str) -> None:
  66. di = DesktopIntegration(None)
  67. ch = Channel()
  68. nm = NotificationManager(di, ch, lambda *a, **kw: None, base_cache_dir=tdir)
  69. di.notification_manager = nm
  70. def reset():
  71. di.reset()
  72. ch.reset()
  73. nm.reset()
  74. def h(raw_data, osc_code=99, channel_id=1):
  75. nm.handle_notification_cmd(channel_id, osc_code, raw_data)
  76. def activate(which=0, button=0):
  77. n = di.notifications[which]
  78. nm.notification_activated(n['id'], button)
  79. def close(which=0):
  80. n = di.notifications[which]
  81. di.close_notification(n['id'])
  82. def assert_events(focus=True, close=0, report='', close_response='', live=''):
  83. self.ae(ch.focus_events, [''] if focus else [])
  84. if report:
  85. self.assertIn(f'99;i={report};', ch.responses)
  86. else:
  87. for r in ch.responses:
  88. m = re.match(r'99;i=[a-z0-9]+;', r)
  89. self.assertIsNone(m, f'Unexpectedly found report response: {r}')
  90. if close_response:
  91. self.assertIn(f'99;i={close_response}:p=close;', ch.responses)
  92. else:
  93. for r in ch.responses:
  94. m = re.match(r'99;i=[a-z0-9]+:p=close;', r)
  95. self.assertIsNone(m, f'Unexpectedly found close response: {r}')
  96. if live:
  97. self.assertIn(f'99;i=live:p=alive;{live}', ch.responses)
  98. else:
  99. for r in ch.responses:
  100. m = re.match(r'99;i=[a-z0-9]+:p=alive;', r)
  101. self.assertIsNone(m, f'Unexpectedly found alive response: {r}')
  102. self.ae(di.close_events, [close] if close else [])
  103. h('test it', osc_code=9)
  104. self.ae(di.notifications, [n(title='test it')])
  105. activate()
  106. assert_events()
  107. reset()
  108. h('d=0:u=2:i=x;title')
  109. h('d=1:i=x:p=body;body')
  110. self.ae(di.notifications, [n(body='body', urgency=Urgency.Critical)])
  111. activate()
  112. assert_events()
  113. reset()
  114. h('i=x:p=body:a=-focus;body')
  115. self.ae(di.notifications, [n(title='body')])
  116. activate()
  117. assert_events(focus=False)
  118. reset()
  119. nm.send_new_version_notification('moose')
  120. self.ae(di.notifications, [n('kitty update available!', 'kitty version moose released')])
  121. activate()
  122. self.assertTrue(di.new_version_activated)
  123. reset()
  124. h('i=x:e=1;' + standard_b64encode(b'title').decode('ascii'))
  125. self.ae(di.notifications, [n()])
  126. activate()
  127. assert_events()
  128. reset()
  129. h('e=1;' + standard_b64encode(b'title').decode('ascii'))
  130. self.ae(di.notifications, [n()])
  131. activate()
  132. assert_events()
  133. reset()
  134. h('d=0:i=x:a=-report;title')
  135. h('d=1:i=x:a=report;body')
  136. self.ae(di.notifications, [n(title='titlebody')])
  137. activate()
  138. assert_events(report='x')
  139. reset()
  140. h('a=report;title')
  141. self.ae(di.notifications, [n()])
  142. activate()
  143. assert_events(report='0')
  144. reset()
  145. h('d=0:i=y;title')
  146. h('d=1:i=y:p=xxx;title')
  147. self.ae(di.notifications, [n()])
  148. reset()
  149. # test filtering
  150. h(';title')
  151. h(';filterme please')
  152. self.ae(di.notifications, [n()])
  153. reset()
  154. # test closing interactions with reporting and activation
  155. h('i=c;title')
  156. self.ae(di.notifications, [n()])
  157. close()
  158. assert_events(focus=False, close=True)
  159. reset()
  160. h('i=c:c=1;title')
  161. self.ae(di.notifications, [n()])
  162. h('i=c:p=close')
  163. self.ae(di.notifications, [n()])
  164. assert_events(focus=False, close=True, close_response='c')
  165. reset()
  166. h('i=c:c=1;title')
  167. h('i=c:p=close')
  168. self.ae(di.notifications, [n()])
  169. assert_events(focus=False, close=True, close_response='c')
  170. reset()
  171. h('i=c;title')
  172. activate()
  173. close()
  174. h('i=c:p=close')
  175. self.ae(di.notifications, [n()])
  176. assert_events(focus=True, close=True)
  177. reset()
  178. h('i=c:a=report:c=1;title')
  179. activate()
  180. h('i=c:p=close')
  181. self.ae(di.notifications, [n()])
  182. assert_events(focus=True, report='c', close=True, close_response='c')
  183. reset()
  184. h('i=a[;title')
  185. h('i=b;title')
  186. h('i=live:p=alive;')
  187. assert_events(focus=False, live='a,b')
  188. reset()
  189. h(';title')
  190. self.ae(di.notifications, [n()])
  191. activate()
  192. assert_events()
  193. reset()
  194. # test sounds
  195. def enc(x):
  196. return standard_b64encode(x.encode()).decode()
  197. h(f's={enc("silent")};title')
  198. self.ae(di.notifications, [n(sound='silent')])
  199. h(f's={enc("custom")};title')
  200. self.ae(di.notifications[-1], n(desktop_notification_id=2, sound='custom'))
  201. reset()
  202. # Test querying
  203. h('i=xyz:p=?')
  204. self.assertFalse(di.notifications)
  205. qr = 'a=focus,report:o=always,unfocused,invisible:u=0,1,2:p=title,body,?,close,icon,alive,buttons:c=1:w=1:s=system,silent,error,info,question,warn,warning'
  206. self.ae(ch.responses, [f'99;i=xyz:p=?;{qr}'])
  207. reset()
  208. h('p=?')
  209. self.assertFalse(di.notifications)
  210. self.ae(ch.responses, [f'99;i=0:p=?;{qr}'])
  211. # Test MIME streaming
  212. for padding in (True, False):
  213. for extra in ('a', 'ab', 'abc', 'abcd'):
  214. text = 'some reasonably long text to test MIME streaming with: '
  215. encoded = standard_b64encode(text.encode()).decode()
  216. if not padding:
  217. encoded = encoded.rstrip('=')
  218. for t in encoded:
  219. h(f'i=s:e=1:d=0;{t}')
  220. h(f'i=s:e=1:d=0:p=body;{encoded[:13]}')
  221. h(f'i=s:e=1:d=0:p=body;{encoded[13:]}')
  222. h('i=s')
  223. self.ae(di.notifications, [n(text, text)])
  224. reset()
  225. # Test application name and notification type
  226. def e(x):
  227. return standard_b64encode(x.encode()).decode()
  228. h(f'i=t:d=0:f={e("app")}:t={e("1")};title')
  229. h(f'i=t:t={e("test")}')
  230. self.ae(di.notifications, [n(application_name='app', notification_types=('1', 'test',))])
  231. reset()
  232. # Test timeout
  233. h('w=3;title')
  234. self.ae(di.notifications, [n(timeout=3)])
  235. reset()
  236. # Test Disk Cache
  237. dc = IconDataCache(base_cache_dir=tdir, max_cache_size=4)
  238. cache_dir = dc._ensure_state()
  239. for i in range(5):
  240. dc.add_icon(str(i), str(i).encode())
  241. self.ae(set(dc.keys()), set(map(str, range(1, 5))))
  242. del dc
  243. self.assertFalse(os.path.exists(cache_dir))
  244. # Test icons
  245. def send_with_icon(data='', n='', g=''):
  246. m = ''
  247. if n:
  248. for x in n.split(','):
  249. m += f'n={standard_b64encode(x.encode()).decode()}:'
  250. if g:
  251. m += f'g=({g}:'
  252. h(f'i=9:d=0:{m};title')
  253. h(f'i=9:p=icon;{data}')
  254. dc = nm.icon_data_cache
  255. send_with_icon(n='mycon,ic2')
  256. self.ae(di.notifications, [n(icon_names=('mycon', 'ic2'))])
  257. reset()
  258. send_with_icon(g='gid')
  259. self.ae(di.notifications, [n()])
  260. reset()
  261. send_with_icon(g='gid', data='1')
  262. self.ae(di.notifications, [n(icon_path=dc.hash(b'1'))])
  263. send_with_icon(g='gid', n='moose')
  264. self.ae(di.notifications[-1], n(icon_names=('moose',), icon_path=dc.hash(b'1')))
  265. send_with_icon(g='gid2', data='2')
  266. self.ae(di.notifications[-1], n(icon_path=dc.hash(b'2')))
  267. send_with_icon(data='3')
  268. self.ae(di.notifications[-1], n(icon_path=dc.hash(b'3')))
  269. reset()
  270. class TestNotifications(BaseTest):
  271. def test_desktop_notify(self):
  272. with tempfile.TemporaryDirectory() as tdir:
  273. do_test(self, tdir)