datagramv2.go 6.8 KB


  1. package quic
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/pkg/errors"
  6. "github.com/quic-go/quic-go"
  7. "github.com/rs/zerolog"
  8. "github.com/cloudflare/cloudflared/packet"
  9. "github.com/cloudflare/cloudflared/tracing"
  10. )
  11. type DatagramV2Type byte
  12. const (
  13. // UDP payload
  14. DatagramTypeUDP DatagramV2Type = iota
  15. // Full IP packet
  16. DatagramTypeIP
  17. // DatagramTypeIP + tracing ID
  18. DatagramTypeIPWithTrace
  19. // Tracing spans in protobuf format
  20. DatagramTypeTracingSpan
  21. )
  22. type Packet interface {
  23. Type() DatagramV2Type
  24. Payload() []byte
  25. Metadata() []byte
  26. }
  27. const (
  28. typeIDLen = 1
  29. // Same as sessionDemuxChan capacity
  30. packetChanCapacity = 128
  31. )
  32. func SuffixType(b []byte, datagramType DatagramV2Type) ([]byte, error) {
  33. if len(b)+typeIDLen > MaxDatagramFrameSize {
  34. return nil, fmt.Errorf("datagram size %d exceeds max frame size %d", len(b), MaxDatagramFrameSize)
  35. }
  36. b = append(b, byte(datagramType))
  37. return b, nil
  38. }
  39. // Maximum application payload to send to / receive from QUIC datagram frame
  40. func (dm *DatagramMuxerV2) mtu() int {
  41. return maxDatagramPayloadSize
  42. }
  43. type DatagramMuxerV2 struct {
  44. session quic.Connection
  45. logger *zerolog.Logger
  46. sessionDemuxChan chan<- *packet.Session
  47. packetDemuxChan chan Packet
  48. }
  49. func NewDatagramMuxerV2(
  50. quicSession quic.Connection,
  51. log *zerolog.Logger,
  52. sessionDemuxChan chan<- *packet.Session,
  53. ) *DatagramMuxerV2 {
  54. logger := log.With().Uint8("datagramVersion", 2).Logger()
  55. return &DatagramMuxerV2{
  56. session: quicSession,
  57. logger: &logger,
  58. sessionDemuxChan: sessionDemuxChan,
  59. packetDemuxChan: make(chan Packet, packetChanCapacity),
  60. }
  61. }
  62. // SendToSession suffix the session ID and datagram version to the payload so the other end of the QUIC connection can
  63. // demultiplex the payload from multiple datagram sessions
  64. func (dm *DatagramMuxerV2) SendToSession(session *packet.Session) error {
  65. if len(session.Payload) > dm.mtu() {
  66. packetTooBigDropped.Inc()
  67. return fmt.Errorf("origin UDP payload has %d bytes, which exceeds transport MTU %d", len(session.Payload), dm.mtu())
  68. }
  69. msgWithID, err := SuffixSessionID(session.ID, session.Payload)
  70. if err != nil {
  71. return errors.Wrap(err, "Failed to suffix session ID to datagram, it will be dropped")
  72. }
  73. msgWithIDAndType, err := SuffixType(msgWithID, DatagramTypeUDP)
  74. if err != nil {
  75. return errors.Wrap(err, "Failed to suffix datagram type, it will be dropped")
  76. }
  77. if err := dm.session.SendDatagram(msgWithIDAndType); err != nil {
  78. return errors.Wrap(err, "Failed to send datagram back to edge")
  79. }
  80. return nil
  81. }
  82. // SendPacket sends a packet with datagram version in the suffix. If ctx is a TracedContext, it adds the tracing
  83. // context between payload and datagram version.
  84. // The other end of the QUIC connection can demultiplex by parsing the payload as IP and look at the source and destination.
  85. func (dm *DatagramMuxerV2) SendPacket(pk Packet) error {
  86. payloadWithMetadata, err := suffixMetadata(pk.Payload(), pk.Metadata())
  87. if err != nil {
  88. return err
  89. }
  90. payloadWithMetadataAndType, err := SuffixType(payloadWithMetadata, pk.Type())
  91. if err != nil {
  92. return errors.Wrap(err, "Failed to suffix datagram type, it will be dropped")
  93. }
  94. if err := dm.session.SendDatagram(payloadWithMetadataAndType); err != nil {
  95. return errors.Wrap(err, "Failed to send datagram back to edge")
  96. }
  97. return nil
  98. }
  99. // Demux reads datagrams from the QUIC connection and demuxes depending on whether it's a session or packet
  100. func (dm *DatagramMuxerV2) ServeReceive(ctx context.Context) error {
  101. for {
  102. msg, err := dm.session.ReceiveDatagram(ctx)
  103. if err != nil {
  104. return err
  105. }
  106. if err := dm.demux(ctx, msg); err != nil {
  107. dm.logger.Error().Err(err).Msg("Failed to demux datagram")
  108. if err == context.Canceled {
  109. return err
  110. }
  111. }
  112. }
  113. }
  114. func (dm *DatagramMuxerV2) ReceivePacket(ctx context.Context) (pk Packet, err error) {
  115. select {
  116. case <-ctx.Done():
  117. return nil, ctx.Err()
  118. case pk := <-dm.packetDemuxChan:
  119. return pk, nil
  120. }
  121. }
  122. func (dm *DatagramMuxerV2) demux(ctx context.Context, msgWithType []byte) error {
  123. if len(msgWithType) < typeIDLen {
  124. return fmt.Errorf("QUIC datagram should have at least %d byte", typeIDLen)
  125. }
  126. msgType := DatagramV2Type(msgWithType[len(msgWithType)-typeIDLen])
  127. msg := msgWithType[0 : len(msgWithType)-typeIDLen]
  128. switch msgType {
  129. case DatagramTypeUDP:
  130. return dm.handleSession(ctx, msg)
  131. default:
  132. return dm.handlePacket(ctx, msg, msgType)
  133. }
  134. }
  135. func (dm *DatagramMuxerV2) handleSession(ctx context.Context, session []byte) error {
  136. sessionID, payload, err := extractSessionID(session)
  137. if err != nil {
  138. return err
  139. }
  140. sessionDatagram := packet.Session{
  141. ID: sessionID,
  142. Payload: payload,
  143. }
  144. select {
  145. case dm.sessionDemuxChan <- &sessionDatagram:
  146. return nil
  147. case <-ctx.Done():
  148. return ctx.Err()
  149. }
  150. }
  151. func (dm *DatagramMuxerV2) handlePacket(ctx context.Context, pk []byte, msgType DatagramV2Type) error {
  152. var demuxedPacket Packet
  153. switch msgType {
  154. case DatagramTypeIP:
  155. demuxedPacket = RawPacket(packet.RawPacket{Data: pk})
  156. case DatagramTypeIPWithTrace:
  157. tracingIdentity, payload, err := extractTracingIdentity(pk)
  158. if err != nil {
  159. return err
  160. }
  161. demuxedPacket = &TracedPacket{
  162. Packet: packet.RawPacket{Data: payload},
  163. TracingIdentity: tracingIdentity,
  164. }
  165. case DatagramTypeTracingSpan:
  166. tracingIdentity, spans, err := extractTracingIdentity(pk)
  167. if err != nil {
  168. return err
  169. }
  170. demuxedPacket = &TracingSpanPacket{
  171. Spans: spans,
  172. TracingIdentity: tracingIdentity,
  173. }
  174. default:
  175. return fmt.Errorf("Unexpected datagram type %d", msgType)
  176. }
  177. select {
  178. case <-ctx.Done():
  179. return ctx.Err()
  180. case dm.packetDemuxChan <- demuxedPacket:
  181. return nil
  182. }
  183. }
  184. func extractTracingIdentity(pk []byte) (tracingIdentity []byte, payload []byte, err error) {
  185. if len(pk) < tracing.IdentityLength {
  186. return nil, nil, fmt.Errorf("packet with tracing context should have at least %d bytes, got %v", tracing.IdentityLength, pk)
  187. }
  188. tracingIdentity = pk[len(pk)-tracing.IdentityLength:]
  189. payload = pk[:len(pk)-tracing.IdentityLength]
  190. return tracingIdentity, payload, nil
  191. }
  192. type RawPacket packet.RawPacket
  193. func (rw RawPacket) Type() DatagramV2Type {
  194. return DatagramTypeIP
  195. }
  196. func (rw RawPacket) Payload() []byte {
  197. return rw.Data
  198. }
  199. func (rw RawPacket) Metadata() []byte {
  200. return []byte{}
  201. }
  202. type TracedPacket struct {
  203. Packet packet.RawPacket
  204. TracingIdentity []byte
  205. }
  206. func (tp *TracedPacket) Type() DatagramV2Type {
  207. return DatagramTypeIPWithTrace
  208. }
  209. func (tp *TracedPacket) Payload() []byte {
  210. return tp.Packet.Data
  211. }
  212. func (tp *TracedPacket) Metadata() []byte {
  213. return tp.TracingIdentity
  214. }
  215. type TracingSpanPacket struct {
  216. Spans []byte
  217. TracingIdentity []byte
  218. }
  219. func (tsp *TracingSpanPacket) Type() DatagramV2Type {
  220. return DatagramTypeTracingSpan
  221. }
  222. func (tsp *TracingSpanPacket) Payload() []byte {
  223. return tsp.Spans
  224. }
  225. func (tsp *TracingSpanPacket) Metadata() []byte {
  226. return tsp.TracingIdentity
  227. }