datagram.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package quic
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/google/uuid"
  6. "github.com/pkg/errors"
  7. "github.com/quic-go/quic-go"
  8. "github.com/rs/zerolog"
  9. "github.com/cloudflare/cloudflared/packet"
  10. )
  11. const (
  12. sessionIDLen = len(uuid.UUID{})
  13. )
  14. type BaseDatagramMuxer interface {
  15. // SendToSession suffix the session ID to the payload so the other end of the QUIC connection can demultiplex the
  16. // payload from multiple datagram sessions.
  17. SendToSession(session *packet.Session) error
  18. // ServeReceive starts a loop to receive datagrams from the QUIC connection
  19. ServeReceive(ctx context.Context) error
  20. }
  21. type DatagramMuxer struct {
  22. session quic.Connection
  23. logger *zerolog.Logger
  24. demuxChan chan<- *packet.Session
  25. }
  26. func NewDatagramMuxer(quicSession quic.Connection, log *zerolog.Logger, demuxChan chan<- *packet.Session) *DatagramMuxer {
  27. logger := log.With().Uint8("datagramVersion", 1).Logger()
  28. return &DatagramMuxer{
  29. session: quicSession,
  30. logger: &logger,
  31. demuxChan: demuxChan,
  32. }
  33. }
  34. // Maximum application payload to send to / receive from QUIC datagram frame
  35. func (dm *DatagramMuxer) mtu() int {
  36. return maxDatagramPayloadSize
  37. }
  38. func (dm *DatagramMuxer) SendToSession(session *packet.Session) error {
  39. if len(session.Payload) > dm.mtu() {
  40. packetTooBigDropped.Inc()
  41. return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(session.Payload), dm.mtu())
  42. }
  43. payloadWithMetadata, err := SuffixSessionID(session.ID, session.Payload)
  44. if err != nil {
  45. return errors.Wrap(err, "Failed to suffix session ID to datagram, it will be dropped")
  46. }
  47. if err := dm.session.SendDatagram(payloadWithMetadata); err != nil {
  48. return errors.Wrap(err, "Failed to send datagram back to edge")
  49. }
  50. return nil
  51. }
  52. func (dm *DatagramMuxer) ServeReceive(ctx context.Context) error {
  53. for {
  54. // Extracts datagram session ID, then sends the session ID and payload to receiver
  55. // which determines how to proxy to the origin. It assumes the datagram session has already been
  56. // registered with receiver through other side channel
  57. msg, err := dm.session.ReceiveDatagram(ctx)
  58. if err != nil {
  59. return err
  60. }
  61. if err := dm.demux(ctx, msg); err != nil {
  62. dm.logger.Error().Err(err).Msg("Failed to demux datagram")
  63. if err == context.Canceled {
  64. return err
  65. }
  66. }
  67. }
  68. }
  69. func (dm *DatagramMuxer) demux(ctx context.Context, msg []byte) error {
  70. sessionID, payload, err := extractSessionID(msg)
  71. if err != nil {
  72. return err
  73. }
  74. sessionDatagram := packet.Session{
  75. ID: sessionID,
  76. Payload: payload,
  77. }
  78. select {
  79. case dm.demuxChan <- &sessionDatagram:
  80. return nil
  81. case <-ctx.Done():
  82. return ctx.Err()
  83. }
  84. }
  85. // Each QUIC datagram should be suffixed with session ID.
  86. // extractSessionID extracts the session ID and a slice with only the payload
  87. func extractSessionID(b []byte) (uuid.UUID, []byte, error) {
  88. msgLen := len(b)
  89. if msgLen < sessionIDLen {
  90. return uuid.Nil, nil, fmt.Errorf("session ID has %d bytes, but data only has %d", sessionIDLen, len(b))
  91. }
  92. // Parse last 16 bytess as UUID and remove it from slice
  93. sessionID, err := uuid.FromBytes(b[len(b)-sessionIDLen:])
  94. if err != nil {
  95. return uuid.Nil, nil, err
  96. }
  97. b = b[:len(b)-sessionIDLen]
  98. return sessionID, b, nil
  99. }
  100. // SuffixSessionID appends the session ID at the end of the payload. Suffix is more performant than prefix because
  101. // the payload slice might already have enough capacity to append the session ID at the end
  102. func SuffixSessionID(sessionID uuid.UUID, b []byte) ([]byte, error) {
  103. return suffixMetadata(b, sessionID[:])
  104. }
  105. func suffixMetadata(payload, metadata []byte) ([]byte, error) {
  106. if len(payload)+len(metadata) > MaxDatagramFrameSize {
  107. return nil, fmt.Errorf("datagram size exceed %d", MaxDatagramFrameSize)
  108. }
  109. return append(payload, metadata...), nil
  110. }