msgpack_rpc_stream.lua 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. local mpack = require('mpack')
  -- Temporary hack: a minimal handle type so buffer values can be told apart
  -- from plain Lua values. The Buffer table itself doubles as the metatable
  -- of every handle it creates.
  local Buffer = {}
  Buffer.__index = Buffer

  --- Create a Buffer handle.
  -- @param id value stored in the handle's `id` field
  -- @return table `{id = id}` whose metatable is `Buffer`
  function Buffer.new(id)
    local handle = { id = id }
    return setmetatable(handle, Buffer)
  end
  -- Minimal handle type for window values; the Window table is also the
  -- metatable of every handle it creates.
  local Window = {}
  Window.__index = Window

  --- Create a Window handle.
  -- @param id value stored in the handle's `id` field
  -- @return table `{id = id}` whose metatable is `Window`
  function Window.new(id)
    local handle = { id = id }
    return setmetatable(handle, Window)
  end
  -- Minimal handle type for tabpage values; the Tabpage table is also the
  -- metatable of every handle it creates.
  local Tabpage = {}
  Tabpage.__index = Tabpage

  --- Create a Tabpage handle.
  -- @param id value stored in the handle's `id` field
  -- @return table `{id = id}` whose metatable is `Tabpage`
  function Tabpage.new(id)
    local handle = { id = id }
    return setmetatable(handle, Tabpage)
  end
  -- Response: remembers which stream and which request id a reply belongs
  -- to, so the reply can be sent later through `Response:send`.
  local Response = {}
  Response.__index = Response

  --- Create a Response bound to one incoming request.
  -- @param msgpack_rpc_stream the stream object the reply will be written to
  -- @param request_id the id of the request being answered
  -- @return table with `_msgpack_rpc_stream` and `_request_id` set and
  --   `Response` as its metatable
  function Response.new(msgpack_rpc_stream, request_id)
    local instance = {}
    instance._msgpack_rpc_stream = msgpack_rpc_stream
    instance._request_id = request_id
    return setmetatable(instance, Response)
  end
  --- Send the reply for the request this Response was created for.
  -- The written data is the session's reply header for the stored request
  -- id followed by two packed values: `(value, NIL)` when `is_error` is
  -- truthy, otherwise `(NIL, value)`.
  -- @param value the error (when `is_error`) or the result to send
  -- @param is_error truthy to put `value` in the first slot instead of the second
  function Response:send(value, is_error)
    local rpc = self._msgpack_rpc_stream
    local payload = rpc._session:reply(self._request_id)

    -- Decide the order of the two trailing fields up front.
    local first, second = mpack.NIL, value
    if is_error then
      first, second = value, mpack.NIL
    end

    -- Append one field at a time so the packer is invoked in field order.
    payload = payload .. rpc._pack(first)
    payload = payload .. rpc._pack(second)
    rpc._stream:write(payload)
  end
  -- MsgpackRpcStream: couples a byte stream with an mpack packer and an
  -- mpack session so messages can be written to and read from the stream.
  local MsgpackRpcStream = {}
  MsgpackRpcStream.__index = MsgpackRpcStream

  --- Create a MsgpackRpcStream on top of `stream`.
  -- Buffer, Window and Tabpage handles are packed as ext types 0, 1 and 2
  -- (payload: the mpack-encoded `id`) and unpacked back into handles.
  -- @param stream underlying stream object, stored in `_stream`
  -- @return table with `_stream`, `_pack` and `_session` set and
  --   `MsgpackRpcStream` as its metatable
  function MsgpackRpcStream.new(stream)
    -- Build a packer-side ext handler: returns the ext code and encoded id.
    local function ext_packer(code)
      return function(handle) return code, mpack.encode(handle.id) end
    end

    -- Build an unpacker-side ext handler: decodes the payload and wraps it
    -- with the handle class' constructor (looked up at call time).
    local function ext_unpacker(class)
      return function(_code, payload)
        return class.new(mpack.decode(payload))
      end
    end

    local packer = mpack.Packer({
      ext = {
        [Buffer] = ext_packer(0),
        [Window] = ext_packer(1),
        [Tabpage] = ext_packer(2),
      },
    })

    local unpacker = mpack.Unpacker({
      ext = {
        [0] = ext_unpacker(Buffer),
        [1] = ext_unpacker(Window),
        [2] = ext_unpacker(Tabpage),
      },
    })

    local instance = {
      _stream = stream,
      _pack = packer,
      _session = mpack.Session({ unpack = unpacker }),
    }
    return setmetatable(instance, MsgpackRpcStream)
  end
  --- Write a request or a notification to the underlying stream.
  -- With a `response_cb` the header comes from `session:request(response_cb)`;
  -- without one it comes from `session:notify()`. The packed `method` and
  -- packed `args` are appended to the header before writing.
  -- @param method value packed right after the header
  -- @param args value packed after `method`
  -- @param response_cb optional function; asserted to be a function when given
  function MsgpackRpcStream:write(method, args, response_cb)
    local header
    if not response_cb then
      header = self._session:notify()
    else
      assert(type(response_cb) == 'function')
      header = self._session:request(response_cb)
    end

    -- Pack in message order: method first, then args.
    local packed_method = self._pack(method)
    local packed_args = self._pack(args)
    self._stream:write(header .. packed_method .. packed_args)
  end
  --- Start reading from the underlying stream and dispatch decoded messages.
  -- Each chunk delivered by the stream is fed to `session:receive` until the
  -- returned position passes the end of the chunk. A falsy chunk is treated
  -- as end-of-stream.
  -- @param request_cb called as `request_cb(method, args, response)` where
  --   `response` is a new Response bound to this stream and the request id
  -- @param notification_cb called as `notification_cb(method, args)`
  -- @param eof_cb called with no arguments when the stream delivers no data
  function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
    local function on_data(chunk)
      if not chunk then
        return eof_cb()
      end

      local total = #chunk
      local offset = 1
      while offset <= total do
        local kind, id_or_cb, method_or_error, args_or_result
        kind, id_or_cb, method_or_error, args_or_result, offset =
          self._session:receive(chunk, offset)

        if kind == 'request' then
          local response = Response.new(self, id_or_cb)
          request_cb(method_or_error, args_or_result, response)
        elseif kind == 'notification' then
          notification_cb(method_or_error, args_or_result)
        elseif kind == 'response' then
          -- For responses `id_or_cb` is called as the callback. It gets
          -- (nil, result) when the error slot is NIL, else (error, nil).
          if method_or_error == mpack.NIL then
            id_or_cb(nil, args_or_result)
          else
            id_or_cb(method_or_error, nil)
          end
        end
        -- Any other kind is ignored and the loop continues from `offset`.
      end
    end

    self._stream:read_start(on_data)
  end
  --- Stop reading; forwards to the underlying stream's `read_stop`.
  function MsgpackRpcStream:read_stop()
    local stream = self._stream
    stream:read_stop()
  end
  --- Close the underlying stream.
  -- @param signal passed through unchanged to the stream's `close`
  function MsgpackRpcStream:close(signal)
    local stream = self._stream
    stream:close(signal)
  end
  100. return MsgpackRpcStream