uv_stream.lua 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. local uv = vim.uv
  2. --- @class test.Stream
  3. --- @field write fun(self, data: string|string[])
  4. --- @field read_start fun(self, cb: fun(chunk: string))
  5. --- @field read_stop fun(self)
  6. --- @field close fun(self, signal?: string)
--- Stream backed by two libuv pipe handles; `StdioStream.open()` attaches
--- `_in` to file descriptor 0 and `_out` to file descriptor 1.
--- @class vim.StdioStream : test.Stream
--- @field private _in uv.uv_pipe_t pipe handle that `read_start`/`read_stop` operate on
--- @field private _out uv.uv_pipe_t pipe handle that `write` sends data to
local StdioStream = {}
-- Method lookups on instances fall back to this table.
StdioStream.__index = StdioStream
  12. function StdioStream.open()
  13. local self = setmetatable({
  14. _in = assert(uv.new_pipe(false)),
  15. _out = assert(uv.new_pipe(false)),
  16. }, StdioStream)
  17. self._in:open(0)
  18. self._out:open(1)
  19. return self
  20. end
  21. --- @param data string|string[]
  22. function StdioStream:write(data)
  23. self._out:write(data)
  24. end
  25. function StdioStream:read_start(cb)
  26. self._in:read_start(function(err, chunk)
  27. if err then
  28. error(err)
  29. end
  30. cb(chunk)
  31. end)
  32. end
  33. function StdioStream:read_stop()
  34. self._in:read_stop()
  35. end
  36. function StdioStream:close()
  37. self._in:close()
  38. self._out:close()
  39. end
--- Stream over a single libuv handle: a pipe handle when created with
--- `SocketStream.open(file)`, a TCP handle when created with
--- `SocketStream.connect(host, port)`.
--- @class test.SocketStream : test.Stream
--- @field package _stream_error? string first error reported by the connect callback, if any
--- @field package _socket uv.uv_pipe_t|uv.uv_tcp_t handle all reads and writes go through
local SocketStream = {}
-- Method lookups on instances fall back to this table.
SocketStream.__index = SocketStream
  45. function SocketStream.open(file)
  46. local socket = assert(uv.new_pipe(false))
  47. local self = setmetatable({
  48. _socket = socket,
  49. _stream_error = nil,
  50. }, SocketStream)
  51. uv.pipe_connect(socket, file, function(err)
  52. self._stream_error = self._stream_error or err
  53. end)
  54. return self
  55. end
  56. function SocketStream.connect(host, port)
  57. local socket = assert(uv.new_tcp())
  58. local self = setmetatable({
  59. _socket = socket,
  60. _stream_error = nil,
  61. }, SocketStream)
  62. uv.tcp_connect(socket, host, port, function(err)
  63. self._stream_error = self._stream_error or err
  64. end)
  65. return self
  66. end
  67. function SocketStream:write(data)
  68. if self._stream_error then
  69. error(self._stream_error)
  70. end
  71. uv.write(self._socket, data, function(err)
  72. if err then
  73. error(self._stream_error or err)
  74. end
  75. end)
  76. end
  77. function SocketStream:read_start(cb)
  78. if self._stream_error then
  79. error(self._stream_error)
  80. end
  81. uv.read_start(self._socket, function(err, chunk)
  82. if err then
  83. error(err)
  84. end
  85. cb(chunk)
  86. end)
  87. end
  88. function SocketStream:read_stop()
  89. if self._stream_error then
  90. error(self._stream_error)
  91. end
  92. uv.read_stop(self._socket)
  93. end
  94. function SocketStream:close()
  95. uv.close(self._socket)
  96. end
--- Stream connected to a spawned child process through two pipes: writes
--- go to the child's stdin and reads come from the child's stdout.
--- @class test.ChildProcessStream : test.Stream
--- @field private _proc uv.uv_process_t
--- @field private _pid integer
--- @field private _child_stdin uv.uv_pipe_t
--- @field private _child_stdout uv.uv_pipe_t
--- @field private _exiting boolean initialised to false by `spawn`
--- @field private _closed? boolean set to true by the first `close()` call
--- @field status integer first value passed to the exit callback; nil until it has run
--- @field signal integer second value passed to the exit callback; nil until it has run
local ChildProcessStream = {}
-- Method lookups on instances fall back to this table.
ChildProcessStream.__index = ChildProcessStream
  106. --- @param argv string[]
  107. --- @param env string[]?
  108. --- @param io_extra uv.uv_pipe_t?
  109. --- @return test.ChildProcessStream
  110. function ChildProcessStream.spawn(argv, env, io_extra)
  111. local self = setmetatable({
  112. _child_stdin = uv.new_pipe(false),
  113. _child_stdout = uv.new_pipe(false),
  114. _exiting = false,
  115. }, ChildProcessStream)
  116. local prog = argv[1]
  117. local args = {} --- @type string[]
  118. for i = 2, #argv do
  119. args[#args + 1] = argv[i]
  120. end
  121. --- @diagnostic disable-next-line:missing-fields
  122. self._proc, self._pid = uv.spawn(prog, {
  123. stdio = { self._child_stdin, self._child_stdout, 1, io_extra },
  124. args = args,
  125. --- @diagnostic disable-next-line:assign-type-mismatch
  126. env = env,
  127. }, function(status, signal)
  128. self.status = status
  129. self.signal = signal
  130. end)
  131. if not self._proc then
  132. local err = self._pid
  133. error(err)
  134. end
  135. return self
  136. end
  137. function ChildProcessStream:write(data)
  138. self._child_stdin:write(data)
  139. end
  140. function ChildProcessStream:read_start(cb)
  141. self._child_stdout:read_start(function(err, chunk)
  142. if err then
  143. error(err)
  144. end
  145. cb(chunk)
  146. end)
  147. end
  148. function ChildProcessStream:read_stop()
  149. self._child_stdout:read_stop()
  150. end
  151. function ChildProcessStream:close(signal)
  152. if self._closed then
  153. return
  154. end
  155. self._closed = true
  156. self:read_stop()
  157. self._child_stdin:close()
  158. self._child_stdout:close()
  159. if type(signal) == 'string' then
  160. self._proc:kill('sig' .. signal)
  161. end
  162. while self.status == nil do
  163. uv.run 'once'
  164. end
  165. return self.status, self.signal
  166. end
  167. return {
  168. StdioStream = StdioStream,
  169. ChildProcessStream = ChildProcessStream,
  170. SocketStream = SocketStream,
  171. }