session.lua 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. local uv = require('luv')
  2. local MsgpackRpcStream = require('test.client.msgpack_rpc_stream')
  3. local Session = {}
  4. Session.__index = Session
  5. if package.loaded['jit'] then
  6. -- luajit pcall is already coroutine safe
  7. Session.safe_pcall = pcall
  8. else
  9. Session.safe_pcall = require'coxpcall'.pcall
  10. end
  11. local function resume(co, ...)
  12. local status, result = coroutine.resume(co, ...)
  13. if coroutine.status(co) == 'dead' then
  14. if not status then
  15. error(result)
  16. end
  17. return
  18. end
  19. assert(coroutine.status(co) == 'suspended')
  20. result(co)
  21. end
  22. local function coroutine_exec(func, ...)
  23. local args = {...}
  24. local on_complete
  25. if #args > 0 and type(args[#args]) == 'function' then
  26. -- completion callback
  27. on_complete = table.remove(args)
  28. end
  29. resume(coroutine.create(function()
  30. local status, result, flag = Session.safe_pcall(func, unpack(args))
  31. if on_complete then
  32. coroutine.yield(function()
  33. -- run the completion callback on the main thread
  34. on_complete(status, result, flag)
  35. end)
  36. end
  37. end))
  38. end
  39. function Session.new(stream)
  40. return setmetatable({
  41. _msgpack_rpc_stream = MsgpackRpcStream.new(stream),
  42. _pending_messages = {},
  43. _prepare = uv.new_prepare(),
  44. _timer = uv.new_timer(),
  45. _is_running = false
  46. }, Session)
  47. end
  48. function Session:next_message(timeout)
  49. local function on_request(method, args, response)
  50. table.insert(self._pending_messages, {'request', method, args, response})
  51. uv.stop()
  52. end
  53. local function on_notification(method, args)
  54. table.insert(self._pending_messages, {'notification', method, args})
  55. uv.stop()
  56. end
  57. if self._is_running then
  58. error('Event loop already running')
  59. end
  60. if #self._pending_messages > 0 then
  61. return table.remove(self._pending_messages, 1)
  62. end
  63. self:_run(on_request, on_notification, timeout)
  64. return table.remove(self._pending_messages, 1)
  65. end
  66. function Session:notify(method, ...)
  67. self._msgpack_rpc_stream:write(method, {...})
  68. end
  69. function Session:request(method, ...)
  70. local args = {...}
  71. local err, result
  72. if self._is_running then
  73. err, result = self:_yielding_request(method, args)
  74. else
  75. err, result = self:_blocking_request(method, args)
  76. end
  77. if err then
  78. return false, err
  79. end
  80. return true, result
  81. end
  82. function Session:run(request_cb, notification_cb, setup_cb, timeout)
  83. local function on_request(method, args, response)
  84. coroutine_exec(request_cb, method, args, function(status, result, flag)
  85. if status then
  86. response:send(result, flag)
  87. else
  88. response:send(result, true)
  89. end
  90. end)
  91. end
  92. local function on_notification(method, args)
  93. coroutine_exec(notification_cb, method, args)
  94. end
  95. self._is_running = true
  96. if setup_cb then
  97. coroutine_exec(setup_cb)
  98. end
  99. while #self._pending_messages > 0 do
  100. local msg = table.remove(self._pending_messages, 1)
  101. if msg[1] == 'request' then
  102. on_request(msg[2], msg[3], msg[4])
  103. else
  104. on_notification(msg[2], msg[3])
  105. end
  106. end
  107. self:_run(on_request, on_notification, timeout)
  108. self._is_running = false
  109. end
  110. function Session:stop()
  111. uv.stop()
  112. end
  113. function Session:close(signal)
  114. if not self._timer:is_closing() then self._timer:close() end
  115. if not self._prepare:is_closing() then self._prepare:close() end
  116. self._msgpack_rpc_stream:close(signal)
  117. end
  118. function Session:_yielding_request(method, args)
  119. return coroutine.yield(function(co)
  120. self._msgpack_rpc_stream:write(method, args, function(err, result)
  121. resume(co, err, result)
  122. end)
  123. end)
  124. end
  125. function Session:_blocking_request(method, args)
  126. local err, result
  127. local function on_request(method_, args_, response)
  128. table.insert(self._pending_messages, {'request', method_, args_, response})
  129. end
  130. local function on_notification(method_, args_)
  131. table.insert(self._pending_messages, {'notification', method_, args_})
  132. end
  133. self._msgpack_rpc_stream:write(method, args, function(e, r)
  134. err = e
  135. result = r
  136. uv.stop()
  137. end)
  138. self:_run(on_request, on_notification)
  139. return (err or self.eof_err), result
  140. end
  141. function Session:_run(request_cb, notification_cb, timeout)
  142. if type(timeout) == 'number' then
  143. self._prepare:start(function()
  144. self._timer:start(timeout, 0, function()
  145. uv.stop()
  146. end)
  147. self._prepare:stop()
  148. end)
  149. end
  150. self._msgpack_rpc_stream:read_start(request_cb, notification_cb, function()
  151. uv.stop()
  152. self.eof_err = {1, "EOF was received from Nvim. Likely the Nvim process crashed."}
  153. end)
  154. uv.run()
  155. self._prepare:stop()
  156. self._timer:stop()
  157. self._msgpack_rpc_stream:read_stop()
  158. end
  159. return Session