123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- //go:build darwin
- package ingress
- // This file implements ICMPProxy for Darwin. It uses a non-privileged ICMP socket to send echo requests and listen for
- // echo replies. The source IP of the requests are rewritten to the bind IP of the socket and the socket reads all
- // messages, so we use echo ID to distinguish the replies. Each (source IP, destination IP, echo ID) is assigned a
- // unique echo ID.
- import (
- "context"
- "fmt"
- "math"
- "net/netip"
- "strconv"
- "sync"
- "time"
- "github.com/rs/zerolog"
- "go.opentelemetry.io/otel/attribute"
- "golang.org/x/net/icmp"
- "github.com/cloudflare/cloudflared/packet"
- "github.com/cloudflare/cloudflared/tracing"
- )
- type icmpProxy struct {
- srcFunnelTracker *packet.FunnelTracker
- echoIDTracker *echoIDTracker
- conn *icmp.PacketConn
- // Response is handled in one-by-one, so encoder can be shared between funnels
- encoder *packet.Encoder
- logger *zerolog.Logger
- idleTimeout time.Duration
- }
- // echoIDTracker tracks which ID has been assigned. It first loops through assignment from lastAssignment to then end,
- // then from the beginning to lastAssignment.
- // ICMP echo are short lived. By the time an ID is revisited, it should have been released.
- type echoIDTracker struct {
- lock sync.Mutex
- // maps the source IP, destination IP and original echo ID to a unique echo ID obtained from assignment
- mapping map[flow3Tuple]uint16
- // assignment tracks if an ID is assigned using index as the ID
- // The size of the array is math.MaxUint16 because echo ID is 2 bytes
- assignment [math.MaxUint16]bool
- // nextAssignment is the next number to check for assigment
- nextAssignment uint16
- }
- func newEchoIDTracker() *echoIDTracker {
- return &echoIDTracker{
- mapping: make(map[flow3Tuple]uint16),
- }
- }
- // Get assignment or assign a new ID.
- func (eit *echoIDTracker) getOrAssign(key flow3Tuple) (id uint16, success bool) {
- eit.lock.Lock()
- defer eit.lock.Unlock()
- id, exists := eit.mapping[key]
- if exists {
- return id, true
- }
- if eit.nextAssignment == math.MaxUint16 {
- eit.nextAssignment = 0
- }
- for i, assigned := range eit.assignment[eit.nextAssignment:] {
- if !assigned {
- echoID := uint16(i) + eit.nextAssignment
- eit.set(key, echoID)
- return echoID, true
- }
- }
- for i, assigned := range eit.assignment[0:eit.nextAssignment] {
- if !assigned {
- echoID := uint16(i)
- eit.set(key, echoID)
- return echoID, true
- }
- }
- return 0, false
- }
- // Caller should hold the lock
- func (eit *echoIDTracker) set(key flow3Tuple, assignedEchoID uint16) {
- eit.assignment[assignedEchoID] = true
- eit.mapping[key] = assignedEchoID
- eit.nextAssignment = assignedEchoID + 1
- }
- func (eit *echoIDTracker) release(key flow3Tuple, assigned uint16) bool {
- eit.lock.Lock()
- defer eit.lock.Unlock()
- currentEchoID, exists := eit.mapping[key]
- if exists && assigned == currentEchoID {
- delete(eit.mapping, key)
- eit.assignment[assigned] = false
- return true
- }
- return false
- }
- type echoFunnelID uint16
- func (snf echoFunnelID) Type() string {
- return "echoID"
- }
- func (snf echoFunnelID) String() string {
- return strconv.FormatUint(uint64(snf), 10)
- }
- func newICMPProxy(listenIP netip.Addr, logger *zerolog.Logger, idleTimeout time.Duration) (*icmpProxy, error) {
- conn, err := newICMPConn(listenIP)
- if err != nil {
- return nil, err
- }
- logger.Info().Msgf("Created ICMP proxy listening on %s", conn.LocalAddr())
- return &icmpProxy{
- srcFunnelTracker: packet.NewFunnelTracker(),
- echoIDTracker: newEchoIDTracker(),
- encoder: packet.NewEncoder(),
- conn: conn,
- logger: logger,
- idleTimeout: idleTimeout,
- }, nil
- }
- func (ip *icmpProxy) Request(ctx context.Context, pk *packet.ICMP, responder ICMPResponder) error {
- _, span := responder.RequestSpan(ctx, pk)
- defer responder.ExportSpan()
- originalEcho, err := getICMPEcho(pk.Message)
- if err != nil {
- tracing.EndWithErrorStatus(span, err)
- return err
- }
- observeICMPRequest(ip.logger, span, pk.Src.String(), pk.Dst.String(), originalEcho.ID, originalEcho.Seq)
- echoIDTrackerKey := flow3Tuple{
- srcIP: pk.Src,
- dstIP: pk.Dst,
- originalEchoID: originalEcho.ID,
- }
- assignedEchoID, success := ip.echoIDTracker.getOrAssign(echoIDTrackerKey)
- if !success {
- err := fmt.Errorf("failed to assign unique echo ID")
- tracing.EndWithErrorStatus(span, err)
- return err
- }
- span.SetAttributes(attribute.Int("assignedEchoID", int(assignedEchoID)))
- shouldReplaceFunnelFunc := createShouldReplaceFunnelFunc(ip.logger, responder, pk, originalEcho.ID)
- newFunnelFunc := func() (packet.Funnel, error) {
- originalEcho, err := getICMPEcho(pk.Message)
- if err != nil {
- return nil, err
- }
- closeCallback := func() error {
- ip.echoIDTracker.release(echoIDTrackerKey, assignedEchoID)
- return nil
- }
- icmpFlow := newICMPEchoFlow(pk.Src, closeCallback, ip.conn, responder, int(assignedEchoID), originalEcho.ID, ip.encoder)
- return icmpFlow, nil
- }
- funnelID := echoFunnelID(assignedEchoID)
- funnel, isNew, err := ip.srcFunnelTracker.GetOrRegister(funnelID, shouldReplaceFunnelFunc, newFunnelFunc)
- if err != nil {
- tracing.EndWithErrorStatus(span, err)
- return err
- }
- if isNew {
- span.SetAttributes(attribute.Bool("newFlow", true))
- ip.logger.Debug().
- Str("src", pk.Src.String()).
- Str("dst", pk.Dst.String()).
- Int("originalEchoID", originalEcho.ID).
- Int("assignedEchoID", int(assignedEchoID)).
- Msg("New flow")
- }
- icmpFlow, err := toICMPEchoFlow(funnel)
- if err != nil {
- tracing.EndWithErrorStatus(span, err)
- return err
- }
- err = icmpFlow.sendToDst(pk.Dst, pk.Message)
- if err != nil {
- tracing.EndWithErrorStatus(span, err)
- return err
- }
- tracing.End(span)
- return nil
- }
- // Serve listens for responses to the requests until context is done
- func (ip *icmpProxy) Serve(ctx context.Context) error {
- go func() {
- <-ctx.Done()
- ip.conn.Close()
- }()
- go func() {
- ip.srcFunnelTracker.ScheduleCleanup(ctx, ip.idleTimeout)
- }()
- buf := make([]byte, mtu)
- icmpDecoder := packet.NewICMPDecoder()
- for {
- n, from, err := ip.conn.ReadFrom(buf)
- if err != nil {
- return err
- }
- reply, err := parseReply(from, buf[:n])
- if err != nil {
- ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply, continue to parse as full packet")
- // In unit test, we found out when the listener listens on 0.0.0.0, the socket reads the full packet after
- // the second reply
- if err := ip.handleFullPacket(ctx, icmpDecoder, buf[:n]); err != nil {
- ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to parse ICMP reply as full packet")
- }
- continue
- }
- if !isEchoReply(reply.msg) {
- ip.logger.Debug().Str("dst", from.String()).Msgf("Drop ICMP %s from reply", reply.msg.Type)
- continue
- }
- if err := ip.sendReply(ctx, reply); err != nil {
- ip.logger.Debug().Err(err).Str("dst", from.String()).Msg("Failed to send ICMP reply")
- continue
- }
- }
- }
- func (ip *icmpProxy) handleFullPacket(ctx context.Context, decoder *packet.ICMPDecoder, rawPacket []byte) error {
- icmpPacket, err := decoder.Decode(packet.RawPacket{Data: rawPacket})
- if err != nil {
- return err
- }
- echo, err := getICMPEcho(icmpPacket.Message)
- if err != nil {
- return err
- }
- reply := echoReply{
- from: icmpPacket.Src,
- msg: icmpPacket.Message,
- echo: echo,
- }
- if ip.sendReply(ctx, &reply); err != nil {
- return err
- }
- return nil
- }
- func (ip *icmpProxy) sendReply(ctx context.Context, reply *echoReply) error {
- funnelID := echoFunnelID(reply.echo.ID)
- funnel, ok := ip.srcFunnelTracker.Get(funnelID)
- if !ok {
- return packet.ErrFunnelNotFound
- }
- icmpFlow, err := toICMPEchoFlow(funnel)
- if err != nil {
- return err
- }
- _, span := icmpFlow.responder.ReplySpan(ctx, ip.logger)
- defer icmpFlow.responder.ExportSpan()
- if err := icmpFlow.returnToSrc(reply); err != nil {
- tracing.EndWithErrorStatus(span, err)
- return err
- }
- observeICMPReply(ip.logger, span, reply.from.String(), reply.echo.ID, reply.echo.Seq)
- span.SetAttributes(attribute.Int("originalEchoID", icmpFlow.originalEchoID))
- tracing.End(span)
- return nil
- }
|