icmp_darwin.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. //go:build darwin
  2. package ingress
// This file implements ICMPProxy for Darwin. It uses a non-privileged ICMP socket to send echo requests and listen for
// echo replies. The source IP of each request is rewritten to the bind IP of the socket, and the socket reads all
// messages, so we use the echo ID to distinguish the replies. Each (source IP, destination IP, original echo ID)
// tuple is assigned a unique echo ID.
  7. import (
  8. "context"
  9. "fmt"
  10. "math"
  11. "net/netip"
  12. "strconv"
  13. "sync"
  14. "time"
  15. "github.com/rs/zerolog"
  16. "go.opentelemetry.io/otel/attribute"
  17. "golang.org/x/net/icmp"
  18. "github.com/cloudflare/cloudflared/packet"
  19. "github.com/cloudflare/cloudflared/tracing"
  20. )
  21. type icmpProxy struct {
  22. srcFunnelTracker *packet.FunnelTracker
  23. echoIDTracker *echoIDTracker
  24. conn *icmp.PacketConn
  25. // Response is handled in one-by-one, so encoder can be shared between funnels
  26. encoder *packet.Encoder
  27. logger *zerolog.Logger
  28. idleTimeout time.Duration
  29. }
  30. // echoIDTracker tracks which ID has been assigned. It first loops through assignment from lastAssignment to then end,
  31. // then from the beginning to lastAssignment.
  32. // ICMP echo are short lived. By the time an ID is revisited, it should have been released.
  33. type echoIDTracker struct {
  34. lock sync.Mutex
  35. // maps the source IP, destination IP and original echo ID to a unique echo ID obtained from assignment
  36. mapping map[flow3Tuple]uint16
  37. // assignment tracks if an ID is assigned using index as the ID
  38. // The size of the array is math.MaxUint16 because echo ID is 2 bytes
  39. assignment [math.MaxUint16]bool
  40. // nextAssignment is the next number to check for assigment
  41. nextAssignment uint16
  42. }
  43. func newEchoIDTracker() *echoIDTracker {
  44. return &echoIDTracker{
  45. mapping: make(map[flow3Tuple]uint16),
  46. }
  47. }
  48. // Get assignment or assign a new ID.
  49. func (eit *echoIDTracker) getOrAssign(key flow3Tuple) (id uint16, success bool) {
  50. eit.lock.Lock()
  51. defer eit.lock.Unlock()
  52. id, exists := eit.mapping[key]
  53. if exists {
  54. return id, true
  55. }
  56. if eit.nextAssignment == math.MaxUint16 {
  57. eit.nextAssignment = 0
  58. }
  59. for i, assigned := range eit.assignment[eit.nextAssignment:] {
  60. if !assigned {
  61. echoID := uint16(i) + eit.nextAssignment
  62. eit.set(key, echoID)
  63. return echoID, true
  64. }
  65. }
  66. for i, assigned := range eit.assignment[0:eit.nextAssignment] {
  67. if !assigned {
  68. echoID := uint16(i)
  69. eit.set(key, echoID)
  70. return echoID, true
  71. }
  72. }
  73. return 0, false
  74. }
  75. // Caller should hold the lock
  76. func (eit *echoIDTracker) set(key flow3Tuple, assignedEchoID uint16) {
  77. eit.assignment[assignedEchoID] = true
  78. eit.mapping[key] = assignedEchoID
  79. eit.nextAssignment = assignedEchoID + 1
  80. }
  81. func (eit *echoIDTracker) release(key flow3Tuple, assigned uint16) bool {
  82. eit.lock.Lock()
  83. defer eit.lock.Unlock()
  84. currentEchoID, exists := eit.mapping[key]
  85. if exists && assigned == currentEchoID {
  86. delete(eit.mapping, key)
  87. eit.assignment[assigned] = false
  88. return true
  89. }
  90. return false
  91. }
  92. type echoFunnelID uint16
  93. func (snf echoFunnelID) Type() string {
  94. return "echoID"
  95. }
  96. func (snf echoFunnelID) String() string {
  97. return strconv.FormatUint(uint64(snf), 10)
  98. }
  99. func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) {
  100. conn, err := newICMPConn(listenIP)
  101. if err != nil {
  102. return nil, err
  103. }
  104. logger.Info().Msgf("Created ICMP proxy listening on %s", conn.LocalAddr())
  105. return &icmpProxy{
  106. srcFunnelTracker: packet.NewFunnelTracker(),
  107. echoIDTracker: newEchoIDTracker(),
  108. encoder: packet.NewEncoder(),
  109. conn: conn,
  110. logger: logger,
  111. idleTimeout: idleTimeout,
  112. }, nil
  113. }
  114. func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error {
  115. _, span := responder.RequestSpan(ctx, pk)
  116. defer responder.ExportSpan()
  117. originalEcho, err := getICMPEcho(pk.Message)
  118. if err != nil {
  119. tracing.EndWithErrorStatus(span, err)
  120. return err
  121. }
  122. observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq)
  123. echoIDTrackerKey := flow3Tuple{
  124. srcIP: pk.Src,
  125. dstIP: pk.Dst,
  126. originalEchoID: originalEcho.ID,
  127. }
  128. assignedEchoID, success := ip.echoIDTracker.getOrAssign(echoIDTrackerKey)
  129. if !success {
  130. err := fmt.Errorf("failed to assign unique echo ID")
  131. tracing.EndWithErrorStatus(span, err)
  132. return err
  133. }
  134. span.SetAttributes(attribute.Int("assignedEchoID", int(assignedEchoID)))
  135. shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder, pk, originalEcho.ID)
  136. newFunnelFunc := func() (packet.Funnel, error) {
  137. originalEcho, err := getICMPEcho(pk.Message)
  138. if err != nil {
  139. return nil, err
  140. }
  141. closeCallback := func() error {
  142. ip.echoIDTracker.release(echoIDTrackerKey, assignedEchoID)
  143. return nil
  144. }
  145. icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, ip.conn, responder, int(assignedEchoID), originalEcho.ID, ip.encoder)
  146. return icmpFlow, nil
  147. }
  148. funnelID := echoFunnelID(assignedEchoID)
  149. funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, shouldReplaceFunnelFunc, newFunnelFunc)
  150. if err != nil {
  151. tracing.EndWithErrorStatus(span, err)
  152. return err
  153. }
  154. if isNew {
  155. span.SetAttributes(attribute.Bool("newFlow", true))
  156. ip.logger.Debug().
  157. Str("src", pk.Src.String()).
  158. Str("dst", pk.Dst.String()).
  159. Int("originalEchoID", originalEcho.ID).
  160. Int("assignedEchoID", int(assignedEchoID)).
  161. Msg("New flow")
  162. }
  163. icmpFlow, err := toICMPEchoFlow(funnel)
  164. if err != nil {
  165. tracing.EndWithErrorStatus(span, err)
  166. return err
  167. }
  168. err = icmpFlow.sendToDst(pk.Dst, pk.Message)
  169. if err != nil {
  170. tracing.EndWithErrorStatus(span, err)
  171. return err
  172. }
  173. tracing.End(span)
  174. return nil
  175. }
  176. // Serve listens for responses to the requests until context is done
  177. func (ip *icmpProxy) Serve(ctx context.Context) error {
  178. go func() {
  179. <-ctx.Done()
  180. ip.conn.Close()
  181. }()
  182. go func() {
  183. ip.srcFunnelTracker.ScheduleCleanup(ctx, ip.idleTimeout)
  184. }()
  185. buf := make([]byte, mtu)
  186. icmpDecoder := packet.NewICMPDecoder()
  187. for {
  188. n, from, err := ip.conn.ReadFrom(buf)
  189. if err != nil {
  190. return err
  191. }
  192. reply, err := parseReply(from, buf[:n])
  193. if err != nil {
  194. ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply, continue to parse as full packet")
  195. // In unit test, we found out when the listener listens on 0.0.0.0, the socket reads the full packet after
  196. // the second reply
  197. if err := ip.handleFullPacket(ctx, icmpDecoder, buf[:n]); err != nil {
  198. ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply as full packet")
  199. }
  200. continue
  201. }
  202. if !isEchoReply(reply.msg) {
  203. ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
  204. continue
  205. }
  206. if err := ip.sendReply(ctx, reply); err != nil {
  207. ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
  208. continue
  209. }
  210. }
  211. }
  212. func (ip *icmpProxy) handleFullPacket(ctx context.Context, decoder *packet.ICMPDecoder, rawPacket []byte) error {
  213. icmpPacket, err := decoder.Decode(packet.RawPacket{Data: rawPacket})
  214. if err != nil {
  215. return err
  216. }
  217. echo, err := getICMPEcho(icmpPacket.Message)
  218. if err != nil {
  219. return err
  220. }
  221. reply := echoReply{
  222. from: icmpPacket.Src,
  223. msg: icmpPacket.Message,
  224. echo: echo,
  225. }
  226. if ip.sendReply(ctx, &reply); err != nil {
  227. return err
  228. }
  229. return nil
  230. }
  231. func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
  232. funnelID := echoFunnelID(reply.echo.ID)
  233. funnel, ok := ip.srcFunnelTracker.Get(funnelID)
  234. if !ok {
  235. return packet.ErrFunnelNotFound
  236. }
  237. icmpFlow, err := toICMPEchoFlow(funnel)
  238. if err != nil {
  239. return err
  240. }
  241. _, span := icmpFlow.responder.ReplySpan(ctx, ip.logger)
  242. defer icmpFlow.responder.ExportSpan()
  243. if err := icmpFlow.returnToSrc(reply); err != nil {
  244. tracing.EndWithErrorStatus(span, err)
  245. return err
  246. }
  247. observeICMPReply(ip.logger, span, reply.from.String(), reply.echo.ID, reply.echo.Seq)
  248. span.SetAttributes(attribute.Int("originalEchoID", icmpFlow.originalEchoID))
  249. tracing.End(span)
  250. return nil
  251. }