session.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package datagramsession
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "time"
  9. "github.com/google/uuid"
  10. "github.com/rs/zerolog"
  11. "github.com/cloudflare/cloudflared/packet"
  12. )
  13. const (
  14. defaultCloseIdleAfter = time.Second * 210
  15. )
  16. func SessionIdleErr(timeout time.Duration) error {
  17. return fmt.Errorf("session idle for %v", timeout)
  18. }
  19. type transportSender func(session *packet.Session) error
  20. // ErrVithVariableSeverity are errors that have variable severity
  21. type ErrVithVariableSeverity interface {
  22. error
  23. // LogLevel return the severity of this error
  24. LogLevel() zerolog.Level
  25. }
  26. // Session is a bidirectional pipe of datagrams between transport and dstConn
  27. // Destination can be a connection with origin or with eyeball
  28. // When the destination is origin:
  29. // - Manager receives datagrams from receiveChan and calls the transportToDst method of the Session to send to origin
  30. // - Datagrams from origin are read from conn and Send to transport using the transportSender callback. Transport will return them to eyeball
  31. // When the destination is eyeball:
  32. // - Datagrams from eyeball are read from conn and Send to transport. Transport will send them to cloudflared using the transportSender callback.
  33. // - Manager receives datagrams from receiveChan and calls the transportToDst method of the Session to send to the eyeball
  34. type Session struct {
  35. ID uuid.UUID
  36. sendFunc transportSender
  37. dstConn io.ReadWriteCloser
  38. // activeAtChan is used to communicate the last read/write time
  39. activeAtChan chan time.Time
  40. closeChan chan error
  41. log *zerolog.Logger
  42. }
  43. func (s *Session) Serve(ctx context.Context, closeAfterIdle time.Duration) (closedByRemote bool, err error) {
  44. go func() {
  45. // QUIC implementation copies data to another buffer before returning https://github.com/quic-go/quic-go/blob/v0.24.0/session.go#L1967-L1975
  46. // This makes it safe to share readBuffer between iterations
  47. const maxPacketSize = 1500
  48. readBuffer := make([]byte, maxPacketSize)
  49. for {
  50. if closeSession, err := s.dstToTransport(readBuffer); err != nil {
  51. if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
  52. s.log.Debug().Msg("Destination connection closed")
  53. } else {
  54. level := zerolog.ErrorLevel
  55. if variableErr, ok := err.(ErrVithVariableSeverity); ok {
  56. level = variableErr.LogLevel()
  57. }
  58. s.log.WithLevel(level).Err(err).Msg("Failed to send session payload from destination to transport")
  59. }
  60. if closeSession {
  61. s.closeChan <- err
  62. return
  63. }
  64. }
  65. }
  66. }()
  67. err = s.waitForCloseCondition(ctx, closeAfterIdle)
  68. if closeSession, ok := err.(*errClosedSession); ok {
  69. closedByRemote = closeSession.byRemote
  70. }
  71. return closedByRemote, err
  72. }
  73. func (s *Session) waitForCloseCondition(ctx context.Context, closeAfterIdle time.Duration) error {
  74. // Closing dstConn cancels read so dstToTransport routine in Serve() can return
  75. defer s.dstConn.Close()
  76. if closeAfterIdle == 0 {
  77. // provide deafult is caller doesn't specify one
  78. closeAfterIdle = defaultCloseIdleAfter
  79. }
  80. checkIdleFreq := closeAfterIdle / 8
  81. checkIdleTicker := time.NewTicker(checkIdleFreq)
  82. defer checkIdleTicker.Stop()
  83. activeAt := time.Now()
  84. for {
  85. select {
  86. case <-ctx.Done():
  87. return ctx.Err()
  88. case reason := <-s.closeChan:
  89. return reason
  90. // TODO: TUN-5423 evaluate if using atomic is more efficient
  91. case now := <-checkIdleTicker.C:
  92. // The session is considered inactive if current time is after (last active time + allowed idle time)
  93. if now.After(activeAt.Add(closeAfterIdle)) {
  94. return SessionIdleErr(closeAfterIdle)
  95. }
  96. case activeAt = <-s.activeAtChan: // Update last active time
  97. }
  98. }
  99. }
  100. func (s *Session) dstToTransport(buffer []byte) (closeSession bool, err error) {
  101. n, err := s.dstConn.Read(buffer)
  102. s.markActive()
  103. // https://pkg.go.dev/io#Reader suggests caller should always process n > 0 bytes
  104. if n > 0 || err == nil {
  105. session := packet.Session{
  106. ID: s.ID,
  107. Payload: buffer[:n],
  108. }
  109. if sendErr := s.sendFunc(&session); sendErr != nil {
  110. return false, sendErr
  111. }
  112. }
  113. return err != nil, err
  114. }
  115. func (s *Session) transportToDst(payload []byte) (int, error) {
  116. s.markActive()
  117. n, err := s.dstConn.Write(payload)
  118. if err != nil {
  119. s.log.Err(err).Msg("Failed to write payload to session")
  120. }
  121. return n, err
  122. }
  123. // Sends the last active time to the idle checker loop without blocking. activeAtChan will only be full when there
  124. // are many concurrent read/write. It is fine to lose some precision
  125. func (s *Session) markActive() {
  126. select {
  127. case s.activeAtChan <- time.Now():
  128. default:
  129. }
  130. }
  131. func (s *Session) close(err *errClosedSession) {
  132. s.closeChan <- err
  133. }