icmp_darwin.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. //go:build darwin
  2. package ingress
// This file implements ICMPProxy for Darwin. It uses a non-privileged ICMP socket to send echo requests and listen for
// echo replies. The source IP of each request is rewritten to the bind IP of the socket and the socket reads all
// messages, so we use the echo ID to distinguish the replies. Each (source IP, destination IP, original echo ID)
// tuple is assigned a unique echo ID.
  7. import (
  8. "context"
  9. "fmt"
  10. "math"
  11. "net/netip"
  12. "strconv"
  13. "sync"
  14. "time"
  15. "github.com/rs/zerolog"
  16. "go.opentelemetry.io/otel/attribute"
  17. "golang.org/x/net/icmp"
  18. "github.com/cloudflare/cloudflared/packet"
  19. "github.com/cloudflare/cloudflared/tracing"
  20. )
// icmpProxy holds the state shared by the Darwin ICMP proxy methods in this file.
type icmpProxy struct {
	// srcFunnelTracker stores the active flows, keyed by the echo ID assigned to each flow
	srcFunnelTracker *packet.FunnelTracker
	// echoIDTracker hands out the echo ID that each flow uses on the shared socket
	echoIDTracker *echoIDTracker
	// conn is the ICMP socket that Serve reads from and closes once its context is done
	conn *icmp.PacketConn
	// logger receives the proxy's log output
	logger *zerolog.Logger
	// idleTimeout is passed to the funnel tracker's ScheduleCleanup by Serve
	idleTimeout time.Duration
}
// echoIDTracker tracks which echo IDs have been assigned. When a new ID is needed it first scans assignment from
// nextAssignment to the end, then from the beginning up to nextAssignment.
// ICMP echoes are short lived. By the time an ID is revisited, it should have been released.
type echoIDTracker struct {
	// lock is held by getOrAssign and release while they read or modify the fields below
	lock sync.Mutex
	// mapping maps the source IP, destination IP and original echo ID to a unique echo ID obtained from assignment
	mapping map[flow3Tuple]uint16
	// assignment tracks if an ID is assigned using index as the ID.
	// The array has math.MaxUint16 entries, so IDs 0 through math.MaxUint16-1 can be handed out.
	assignment [math.MaxUint16]bool
	// nextAssignment is the next ID to check for assignment
	nextAssignment uint16
}
  41. func newEchoIDTracker() *echoIDTracker {
  42. return &echoIDTracker{
  43. mapping: make(map[flow3Tuple]uint16),
  44. }
  45. }
  46. // Get assignment or assign a new ID.
  47. func (eit *echoIDTracker) getOrAssign(key flow3Tuple) (id uint16, success bool) {
  48. eit.lock.Lock()
  49. defer eit.lock.Unlock()
  50. id, exists := eit.mapping[key]
  51. if exists {
  52. return id, true
  53. }
  54. if eit.nextAssignment == math.MaxUint16 {
  55. eit.nextAssignment = 0
  56. }
  57. for i, assigned := range eit.assignment[eit.nextAssignment:] {
  58. if !assigned {
  59. echoID := uint16(i) + eit.nextAssignment
  60. eit.set(key, echoID)
  61. return echoID, true
  62. }
  63. }
  64. for i, assigned := range eit.assignment[0:eit.nextAssignment] {
  65. if !assigned {
  66. echoID := uint16(i)
  67. eit.set(key, echoID)
  68. return echoID, true
  69. }
  70. }
  71. return 0, false
  72. }
  73. // Caller should hold the lock
  74. func (eit *echoIDTracker) set(key flow3Tuple, assignedEchoID uint16) {
  75. eit.assignment[assignedEchoID] = true
  76. eit.mapping[key] = assignedEchoID
  77. eit.nextAssignment = assignedEchoID + 1
  78. }
  79. func (eit *echoIDTracker) release(key flow3Tuple, assigned uint16) bool {
  80. eit.lock.Lock()
  81. defer eit.lock.Unlock()
  82. currentEchoID, exists := eit.mapping[key]
  83. if exists && assigned == currentEchoID {
  84. delete(eit.mapping, key)
  85. eit.assignment[assigned] = false
  86. return true
  87. }
  88. return false
  89. }
  90. type echoFunnelID uint16
  91. func (snf echoFunnelID) Type() string {
  92. return "echoID"
  93. }
  94. func (snf echoFunnelID) String() string {
  95. return strconv.FormatUint(uint64(snf), 10)
  96. }
  97. func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) {
  98. conn, err := newICMPConn(listenIP)
  99. if err != nil {
  100. return nil, err
  101. }
  102. logger.Info().Msgf("Created ICMP proxy listening on %s", conn.LocalAddr())
  103. return &icmpProxy{
  104. srcFunnelTracker: packet.NewFunnelTracker(),
  105. echoIDTracker: newEchoIDTracker(),
  106. conn: conn,
  107. logger: logger,
  108. idleTimeout: idleTimeout,
  109. }, nil
  110. }
  111. func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error {
  112. _, span := responder.RequestSpan(ctx, pk)
  113. defer responder.ExportSpan()
  114. originalEcho, err := getICMPEcho(pk.Message)
  115. if err != nil {
  116. tracing.EndWithErrorStatus(span, err)
  117. return err
  118. }
  119. observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq)
  120. echoIDTrackerKey := flow3Tuple{
  121. srcIP: pk.Src,
  122. dstIP: pk.Dst,
  123. originalEchoID: originalEcho.ID,
  124. }
  125. assignedEchoID, success := ip.echoIDTracker.getOrAssign(echoIDTrackerKey)
  126. if !success {
  127. err := fmt.Errorf("failed to assign unique echo ID")
  128. tracing.EndWithErrorStatus(span, err)
  129. return err
  130. }
  131. span.SetAttributes(attribute.Int("assignedEchoID", int(assignedEchoID)))
  132. shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder, pk, originalEcho.ID)
  133. newFunnelFunc := func() (packet.Funnel, error) {
  134. originalEcho, err := getICMPEcho(pk.Message)
  135. if err != nil {
  136. return nil, err
  137. }
  138. closeCallback := func() error {
  139. ip.echoIDTracker.release(echoIDTrackerKey, assignedEchoID)
  140. return nil
  141. }
  142. icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, ip.conn, responder, int(assignedEchoID), originalEcho.ID)
  143. return icmpFlow, nil
  144. }
  145. funnelID := echoFunnelID(assignedEchoID)
  146. funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, shouldReplaceFunnelFunc, newFunnelFunc)
  147. if err != nil {
  148. tracing.EndWithErrorStatus(span, err)
  149. return err
  150. }
  151. if isNew {
  152. span.SetAttributes(attribute.Bool("newFlow", true))
  153. ip.logger.Debug().
  154. Str("src", pk.Src.String()).
  155. Str("dst", pk.Dst.String()).
  156. Int("originalEchoID", originalEcho.ID).
  157. Int("assignedEchoID", int(assignedEchoID)).
  158. Msg("New flow")
  159. }
  160. icmpFlow, err := toICMPEchoFlow(funnel)
  161. if err != nil {
  162. tracing.EndWithErrorStatus(span, err)
  163. return err
  164. }
  165. err = icmpFlow.sendToDst(pk.Dst, pk.Message)
  166. if err != nil {
  167. tracing.EndWithErrorStatus(span, err)
  168. return err
  169. }
  170. tracing.End(span)
  171. return nil
  172. }
  173. // Serve listens for responses to the requests until context is done
  174. func (ip *icmpProxy) Serve(ctx context.Context) error {
  175. go func() {
  176. <-ctx.Done()
  177. ip.conn.Close()
  178. }()
  179. go func() {
  180. ip.srcFunnelTracker.ScheduleCleanup(ctx, ip.idleTimeout)
  181. }()
  182. buf := make([]byte, mtu)
  183. icmpDecoder := packet.NewICMPDecoder()
  184. for {
  185. n, from, err := ip.conn.ReadFrom(buf)
  186. if err != nil {
  187. return err
  188. }
  189. reply, err := parseReply(from, buf[:n])
  190. if err != nil {
  191. ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply, continue to parse as full packet")
  192. // In unit test, we found out when the listener listens on 0.0.0.0, the socket reads the full packet after
  193. // the second reply
  194. if err := ip.handleFullPacket(ctx, icmpDecoder, buf[:n]); err != nil {
  195. ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply as full packet")
  196. }
  197. continue
  198. }
  199. if !isEchoReply(reply.msg) {
  200. ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
  201. continue
  202. }
  203. if err := ip.sendReply(ctx, reply); err != nil {
  204. ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
  205. continue
  206. }
  207. }
  208. }
  209. func (ip *icmpProxy) handleFullPacket(ctx context.Context, decoder *packet.ICMPDecoder, rawPacket []byte) error {
  210. icmpPacket, err := decoder.Decode(packet.RawPacket{Data: rawPacket})
  211. if err != nil {
  212. return err
  213. }
  214. echo, err := getICMPEcho(icmpPacket.Message)
  215. if err != nil {
  216. return err
  217. }
  218. reply := echoReply{
  219. from: icmpPacket.Src,
  220. msg: icmpPacket.Message,
  221. echo: echo,
  222. }
  223. if ip.sendReply(ctx, &reply); err != nil {
  224. return err
  225. }
  226. return nil
  227. }
  228. func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
  229. funnelID := echoFunnelID(reply.echo.ID)
  230. funnel, ok := ip.srcFunnelTracker.Get(funnelID)
  231. if !ok {
  232. return packet.ErrFunnelNotFound
  233. }
  234. icmpFlow, err := toICMPEchoFlow(funnel)
  235. if err != nil {
  236. return err
  237. }
  238. _, span := icmpFlow.responder.ReplySpan(ctx, ip.logger)
  239. defer icmpFlow.responder.ExportSpan()
  240. if err := icmpFlow.returnToSrc(reply); err != nil {
  241. tracing.EndWithErrorStatus(span, err)
  242. return err
  243. }
  244. observeICMPReply(ip.logger, span, reply.from.String(), reply.echo.ID, reply.echo.Seq)
  245. span.SetAttributes(attribute.Int("originalEchoID", icmpFlow.originalEchoID))
  246. tracing.End(span)
  247. return nil
  248. }