icmp_posix_test.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. //go:build darwin || linux
  2. package ingress
  3. import (
  4. "context"
  5. "os"
  6. "testing"
  7. "time"
  8. "github.com/fortytw2/leaktest"
  9. "github.com/google/gopacket/layers"
  10. "github.com/rs/zerolog"
  11. "github.com/stretchr/testify/require"
  12. "golang.org/x/net/icmp"
  13. "golang.org/x/net/ipv4"
  14. "github.com/cloudflare/cloudflared/packet"
  15. )
  16. func TestFunnelIdleTimeout(t *testing.T) {
  17. defer leaktest.Check(t)()
  18. const (
  19. idleTimeout = time.Second
  20. echoID = 42573
  21. startSeq = 8129
  22. )
  23. logger := zerolog.New(os.Stderr)
  24. proxy, err := newICMPProxy(localhostIP, &logger, idleTimeout)
  25. require.NoError(t, err)
  26. ctx, cancel := context.WithCancel(context.Background())
  27. proxyDone := make(chan struct{})
  28. go func() {
  29. proxy.Serve(ctx)
  30. close(proxyDone)
  31. }()
  32. // Send a packet to register the flow
  33. pk := packet.ICMP{
  34. IP: &packet.IP{
  35. Src: localhostIP,
  36. Dst: localhostIP,
  37. Protocol: layers.IPProtocolICMPv4,
  38. },
  39. Message: &icmp.Message{
  40. Type: ipv4.ICMPTypeEcho,
  41. Code: 0,
  42. Body: &icmp.Echo{
  43. ID: echoID,
  44. Seq: startSeq,
  45. Data: []byte(t.Name()),
  46. },
  47. },
  48. }
  49. muxer := newMockMuxer(0)
  50. responder := newPacketResponder(muxer, 0, packet.NewEncoder())
  51. require.NoError(t, proxy.Request(ctx, &pk, responder))
  52. validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
  53. // Send second request, should reuse the funnel
  54. require.NoError(t, proxy.Request(ctx, &pk, responder))
  55. validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
  56. // New muxer on a different connection should use a new flow
  57. time.Sleep(idleTimeout * 2)
  58. newMuxer := newMockMuxer(0)
  59. newResponder := newPacketResponder(newMuxer, 1, packet.NewEncoder())
  60. require.NoError(t, proxy.Request(ctx, &pk, newResponder))
  61. validateEchoFlow(t, <-newMuxer.cfdToEdge, &pk)
  62. time.Sleep(idleTimeout * 2)
  63. cancel()
  64. <-proxyDone
  65. }
  66. func TestReuseFunnel(t *testing.T) {
  67. defer leaktest.Check(t)()
  68. const (
  69. idleTimeout = time.Millisecond * 100
  70. echoID = 42573
  71. startSeq = 8129
  72. )
  73. logger := zerolog.New(os.Stderr)
  74. proxy, err := newICMPProxy(localhostIP, &logger, idleTimeout)
  75. require.NoError(t, err)
  76. ctx, cancel := context.WithCancel(context.Background())
  77. proxyDone := make(chan struct{})
  78. go func() {
  79. proxy.Serve(ctx)
  80. close(proxyDone)
  81. }()
  82. // Send a packet to register the flow
  83. pk := packet.ICMP{
  84. IP: &packet.IP{
  85. Src: localhostIP,
  86. Dst: localhostIP,
  87. Protocol: layers.IPProtocolICMPv4,
  88. },
  89. Message: &icmp.Message{
  90. Type: ipv4.ICMPTypeEcho,
  91. Code: 0,
  92. Body: &icmp.Echo{
  93. ID: echoID,
  94. Seq: startSeq,
  95. Data: []byte(t.Name()),
  96. },
  97. },
  98. }
  99. tuple := flow3Tuple{
  100. srcIP: pk.Src,
  101. dstIP: pk.Dst,
  102. originalEchoID: echoID,
  103. }
  104. muxer := newMockMuxer(0)
  105. responder := newPacketResponder(muxer, 0, packet.NewEncoder())
  106. require.NoError(t, proxy.Request(ctx, &pk, responder))
  107. validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
  108. funnel1, found := getFunnel(t, proxy, tuple)
  109. require.True(t, found)
  110. // Send second request, should reuse the funnel
  111. require.NoError(t, proxy.Request(ctx, &pk, responder))
  112. validateEchoFlow(t, <-muxer.cfdToEdge, &pk)
  113. funnel2, found := getFunnel(t, proxy, tuple)
  114. require.True(t, found)
  115. require.Equal(t, funnel1, funnel2)
  116. time.Sleep(idleTimeout * 2)
  117. cancel()
  118. <-proxyDone
  119. }