packet_router.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package ingress
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/rs/zerolog"
  6. "go.opentelemetry.io/otel/attribute"
  7. "go.opentelemetry.io/otel/trace"
  8. "github.com/cloudflare/cloudflared/packet"
  9. quicpogs "github.com/cloudflare/cloudflared/quic"
  10. "github.com/cloudflare/cloudflared/tracing"
  11. )
  12. // Upstream of raw packets
  13. type muxer interface {
  14. SendPacket(pk quicpogs.Packet) error
  15. // ReceivePacket waits for the next raw packet from upstream
  16. ReceivePacket(ctx context.Context) (quicpogs.Packet, error)
  17. }
  18. // PacketRouter routes packets between Upstream and ICMPRouter. Currently it rejects all other type of ICMP packets
  19. type PacketRouter struct {
  20. icmpRouter ICMPRouter
  21. muxer muxer
  22. connID uint8
  23. logger *zerolog.Logger
  24. encoder *packet.Encoder
  25. decoder *packet.ICMPDecoder
  26. }
  27. // NewPacketRouter creates a PacketRouter that handles ICMP packets. Packets are read from muxer but dropped if globalConfig is nil.
  28. func NewPacketRouter(icmpRouter ICMPRouter, muxer muxer, connIndex uint8, logger *zerolog.Logger) *PacketRouter {
  29. return &PacketRouter{
  30. icmpRouter: icmpRouter,
  31. muxer: muxer,
  32. connID: connIndex,
  33. logger: logger,
  34. encoder: packet.NewEncoder(),
  35. decoder: packet.NewICMPDecoder(),
  36. }
  37. }
  38. func (r *PacketRouter) Serve(ctx context.Context) error {
  39. for {
  40. rawPacket, responder, err := r.nextPacket(ctx)
  41. if err != nil {
  42. return err
  43. }
  44. r.handlePacket(ctx, rawPacket, responder)
  45. }
  46. }
  47. func (r *PacketRouter) nextPacket(ctx context.Context) (packet.RawPacket, ICMPResponder, error) {
  48. pk, err := r.muxer.ReceivePacket(ctx)
  49. if err != nil {
  50. return packet.RawPacket{}, nil, err
  51. }
  52. responder := newPacketResponder(r.muxer, r.connID, packet.NewEncoder())
  53. switch pk.Type() {
  54. case quicpogs.DatagramTypeIP:
  55. return packet.RawPacket{Data: pk.Payload()}, responder, nil
  56. case quicpogs.DatagramTypeIPWithTrace:
  57. var identity tracing.Identity
  58. if err := identity.UnmarshalBinary(pk.Metadata()); err != nil {
  59. r.logger.Err(err).Bytes("tracingIdentity", pk.Metadata()).Msg("Failed to unmarshal tracing identity")
  60. } else {
  61. tracedCtx := tracing.NewTracedContext(ctx, identity.String(), r.logger)
  62. responder.AddTraceContext(tracedCtx, pk.Metadata())
  63. }
  64. return packet.RawPacket{Data: pk.Payload()}, responder, nil
  65. default:
  66. return packet.RawPacket{}, nil, fmt.Errorf("unexpected datagram type %d", pk.Type())
  67. }
  68. }
  69. func (r *PacketRouter) handlePacket(ctx context.Context, rawPacket packet.RawPacket, responder ICMPResponder) {
  70. // ICMP Proxy feature is disabled, drop packets
  71. if r.icmpRouter == nil {
  72. return
  73. }
  74. icmpPacket, err := r.decoder.Decode(rawPacket)
  75. if err != nil {
  76. r.logger.Err(err).Msg("Failed to decode ICMP packet from quic datagram")
  77. return
  78. }
  79. if icmpPacket.TTL <= 1 {
  80. if err := r.sendTTLExceedMsg(icmpPacket, rawPacket); err != nil {
  81. r.logger.Err(err).Msg("Failed to return ICMP TTL exceed error")
  82. }
  83. return
  84. }
  85. icmpPacket.TTL--
  86. if err := r.icmpRouter.Request(ctx, icmpPacket, responder); err != nil {
  87. r.logger.Err(err).
  88. Str("src", icmpPacket.Src.String()).
  89. Str("dst", icmpPacket.Dst.String()).
  90. Interface("type", icmpPacket.Type).
  91. Msg("Failed to send ICMP packet")
  92. }
  93. }
  94. func (r *PacketRouter) sendTTLExceedMsg(pk *packet.ICMP, rawPacket packet.RawPacket) error {
  95. icmpTTLPacket := r.icmpRouter.ConvertToTTLExceeded(pk, rawPacket)
  96. encodedTTLExceed, err := r.encoder.Encode(icmpTTLPacket)
  97. if err != nil {
  98. return err
  99. }
  100. return r.muxer.SendPacket(quicpogs.RawPacket(encodedTTLExceed))
  101. }
  102. // packetResponder should not be used concurrently. This assumption is upheld because reply packets are ready one-by-one
  103. type packetResponder struct {
  104. datagramMuxer muxer
  105. connID uint8
  106. encoder *packet.Encoder
  107. tracedCtx *tracing.TracedContext
  108. serializedIdentity []byte
  109. // hadReply tracks if there has been any reply for this flow
  110. hadReply bool
  111. }
  112. func newPacketResponder(datagramMuxer muxer, connID uint8, encoder *packet.Encoder) ICMPResponder {
  113. return &packetResponder{
  114. datagramMuxer: datagramMuxer,
  115. connID: connID,
  116. encoder: encoder,
  117. }
  118. }
  119. func (pr *packetResponder) tracingEnabled() bool {
  120. return pr.tracedCtx != nil
  121. }
  122. func (pr *packetResponder) ConnectionID() uint8 {
  123. return pr.connID
  124. }
  125. func (pr *packetResponder) ReturnPacket(pk *packet.ICMP) error {
  126. rawPacket, err := pr.encoder.Encode(pk)
  127. if err != nil {
  128. return err
  129. }
  130. pr.hadReply = true
  131. return pr.datagramMuxer.SendPacket(quicpogs.RawPacket(rawPacket))
  132. }
  133. func (pr *packetResponder) AddTraceContext(tracedCtx *tracing.TracedContext, serializedIdentity []byte) {
  134. pr.tracedCtx = tracedCtx
  135. pr.serializedIdentity = serializedIdentity
  136. }
  137. func (pr *packetResponder) RequestSpan(ctx context.Context, pk *packet.ICMP) (context.Context, trace.Span) {
  138. if !pr.tracingEnabled() {
  139. return ctx, tracing.NewNoopSpan()
  140. }
  141. return pr.tracedCtx.Tracer().Start(pr.tracedCtx, "icmp-echo-request", trace.WithAttributes(
  142. attribute.String("src", pk.Src.String()),
  143. attribute.String("dst", pk.Dst.String()),
  144. ))
  145. }
  146. func (pr *packetResponder) ReplySpan(ctx context.Context, logger *zerolog.Logger) (context.Context, trace.Span) {
  147. if !pr.tracingEnabled() || pr.hadReply {
  148. return ctx, tracing.NewNoopSpan()
  149. }
  150. return pr.tracedCtx.Tracer().Start(pr.tracedCtx, "icmp-echo-reply")
  151. }
  152. func (pr *packetResponder) ExportSpan() {
  153. if !pr.tracingEnabled() {
  154. return
  155. }
  156. spans := pr.tracedCtx.GetProtoSpans()
  157. if len(spans) > 0 {
  158. pr.datagramMuxer.SendPacket(&quicpogs.TracingSpanPacket{
  159. Spans: spans,
  160. TracingIdentity: pr.serializedIdentity,
  161. })
  162. }
  163. }