origin_icmp_proxy_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package ingress
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "net"
  7. "net/netip"
  8. "strings"
  9. "sync"
  10. "testing"
  11. "time"
  12. "github.com/fortytw2/leaktest"
  13. "github.com/google/gopacket/layers"
  14. "github.com/rs/zerolog"
  15. "github.com/stretchr/testify/require"
  16. "golang.org/x/net/icmp"
  17. "golang.org/x/net/ipv4"
  18. "golang.org/x/net/ipv6"
  19. "github.com/cloudflare/cloudflared/packet"
  20. quicpogs "github.com/cloudflare/cloudflared/quic"
  21. "github.com/cloudflare/cloudflared/tracing"
  22. )
  23. var (
  24. noopLogger = zerolog.Nop()
  25. localhostIP = netip.MustParseAddr("127.0.0.1")
  26. localhostIPv6 = netip.MustParseAddr("::1")
  27. testFunnelIdleTimeout = time.Millisecond * 10
  28. )
  29. // TestICMPProxyEcho makes sure we can send ICMP echo via the Request method and receives response via the
  30. // ListenResponse method
  31. //
  32. // Note: if this test fails on your device under Linux, then most likely you need to make sure that your user
  33. // is allowed in ping_group_range. See the following gist for how to do that:
  34. // https://github.com/ValentinBELYN/icmplib/blob/main/docs/6-use-icmplib-without-privileges.md
  35. func TestICMPRouterEcho(t *testing.T) {
  36. testICMPRouterEcho(t, true)
  37. testICMPRouterEcho(t, false)
  38. }
  39. func testICMPRouterEcho(t *testing.T, sendIPv4 bool) {
  40. defer leaktest.Check(t)()
  41. const (
  42. echoID = 36571
  43. endSeq = 20
  44. )
  45. router, err := NewICMPRouter(localhostIP, localhostIPv6, &noopLogger, testFunnelIdleTimeout)
  46. require.NoError(t, err)
  47. proxyDone := make(chan struct{})
  48. ctx, cancel := context.WithCancel(context.Background())
  49. go func() {
  50. router.Serve(ctx)
  51. close(proxyDone)
  52. }()
  53. muxer := newMockMuxer(1)
  54. responder := newPacketResponder(muxer, 0, packet.NewEncoder())
  55. protocol := layers.IPProtocolICMPv6
  56. if sendIPv4 {
  57. protocol = layers.IPProtocolICMPv4
  58. }
  59. localIPs := getLocalIPs(t, sendIPv4)
  60. ips := make([]*packet.IP, len(localIPs))
  61. for i, localIP := range localIPs {
  62. ips[i] = &packet.IP{
  63. Src: localIP,
  64. Dst: localIP,
  65. Protocol: protocol,
  66. TTL: packet.DefaultTTL,
  67. }
  68. }
  69. var icmpType icmp.Type = ipv6.ICMPTypeEchoRequest
  70. if sendIPv4 {
  71. icmpType = ipv4.ICMPTypeEcho
  72. }
  73. for seq := 0; seq < endSeq; seq++ {
  74. for i, ip := range ips {
  75. pk := packet.ICMP{
  76. IP: ip,
  77. Message: &icmp.Message{
  78. Type: icmpType,
  79. Code: 0,
  80. Body: &icmp.Echo{
  81. ID: echoID + i,
  82. Seq: seq,
  83. Data: []byte(fmt.Sprintf("icmp echo seq %d", seq)),
  84. },
  85. },
  86. }
  87. require.NoError(t, router.Request(ctx, &pk, responder))
  88. validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
  89. }
  90. }
  91. // Make sure funnel cleanup kicks in
  92. time.Sleep(testFunnelIdleTimeout * 2)
  93. cancel()
  94. <-proxyDone
  95. }
  96. func TestTraceICMPRouterEcho(t *testing.T) {
  97. defer leaktest.Check(t)()
  98. tracingCtx := "ec31ad8a01fde11fdcabe2efdce36873:52726f6cabc144f5:0:1"
  99. router, err := NewICMPRouter(localhostIP, localhostIPv6, &noopLogger, testFunnelIdleTimeout)
  100. require.NoError(t, err)
  101. proxyDone := make(chan struct{})
  102. ctx, cancel := context.WithCancel(context.Background())
  103. go func() {
  104. router.Serve(ctx)
  105. close(proxyDone)
  106. }()
  107. // Buffer 3 packets, request span, reply span and reply
  108. muxer := newMockMuxer(3)
  109. tracingIdentity, err := tracing.NewIdentity(tracingCtx)
  110. require.NoError(t, err)
  111. serializedIdentity, err := tracingIdentity.MarshalBinary()
  112. require.NoError(t, err)
  113. responder := newPacketResponder(muxer, 0, packet.NewEncoder())
  114. responder.AddTraceContext(tracing.NewTracedContext(ctx, tracingIdentity.String(), &noopLogger), serializedIdentity)
  115. echo := &icmp.Echo{
  116. ID: 12910,
  117. Seq: 182,
  118. Data: []byte(t.Name()),
  119. }
  120. pk := packet.ICMP{
  121. IP: &packet.IP{
  122. Src: localhostIP,
  123. Dst: localhostIP,
  124. Protocol: layers.IPProtocolICMPv4,
  125. TTL: packet.DefaultTTL,
  126. },
  127. Message: &icmp.Message{
  128. Type: ipv4.ICMPTypeEcho,
  129. Code: 0,
  130. Body: echo,
  131. },
  132. }
  133. require.NoError(t, router.Request(ctx, &pk, responder))
  134. firstPK := <-muxer.cfdToEdge
  135. var requestSpan *quicpogs.TracingSpanPacket
  136. // The order of receiving reply or request span is not deterministic
  137. switch firstPK.Type() {
  138. case quicpogs.DatagramTypeIP:
  139. // reply packet
  140. validateEchoFlow(t, firstPK, &pk)
  141. case quicpogs.DatagramTypeTracingSpan:
  142. // Request span
  143. requestSpan = firstPK.(*quicpogs.TracingSpanPacket)
  144. require.NotEmpty(t, requestSpan.Spans)
  145. require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity))
  146. default:
  147. panic(fmt.Sprintf("received unexpected packet type %d", firstPK.Type()))
  148. }
  149. secondPK := <-muxer.cfdToEdge
  150. if requestSpan != nil {
  151. // If first packet is request span, second packet should be the reply
  152. validateEchoFlow(t, secondPK, &pk)
  153. } else {
  154. requestSpan = secondPK.(*quicpogs.TracingSpanPacket)
  155. require.NotEmpty(t, requestSpan.Spans)
  156. require.True(t, bytes.Equal(serializedIdentity, requestSpan.TracingIdentity))
  157. }
  158. // Reply span
  159. thirdPacket := <-muxer.cfdToEdge
  160. replySpan, ok := thirdPacket.(*quicpogs.TracingSpanPacket)
  161. require.True(t, ok)
  162. require.NotEmpty(t, replySpan.Spans)
  163. require.True(t, bytes.Equal(serializedIdentity, replySpan.TracingIdentity))
  164. require.False(t, bytes.Equal(requestSpan.Spans, replySpan.Spans))
  165. echo.Seq++
  166. pk.Body = echo
  167. // Only first request for a flow is traced. The edge will not send tracing context for the second request
  168. newResponder := newPacketResponder(muxer, 0, packet.NewEncoder())
  169. require.NoError(t, router.Request(ctx, &pk, newResponder))
  170. validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
  171. select {
  172. case receivedPacket := <-muxer.cfdToEdge:
  173. panic(fmt.Sprintf("Receive unexpected packet %+v", receivedPacket))
  174. default:
  175. }
  176. time.Sleep(testFunnelIdleTimeout * 2)
  177. cancel()
  178. <-proxyDone
  179. }
  180. // TestConcurrentRequests makes sure icmpRouter can send concurrent requests to the same destination with different
  181. // echo ID. This simulates concurrent ping to the same destination.
  182. func TestConcurrentRequestsToSameDst(t *testing.T) {
  183. defer leaktest.Check(t)()
  184. const (
  185. concurrentPings = 5
  186. endSeq = 5
  187. )
  188. router, err := NewICMPRouter(localhostIP, localhostIPv6, &noopLogger, testFunnelIdleTimeout)
  189. require.NoError(t, err)
  190. proxyDone := make(chan struct{})
  191. ctx, cancel := context.WithCancel(context.Background())
  192. go func() {
  193. router.Serve(ctx)
  194. close(proxyDone)
  195. }()
  196. var wg sync.WaitGroup
  197. // icmpv4 and icmpv6 each has concurrentPings
  198. wg.Add(concurrentPings * 2)
  199. for i := 0; i < concurrentPings; i++ {
  200. echoID := 38451 + i
  201. go func() {
  202. defer wg.Done()
  203. muxer := newMockMuxer(1)
  204. responder := newPacketResponder(muxer, 0, packet.NewEncoder())
  205. for seq := 0; seq < endSeq; seq++ {
  206. pk := &packet.ICMP{
  207. IP: &packet.IP{
  208. Src: localhostIP,
  209. Dst: localhostIP,
  210. Protocol: layers.IPProtocolICMPv4,
  211. TTL: packet.DefaultTTL,
  212. },
  213. Message: &icmp.Message{
  214. Type: ipv4.ICMPTypeEcho,
  215. Code: 0,
  216. Body: &icmp.Echo{
  217. ID: echoID,
  218. Seq: seq,
  219. Data: []byte(fmt.Sprintf("icmpv4 echo id %d, seq %d", echoID, seq)),
  220. },
  221. },
  222. }
  223. require.NoError(t, router.Request(ctx, pk, responder))
  224. validateEchoFlow(t, <-muxer.cfdToEdge, pk)
  225. }
  226. }()
  227. go func() {
  228. defer wg.Done()
  229. muxer := newMockMuxer(1)
  230. responder := newPacketResponder(muxer, 0, packet.NewEncoder())
  231. for seq := 0; seq < endSeq; seq++ {
  232. pk := &packet.ICMP{
  233. IP: &packet.IP{
  234. Src: localhostIPv6,
  235. Dst: localhostIPv6,
  236. Protocol: layers.IPProtocolICMPv6,
  237. TTL: packet.DefaultTTL,
  238. },
  239. Message: &icmp.Message{
  240. Type: ipv6.ICMPTypeEchoRequest,
  241. Code: 0,
  242. Body: &icmp.Echo{
  243. ID: echoID,
  244. Seq: seq,
  245. Data: []byte(fmt.Sprintf("icmpv6 echo id %d, seq %d", echoID, seq)),
  246. },
  247. },
  248. }
  249. require.NoError(t, router.Request(ctx, pk, responder))
  250. validateEchoFlow(t, <-muxer.cfdToEdge, pk)
  251. }
  252. }()
  253. }
  254. wg.Wait()
  255. time.Sleep(testFunnelIdleTimeout * 2)
  256. cancel()
  257. <-proxyDone
  258. }
  259. // TestICMPProxyRejectNotEcho makes sure it rejects messages other than echo
  260. func TestICMPRouterRejectNotEcho(t *testing.T) {
  261. defer leaktest.Check(t)()
  262. msgs := []icmp.Message{
  263. {
  264. Type: ipv4.ICMPTypeDestinationUnreachable,
  265. Code: 1,
  266. Body: &icmp.DstUnreach{
  267. Data: []byte("original packet"),
  268. },
  269. },
  270. {
  271. Type: ipv4.ICMPTypeTimeExceeded,
  272. Code: 1,
  273. Body: &icmp.TimeExceeded{
  274. Data: []byte("original packet"),
  275. },
  276. },
  277. {
  278. Type: ipv4.ICMPType(2),
  279. Code: 0,
  280. Body: &icmp.PacketTooBig{
  281. MTU: 1280,
  282. Data: []byte("original packet"),
  283. },
  284. },
  285. }
  286. testICMPRouterRejectNotEcho(t, localhostIP, msgs)
  287. msgsV6 := []icmp.Message{
  288. {
  289. Type: ipv6.ICMPTypeDestinationUnreachable,
  290. Code: 3,
  291. Body: &icmp.DstUnreach{
  292. Data: []byte("original packet"),
  293. },
  294. },
  295. {
  296. Type: ipv6.ICMPTypeTimeExceeded,
  297. Code: 0,
  298. Body: &icmp.TimeExceeded{
  299. Data: []byte("original packet"),
  300. },
  301. },
  302. {
  303. Type: ipv6.ICMPTypePacketTooBig,
  304. Code: 0,
  305. Body: &icmp.PacketTooBig{
  306. MTU: 1280,
  307. Data: []byte("original packet"),
  308. },
  309. },
  310. }
  311. testICMPRouterRejectNotEcho(t, localhostIPv6, msgsV6)
  312. }
  313. func testICMPRouterRejectNotEcho(t *testing.T, srcDstIP netip.Addr, msgs []icmp.Message) {
  314. router, err := NewICMPRouter(localhostIP, localhostIPv6, &noopLogger, testFunnelIdleTimeout)
  315. require.NoError(t, err)
  316. muxer := newMockMuxer(1)
  317. responder := newPacketResponder(muxer, 0, packet.NewEncoder())
  318. protocol := layers.IPProtocolICMPv4
  319. if srcDstIP.Is6() {
  320. protocol = layers.IPProtocolICMPv6
  321. }
  322. for _, m := range msgs {
  323. pk := packet.ICMP{
  324. IP: &packet.IP{
  325. Src: srcDstIP,
  326. Dst: srcDstIP,
  327. Protocol: protocol,
  328. TTL: packet.DefaultTTL,
  329. },
  330. Message: &m,
  331. }
  332. require.Error(t, router.Request(context.Background(), &pk, responder))
  333. }
  334. }
  335. func validateEchoFlow(t *testing.T, pk quicpogs.Packet, echoReq *packet.ICMP) {
  336. decoder := packet.NewICMPDecoder()
  337. decoded, err := decoder.Decode(packet.RawPacket{Data: pk.Payload()})
  338. require.NoError(t, err, pk)
  339. require.Equal(t, decoded.Src, echoReq.Dst)
  340. require.Equal(t, decoded.Dst, echoReq.Src)
  341. require.Equal(t, echoReq.Protocol, decoded.Protocol)
  342. if echoReq.Type == ipv4.ICMPTypeEcho {
  343. require.Equal(t, ipv4.ICMPTypeEchoReply, decoded.Type)
  344. } else {
  345. require.Equal(t, ipv6.ICMPTypeEchoReply, decoded.Type)
  346. }
  347. require.Equal(t, 0, decoded.Code)
  348. require.NotZero(t, decoded.Checksum)
  349. require.Equal(t, echoReq.Body, decoded.Body)
  350. }
  351. func getLocalIPs(t *testing.T, ipv4 bool) []netip.Addr {
  352. interfaces, err := net.Interfaces()
  353. require.NoError(t, err)
  354. localIPs := []netip.Addr{}
  355. for _, i := range interfaces {
  356. // Skip TUN devices, and Docker Networks
  357. if strings.Contains(i.Name, "tun") || strings.Contains(i.Name, "docker") || strings.HasPrefix(i.Name, "br-") {
  358. continue
  359. }
  360. addrs, err := i.Addrs()
  361. require.NoError(t, err)
  362. for _, addr := range addrs {
  363. if ipnet, ok := addr.(*net.IPNet); ok && (ipnet.IP.IsPrivate() || ipnet.IP.IsLoopback()) {
  364. // TODO DEVTOOLS-12514: We only run the IPv6 against the loopback interface due to issues on the CI runners.
  365. if (ipv4 && ipnet.IP.To4() != nil) || (!ipv4 && ipnet.IP.To4() == nil && ipnet.IP.IsLoopback()) {
  366. localIPs = append(localIPs, netip.MustParseAddr(ipnet.IP.String()))
  367. }
  368. }
  369. }
  370. }
  371. return localIPs
  372. }