manager.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package datagramsession
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "strings"
  7. "time"
  8. "github.com/google/uuid"
  9. "github.com/rs/zerolog"
  10. "github.com/cloudflare/cloudflared/management"
  11. "github.com/cloudflare/cloudflared/packet"
  12. )
  // Tuning constants for the session manager.
  const (
      // requestChanCapacity is a channel capacity. It is not referenced in the
      // portion of the file shown here; presumably used elsewhere in the package.
      requestChanCapacity = 16
      // defaultReqTimeout is the timeout NewManager installs for RegisterSession
      // and UnregisterSession calls.
      defaultReqTimeout = 5 * time.Second
  )
  // LogFieldSessionID is the structured-log field name under which a session ID
  // is recorded. It is a typed constant rather than a variable so that no code
  // can reassign the field name at runtime.
  const LogFieldSessionID string = "sessionID"

  // errSessionManagerClosed is returned by RegisterSession and UnregisterSession
  // when closedChan yields, and is the fallback reason used by shutdownSessions
  // when it is given a nil error.
  var errSessionManagerClosed = fmt.Errorf("session manager closed")
  21. func FormatSessionID(sessionID uuid.UUID) string {
  22. sessionIDStr := sessionID.String()
  23. sessionIDStr = strings.ReplaceAll(sessionIDStr, "-", "")
  24. return sessionIDStr
  25. }
  26. // Manager defines the APIs to manage sessions from the same transport.
  27. type Manager interface {
  28. // Serve starts the event loop
  29. Serve(ctx context.Context) error
  30. // RegisterSession starts tracking a session. Caller is responsible for starting the session
  31. RegisterSession(ctx context.Context, sessionID uuid.UUID, dstConn io.ReadWriteCloser) (*Session, error)
  32. // UnregisterSession stops tracking the session and terminates it
  33. UnregisterSession(ctx context.Context, sessionID uuid.UUID, message string, byRemote bool) error
  34. // UpdateLogger updates the logger used by the Manager
  35. UpdateLogger(log *zerolog.Logger)
  36. }
  37. type manager struct {
  38. registrationChan chan *registerSessionEvent
  39. unregistrationChan chan *unregisterSessionEvent
  40. sendFunc transportSender
  41. receiveChan <-chan *packet.Session
  42. closedChan <-chan struct{}
  43. sessions map[uuid.UUID]*Session
  44. log *zerolog.Logger
  45. // timeout waiting for an API to finish. This can be overriden in test
  46. timeout time.Duration
  47. }
  48. func NewManager(log *zerolog.Logger, sendF transportSender, receiveChan <-chan *packet.Session) *manager {
  49. return &manager{
  50. registrationChan: make(chan *registerSessionEvent),
  51. unregistrationChan: make(chan *unregisterSessionEvent),
  52. sendFunc: sendF,
  53. receiveChan: receiveChan,
  54. closedChan: make(chan struct{}),
  55. sessions: make(map[uuid.UUID]*Session),
  56. log: log,
  57. timeout: defaultReqTimeout,
  58. }
  59. }
  60. func (m *manager) UpdateLogger(log *zerolog.Logger) {
  61. // Benign data race, no problem if the old pointer is read or not concurrently.
  62. m.log = log
  63. }
  64. func (m *manager) Serve(ctx context.Context) error {
  65. for {
  66. select {
  67. case <-ctx.Done():
  68. m.shutdownSessions(ctx.Err())
  69. return ctx.Err()
  70. // receiveChan is buffered, so the transport can read more datagrams from transport while the event loop is
  71. // processing other events
  72. case datagram := <-m.receiveChan:
  73. m.sendToSession(datagram)
  74. case registration := <-m.registrationChan:
  75. m.registerSession(ctx, registration)
  76. case unregistration := <-m.unregistrationChan:
  77. m.unregisterSession(unregistration)
  78. }
  79. }
  80. }
  81. func (m *manager) shutdownSessions(err error) {
  82. if err == nil {
  83. err = errSessionManagerClosed
  84. }
  85. closeSessionErr := &errClosedSession{
  86. message: err.Error(),
  87. // Usually connection with remote has been closed, so set this to true to skip unregistering from remote
  88. byRemote: true,
  89. }
  90. for _, s := range m.sessions {
  91. m.unregisterSession(&unregisterSessionEvent{
  92. sessionID: s.ID,
  93. err: closeSessionErr,
  94. })
  95. }
  96. }
  97. func (m *manager) RegisterSession(ctx context.Context, sessionID uuid.UUID, originProxy io.ReadWriteCloser) (*Session, error) {
  98. ctx, cancel := context.WithTimeout(ctx, m.timeout)
  99. defer cancel()
  100. event := newRegisterSessionEvent(sessionID, originProxy)
  101. select {
  102. case <-ctx.Done():
  103. m.log.Error().Msg("Datagram session registration timeout")
  104. return nil, ctx.Err()
  105. case m.registrationChan <- event:
  106. session := <-event.resultChan
  107. return session, nil
  108. // Once closedChan is closed, manager won't accept more registration because nothing is
  109. // reading from registrationChan and it's an unbuffered channel
  110. case <-m.closedChan:
  111. return nil, errSessionManagerClosed
  112. }
  113. }
  114. func (m *manager) registerSession(ctx context.Context, registration *registerSessionEvent) {
  115. session := m.newSession(registration.sessionID, registration.originProxy)
  116. m.sessions[registration.sessionID] = session
  117. registration.resultChan <- session
  118. incrementUDPSessions()
  119. }
  120. func (m *manager) newSession(id uuid.UUID, dstConn io.ReadWriteCloser) *Session {
  121. logger := m.log.With().
  122. Int(management.EventTypeKey, int(management.UDP)).
  123. Str(LogFieldSessionID, FormatSessionID(id)).Logger()
  124. return &Session{
  125. ID: id,
  126. sendFunc: m.sendFunc,
  127. dstConn: dstConn,
  128. // activeAtChan has low capacity. It can be full when there are many concurrent read/write. markActive() will
  129. // drop instead of blocking because last active time only needs to be an approximation
  130. activeAtChan: make(chan time.Time, 2),
  131. // capacity is 2 because close() and dstToTransport routine in Serve() can write to this channel
  132. closeChan: make(chan error, 2),
  133. log: &logger,
  134. }
  135. }
  136. func (m *manager) UnregisterSession(ctx context.Context, sessionID uuid.UUID, message string, byRemote bool) error {
  137. ctx, cancel := context.WithTimeout(ctx, m.timeout)
  138. defer cancel()
  139. event := &unregisterSessionEvent{
  140. sessionID: sessionID,
  141. err: &errClosedSession{
  142. message: message,
  143. byRemote: byRemote,
  144. },
  145. }
  146. select {
  147. case <-ctx.Done():
  148. m.log.Error().Msg("Datagram session unregistration timeout")
  149. return ctx.Err()
  150. case m.unregistrationChan <- event:
  151. return nil
  152. case <-m.closedChan:
  153. return errSessionManagerClosed
  154. }
  155. }
  156. func (m *manager) unregisterSession(unregistration *unregisterSessionEvent) {
  157. session, ok := m.sessions[unregistration.sessionID]
  158. if ok {
  159. delete(m.sessions, unregistration.sessionID)
  160. session.close(unregistration.err)
  161. decrementUDPActiveSessions()
  162. }
  163. }
  164. func (m *manager) sendToSession(datagram *packet.Session) {
  165. session, ok := m.sessions[datagram.ID]
  166. if !ok {
  167. m.log.Error().Str(LogFieldSessionID, FormatSessionID(datagram.ID)).Msg("session not found")
  168. return
  169. }
  170. // session writes to destination over a connected UDP socket, which should not be blocking, so this call doesn't
  171. // need to run in another go routine
  172. session.transportToDst(datagram.Payload)
  173. }