websocketconn.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package websocketconn
  2. import (
  3. "io"
  4. "time"
  5. "github.com/gorilla/websocket"
  6. )
  7. // An abstraction that makes an underlying WebSocket connection look like a
  8. // net.Conn.
  9. type Conn struct {
  10. *websocket.Conn
  11. Reader io.Reader
  12. Writer io.Writer
  13. }
  14. func (conn *Conn) Read(b []byte) (n int, err error) {
  15. return conn.Reader.Read(b)
  16. }
  17. func (conn *Conn) Write(b []byte) (n int, err error) {
  18. return conn.Writer.Write(b)
  19. }
  20. func (conn *Conn) Close() error {
  21. conn.Reader.(*io.PipeReader).Close()
  22. conn.Writer.(*io.PipeWriter).Close()
  23. // Ignore any error in trying to write a Close frame.
  24. _ = conn.Conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
  25. return conn.Conn.Close()
  26. }
  27. func (conn *Conn) SetDeadline(t time.Time) error {
  28. errRead := conn.Conn.SetReadDeadline(t)
  29. errWrite := conn.Conn.SetWriteDeadline(t)
  30. err := errRead
  31. if err == nil {
  32. err = errWrite
  33. }
  34. return err
  35. }
  36. func readLoop(w io.Writer, ws *websocket.Conn) error {
  37. var buf [2048]byte
  38. for {
  39. messageType, r, err := ws.NextReader()
  40. if err != nil {
  41. return err
  42. }
  43. if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage {
  44. continue
  45. }
  46. _, err = io.CopyBuffer(w, r, buf[:])
  47. if err != nil {
  48. return err
  49. }
  50. }
  51. }
  52. func writeLoop(ws *websocket.Conn, r io.Reader) error {
  53. var buf [2048]byte
  54. for {
  55. n, err := r.Read(buf[:])
  56. if err != nil {
  57. return err
  58. }
  59. err = ws.WriteMessage(websocket.BinaryMessage, buf[:n])
  60. if err != nil {
  61. return err
  62. }
  63. }
  64. }
  65. // websocket.Conn methods start returning websocket.CloseError after the
  66. // connection has been closed. We want to instead interpret that as io.EOF, just
  67. // as you would find with a normal net.Conn. This only converts
  68. // websocket.CloseErrors with known codes; other codes like CloseProtocolError
  69. // and CloseAbnormalClosure will still be reported as anomalous.
  70. func closeErrorToEOF(err error) error {
  71. if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
  72. err = io.EOF
  73. }
  74. return err
  75. }
  76. // Create a new Conn.
  77. func New(ws *websocket.Conn) *Conn {
  78. // Set up synchronous pipes to serialize reads and writes to the
  79. // underlying websocket.Conn.
  80. //
  81. // https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency
  82. // "Connections support one concurrent reader and one concurrent writer.
  83. // Applications are responsible for ensuring that no more than one
  84. // goroutine calls the write methods (WriteMessage, etc.) concurrently
  85. // and that no more than one goroutine calls the read methods
  86. // (NextReader, etc.) concurrently. The Close and WriteControl methods
  87. // can be called concurrently with all other methods."
  88. pr1, pw1 := io.Pipe()
  89. go func() {
  90. pw1.CloseWithError(closeErrorToEOF(readLoop(pw1, ws)))
  91. }()
  92. pr2, pw2 := io.Pipe()
  93. go func() {
  94. pr2.CloseWithError(closeErrorToEOF(writeLoop(ws, pr2)))
  95. }()
  96. return &Conn{
  97. Conn: ws,
  98. Reader: pr1,
  99. Writer: pw2,
  100. }
  101. }