quic_datagram_v2.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package connection
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "time"
  7. "github.com/google/uuid"
  8. "github.com/quic-go/quic-go"
  9. "github.com/rs/zerolog"
  10. "go.opentelemetry.io/otel/attribute"
  11. "go.opentelemetry.io/otel/trace"
  12. "golang.org/x/sync/errgroup"
  13. "github.com/cloudflare/cloudflared/datagramsession"
  14. "github.com/cloudflare/cloudflared/ingress"
  15. "github.com/cloudflare/cloudflared/management"
  16. "github.com/cloudflare/cloudflared/packet"
  17. cfdquic "github.com/cloudflare/cloudflared/quic"
  18. "github.com/cloudflare/cloudflared/tracing"
  19. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  20. tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  21. rpcquic "github.com/cloudflare/cloudflared/tunnelrpc/quic"
  22. )
  // demuxChanCapacity is the buffer size of the session demux channel that is shared by the datagram muxer and
  // the session manager. Empirically this capacity has been working well.
  const demuxChanCapacity = 16
  27. // DatagramSessionHandler is a service that can serve datagrams for a connection and handle sessions from incoming
  28. // connection streams.
  29. type DatagramSessionHandler interface {
  30. Serve(context.Context) error
  31. pogs.SessionManager
  32. }
  33. type datagramV2Connection struct {
  34. conn quic.Connection
  35. // sessionManager tracks active sessions. It receives datagrams from quic connection via datagramMuxer
  36. sessionManager datagramsession.Manager
  37. // datagramMuxer mux/demux datagrams from quic connection
  38. datagramMuxer *cfdquic.DatagramMuxerV2
  39. packetRouter *ingress.PacketRouter
  40. rpcTimeout time.Duration
  41. streamWriteTimeout time.Duration
  42. logger *zerolog.Logger
  43. }
  44. func NewDatagramV2Connection(ctx context.Context,
  45. conn quic.Connection,
  46. packetConfig *ingress.GlobalRouterConfig,
  47. rpcTimeout time.Duration,
  48. streamWriteTimeout time.Duration,
  49. logger *zerolog.Logger,
  50. ) DatagramSessionHandler {
  51. sessionDemuxChan := make(chan *packet.Session, demuxChanCapacity)
  52. datagramMuxer := cfdquic.NewDatagramMuxerV2(conn, logger, sessionDemuxChan)
  53. sessionManager := datagramsession.NewManager(logger, datagramMuxer.SendToSession, sessionDemuxChan)
  54. packetRouter := ingress.NewPacketRouter(packetConfig, datagramMuxer, logger)
  55. return &datagramV2Connection{
  56. conn,
  57. sessionManager,
  58. datagramMuxer,
  59. packetRouter,
  60. rpcTimeout,
  61. streamWriteTimeout,
  62. logger,
  63. }
  64. }
  65. func (d *datagramV2Connection) Serve(ctx context.Context) error {
  66. // If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
  67. // as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
  68. // connection).
  69. // If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
  70. // other goroutine as fast as possible.
  71. ctx, cancel := context.WithCancel(ctx)
  72. errGroup, ctx := errgroup.WithContext(ctx)
  73. errGroup.Go(func() error {
  74. defer cancel()
  75. return d.sessionManager.Serve(ctx)
  76. })
  77. errGroup.Go(func() error {
  78. defer cancel()
  79. return d.datagramMuxer.ServeReceive(ctx)
  80. })
  81. errGroup.Go(func() error {
  82. defer cancel()
  83. return d.packetRouter.Serve(ctx)
  84. })
  85. return errGroup.Wait()
  86. }
  87. // RegisterUdpSession is the RPC method invoked by edge to register and run a session
  88. func (q *datagramV2Connection) RegisterUdpSession(ctx context.Context, sessionID uuid.UUID, dstIP net.IP, dstPort uint16, closeAfterIdleHint time.Duration, traceContext string) (*tunnelpogs.RegisterUdpSessionResponse, error) {
  89. traceCtx := tracing.NewTracedContext(ctx, traceContext, q.logger)
  90. ctx, registerSpan := traceCtx.Tracer().Start(traceCtx, "register-session", trace.WithAttributes(
  91. attribute.String("session-id", sessionID.String()),
  92. attribute.String("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)),
  93. ))
  94. log := q.logger.With().Int(management.EventTypeKey, int(management.UDP)).Logger()
  95. // Each session is a series of datagram from an eyeball to a dstIP:dstPort.
  96. // (src port, dst IP, dst port) uniquely identifies a session, so it needs a dedicated connected socket.
  97. originProxy, err := ingress.DialUDP(dstIP, dstPort)
  98. if err != nil {
  99. log.Err(err).Msgf("Failed to create udp proxy to %s:%d", dstIP, dstPort)
  100. tracing.EndWithErrorStatus(registerSpan, err)
  101. return nil, err
  102. }
  103. registerSpan.SetAttributes(
  104. attribute.Bool("socket-bind-success", true),
  105. attribute.String("src", originProxy.LocalAddr().String()),
  106. )
  107. session, err := q.sessionManager.RegisterSession(ctx, sessionID, originProxy)
  108. if err != nil {
  109. originProxy.Close()
  110. log.Err(err).Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).Msgf("Failed to register udp session")
  111. tracing.EndWithErrorStatus(registerSpan, err)
  112. return nil, err
  113. }
  114. go q.serveUDPSession(session, closeAfterIdleHint)
  115. log.Debug().
  116. Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).
  117. Str("src", originProxy.LocalAddr().String()).
  118. Str("dst", fmt.Sprintf("%s:%d", dstIP, dstPort)).
  119. Msgf("Registered session")
  120. tracing.End(registerSpan)
  121. resp := tunnelpogs.RegisterUdpSessionResponse{
  122. Spans: traceCtx.GetProtoSpans(),
  123. }
  124. return &resp, nil
  125. }
  126. // UnregisterUdpSession is the RPC method invoked by edge to unregister and terminate a sesssion
  127. func (q *datagramV2Connection) UnregisterUdpSession(ctx context.Context, sessionID uuid.UUID, message string) error {
  128. return q.sessionManager.UnregisterSession(ctx, sessionID, message, true)
  129. }
  130. func (q *datagramV2Connection) serveUDPSession(session *datagramsession.Session, closeAfterIdleHint time.Duration) {
  131. ctx := q.conn.Context()
  132. closedByRemote, err := session.Serve(ctx, closeAfterIdleHint)
  133. // If session is terminated by remote, then we know it has been unregistered from session manager and edge
  134. if !closedByRemote {
  135. if err != nil {
  136. q.closeUDPSession(ctx, session.ID, err.Error())
  137. } else {
  138. q.closeUDPSession(ctx, session.ID, "terminated without error")
  139. }
  140. }
  141. q.logger.Debug().Err(err).
  142. Int(management.EventTypeKey, int(management.UDP)).
  143. Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(session.ID)).
  144. Msg("Session terminated")
  145. }
  146. // closeUDPSession first unregisters the session from session manager, then it tries to unregister from edge
  147. func (q *datagramV2Connection) closeUDPSession(ctx context.Context, sessionID uuid.UUID, message string) {
  148. q.sessionManager.UnregisterSession(ctx, sessionID, message, false)
  149. quicStream, err := q.conn.OpenStream()
  150. if err != nil {
  151. // Log this at debug because this is not an error if session was closed due to lost connection
  152. // with edge
  153. q.logger.Debug().Err(err).
  154. Int(management.EventTypeKey, int(management.UDP)).
  155. Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).
  156. Msgf("Failed to open quic stream to unregister udp session with edge")
  157. return
  158. }
  159. stream := cfdquic.NewSafeStreamCloser(quicStream, q.streamWriteTimeout, q.logger)
  160. defer stream.Close()
  161. rpcClientStream, err := rpcquic.NewSessionClient(ctx, stream, q.rpcTimeout)
  162. if err != nil {
  163. // Log this at debug because this is not an error if session was closed due to lost connection
  164. // with edge
  165. q.logger.Err(err).Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).
  166. Msgf("Failed to open rpc stream to unregister udp session with edge")
  167. return
  168. }
  169. defer rpcClientStream.Close()
  170. if err := rpcClientStream.UnregisterUdpSession(ctx, sessionID, message); err != nil {
  171. q.logger.Err(err).Str(datagramsession.LogFieldSessionID, datagramsession.FormatSessionID(sessionID)).
  172. Msgf("Failed to unregister udp session with edge")
  173. }
  174. }