websocketconn.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package websocketconn
  2. import (
  3. "io"
  4. "time"
  5. "github.com/gorilla/websocket"
  6. )
  7. // An abstraction that makes an underlying WebSocket connection look like a
  8. // net.Conn.
  9. type Conn struct {
  10. *websocket.Conn
  11. Reader io.Reader
  12. Writer io.Writer
  13. }
  14. func (conn *Conn) Read(b []byte) (n int, err error) {
  15. return conn.Reader.Read(b)
  16. }
  17. func (conn *Conn) Write(b []byte) (n int, err error) {
  18. return conn.Writer.Write(b)
  19. }
  20. func (conn *Conn) Close() error {
  21. conn.Reader.(*io.PipeReader).Close()
  22. conn.Writer.(*io.PipeWriter).Close()
  23. // Ignore any error in trying to write a Close frame.
  24. _ = conn.Conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
  25. return conn.Conn.Close()
  26. }
  27. func (conn *Conn) SetDeadline(t time.Time) error {
  28. errRead := conn.Conn.SetReadDeadline(t)
  29. errWrite := conn.Conn.SetWriteDeadline(t)
  30. err := errRead
  31. if err == nil {
  32. err = errWrite
  33. }
  34. return err
  35. }
  36. func readLoop(w io.Writer, ws *websocket.Conn) error {
  37. for {
  38. messageType, r, err := ws.NextReader()
  39. if err != nil {
  40. return err
  41. }
  42. if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage {
  43. continue
  44. }
  45. _, err = io.Copy(w, r)
  46. if err != nil {
  47. return err
  48. }
  49. }
  50. }
  51. func writeLoop(ws *websocket.Conn, r io.Reader) error {
  52. for {
  53. var buf [2048]byte
  54. n, err := r.Read(buf[:])
  55. if err != nil {
  56. return err
  57. }
  58. data := buf[:n]
  59. w, err := ws.NextWriter(websocket.BinaryMessage)
  60. if err != nil {
  61. return err
  62. }
  63. n, err = w.Write(data)
  64. if err != nil {
  65. return err
  66. }
  67. err = w.Close()
  68. if err != nil {
  69. return err
  70. }
  71. }
  72. }
  73. // websocket.Conn methods start returning websocket.CloseError after the
  74. // connection has been closed. We want to instead interpret that as io.EOF, just
  75. // as you would find with a normal net.Conn. This only converts
  76. // websocket.CloseErrors with known codes; other codes like CloseProtocolError
  77. // and CloseAbnormalClosure will still be reported as anomalous.
  78. func closeErrorToEOF(err error) error {
  79. if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
  80. err = io.EOF
  81. }
  82. return err
  83. }
  84. // Create a new Conn.
  85. func New(ws *websocket.Conn) *Conn {
  86. // Set up synchronous pipes to serialize reads and writes to the
  87. // underlying websocket.Conn.
  88. //
  89. // https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency
  90. // "Connections support one concurrent reader and one concurrent writer.
  91. // Applications are responsible for ensuring that no more than one
  92. // goroutine calls the write methods (NextWriter, etc.) concurrently and
  93. // that no more than one goroutine calls the read methods (NextReader,
  94. // etc.) concurrently. The Close and WriteControl methods can be called
  95. // concurrently with all other methods."
  96. pr1, pw1 := io.Pipe()
  97. go func() {
  98. pw1.CloseWithError(closeErrorToEOF(readLoop(pw1, ws)))
  99. }()
  100. pr2, pw2 := io.Pipe()
  101. go func() {
  102. pr2.CloseWithError(closeErrorToEOF(writeLoop(ws, pr2)))
  103. }()
  104. return &Conn{
  105. Conn: ws,
  106. Reader: pr1,
  107. Writer: pw2,
  108. }
  109. }