socket_io.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // License: GPLv3 Copyright: 2022, Kovid Goyal, <kovid at kovidgoyal.net>
  2. package at
  3. import (
  4. "bytes"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "os"
  10. "strconv"
  11. "time"
  12. "kitty/tools/tui/loop"
  13. "kitty/tools/utils"
  14. "kitty/tools/wcswidth"
  15. )
  16. var _ = fmt.Print
  // write_all_to_conn writes the whole of data to the connection that conn
  // points to, calling Write repeatedly until nothing is left to send.
  //
  // An io.ErrShortWrite reported by Write is not treated as fatal: the bytes
  // that were accepted are skipped and the remainder is retried. Any other
  // error is returned immediately. If a Write call makes no forward progress
  // (it reports zero bytes written without a fatal error), io.ErrShortWrite is
  // returned instead of retrying forever.
  func write_all_to_conn(conn *net.Conn, data []byte) error {
  	for len(data) > 0 {
  		n, err := (*conn).Write(data)
  		if err != nil && !errors.Is(err, io.ErrShortWrite) {
  			return err
  		}
  		if n <= 0 {
  			// Nothing was consumed, so retrying with the same data would
  			// spin indefinitely.
  			return io.ErrShortWrite
  		}
  		data = data[n:]
  	}
  	return nil
  }
  30. func write_many_to_conn(conn *net.Conn, datums ...[]byte) error {
  31. for len(datums) > 0 {
  32. err := write_all_to_conn(conn, datums[0])
  33. if err != nil {
  34. return err
  35. }
  36. datums = datums[1:]
  37. }
  38. return nil
  39. }
  40. type response_reader struct {
  41. parser wcswidth.EscapeCodeParser
  42. storage [utils.DEFAULT_IO_BUFFER_SIZE]byte
  43. pending_responses [][]byte
  44. }
  45. func (r *response_reader) read_response_from_conn(conn *net.Conn, timeout time.Duration) (serialized_response []byte, err error) {
  46. keep_going := true
  47. if len(r.pending_responses) == 0 {
  48. r.parser.HandleDCS = func(data []byte) error {
  49. if bytes.HasPrefix(data, []byte("@kitty-cmd")) {
  50. r.pending_responses = append(r.pending_responses, append([]byte{}, data[len("@kitty-cmd"):]...))
  51. keep_going = false
  52. }
  53. return nil
  54. }
  55. buf := r.storage[:]
  56. for keep_going {
  57. var n int
  58. (*conn).SetDeadline(time.Now().Add(timeout))
  59. n, err = (*conn).Read(buf)
  60. if err != nil {
  61. keep_going = false
  62. break
  63. }
  64. r.parser.Parse(buf[:n])
  65. }
  66. }
  67. if len(r.pending_responses) > 0 {
  68. serialized_response = r.pending_responses[0]
  69. r.pending_responses = r.pending_responses[1:]
  70. }
  71. return
  72. }
  // Framing that is written around every command chunk sent over the socket.
  const (
  	// cmd_escape_code_prefix opens the frame: ESC P followed by "@kitty-cmd".
  	cmd_escape_code_prefix = "\x1bP@kitty-cmd"
  	// cmd_escape_code_suffix closes the frame: ESC followed by a backslash.
  	cmd_escape_code_suffix = "\x1b\\"
  )
  75. func run_stdin_echo_loop(conn *net.Conn, io_data *rc_io_data) (err error) {
  76. lp, err := loop.New(loop.NoAlternateScreen, loop.NoRestoreColors)
  77. if err != nil {
  78. return
  79. }
  80. lp.OnKeyEvent = func(event *loop.KeyEvent) error {
  81. event.Handled = true
  82. err = io_data.on_key_event(lp, event)
  83. if err != nil {
  84. if err == end_reading_from_stdin {
  85. lp.Quit(0)
  86. return nil
  87. }
  88. return err
  89. }
  90. chunk, err := io_data.next_chunk()
  91. if err != nil {
  92. if err == waiting_on_stdin {
  93. return nil
  94. }
  95. return err
  96. }
  97. err = write_many_to_conn(conn, []byte(cmd_escape_code_prefix), chunk, []byte(cmd_escape_code_suffix))
  98. if err != nil {
  99. return err
  100. }
  101. return nil
  102. }
  103. err = lp.Run()
  104. if err == nil {
  105. lp.KillIfSignalled()
  106. }
  107. return err
  108. }
  109. func simple_socket_io(conn *net.Conn, io_data *rc_io_data) (serialized_response []byte, err error) {
  110. r := response_reader{}
  111. r.pending_responses = make([][]byte, 0, 2) // we read at most two responses
  112. first_escape_code_sent := false
  113. wants_streaming := io_data.rc.Stream
  114. for {
  115. var chunk []byte
  116. chunk, err = io_data.next_chunk()
  117. if err != nil {
  118. if err == waiting_on_stdin {
  119. err := run_stdin_echo_loop(conn, io_data)
  120. return make([]byte, 0), err
  121. }
  122. return
  123. }
  124. if len(chunk) == 0 {
  125. break
  126. }
  127. err = write_many_to_conn(conn, []byte(cmd_escape_code_prefix), chunk, []byte(cmd_escape_code_suffix))
  128. if err != nil {
  129. return
  130. }
  131. if !first_escape_code_sent {
  132. first_escape_code_sent = true
  133. if wants_streaming {
  134. var streaming_response []byte
  135. streaming_response, err = r.read_response_from_conn(conn, io_data.timeout)
  136. if err != nil {
  137. return
  138. }
  139. if !is_stream_response(streaming_response) {
  140. err = fmt.Errorf("Did not receive expected streaming response")
  141. return
  142. }
  143. }
  144. }
  145. }
  146. if io_data.rc.NoResponse {
  147. return
  148. }
  149. return r.read_response_from_conn(conn, io_data.timeout)
  150. }
  151. func do_socket_io(io_data *rc_io_data) (serialized_response []byte, err error) {
  152. var conn net.Conn
  153. if global_options.to_network == "fd" {
  154. fd, _ := strconv.Atoi(global_options.to_address)
  155. if err != nil {
  156. return nil, err
  157. }
  158. f := os.NewFile(uintptr(fd), "fd:"+global_options.to_address)
  159. conn, err = net.FileConn(f)
  160. if err != nil {
  161. return nil, fmt.Errorf("Failed to open a socket for the remote control file descriptor: %d with error: %w", fd, err)
  162. }
  163. defer f.Close()
  164. } else {
  165. network := utils.IfElse(global_options.to_network == "ip", "tcp", global_options.to_network)
  166. conn, err = net.Dial(network, global_options.to_address)
  167. if err != nil {
  168. err = fmt.Errorf("Failed to connect to %s %s with error: %w", network, global_options.to_address, err)
  169. return
  170. }
  171. }
  172. defer conn.Close()
  173. return simple_socket_io(&conn, io_data)
  174. }