queuepacketconn_test.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. package turbotunnel
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/xtaci/kcp-go/v5"
  10. )
  11. type emptyAddr struct{}
  12. func (_ emptyAddr) Network() string { return "empty" }
  13. func (_ emptyAddr) String() string { return "empty" }
  14. type intAddr int
  15. func (i intAddr) Network() string { return "int" }
  16. func (i intAddr) String() string { return fmt.Sprintf("%d", i) }
  17. // Run with -benchmem to see memory allocations.
  18. func BenchmarkQueueIncoming(b *testing.B) {
  19. conn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour, 500)
  20. defer conn.Close()
  21. b.ResetTimer()
  22. var p [500]byte
  23. for i := 0; i < b.N; i++ {
  24. conn.QueueIncoming(p[:], emptyAddr{})
  25. }
  26. b.StopTimer()
  27. }
  28. // BenchmarkWriteTo benchmarks the QueuePacketConn.WriteTo function.
  29. func BenchmarkWriteTo(b *testing.B) {
  30. conn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour, 500)
  31. defer conn.Close()
  32. b.ResetTimer()
  33. var p [500]byte
  34. for i := 0; i < b.N; i++ {
  35. conn.WriteTo(p[:], emptyAddr{})
  36. }
  37. b.StopTimer()
  38. }
  39. // TestQueueIncomingOversize tests that QueueIncoming truncates packets that are
  40. // larger than the MTU.
  41. func TestQueueIncomingOversize(t *testing.T) {
  42. const payload = "abcdefghijklmnopqrstuvwxyz"
  43. conn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour, len(payload)-1)
  44. defer conn.Close()
  45. conn.QueueIncoming([]byte(payload), emptyAddr{})
  46. var p [500]byte
  47. n, _, err := conn.ReadFrom(p[:])
  48. if err != nil {
  49. t.Fatal(err)
  50. }
  51. if !bytes.Equal(p[:n], []byte(payload[:len(payload)-1])) {
  52. t.Fatalf("payload was %+q, expected %+q", p[:n], payload[:len(payload)-1])
  53. }
  54. }
  55. // TestWriteToOversize tests that WriteTo truncates packets that are larger than
  56. // the MTU.
  57. func TestWriteToOversize(t *testing.T) {
  58. const payload = "abcdefghijklmnopqrstuvwxyz"
  59. conn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour, len(payload)-1)
  60. defer conn.Close()
  61. conn.WriteTo([]byte(payload), emptyAddr{})
  62. p := <-conn.OutgoingQueue(emptyAddr{})
  63. if !bytes.Equal(p, []byte(payload[:len(payload)-1])) {
  64. t.Fatalf("payload was %+q, expected %+q", p, payload[:len(payload)-1])
  65. }
  66. }
  67. // TestRestoreMTU tests that Restore ignores any inputs that are not at least
  68. // MTU-sized.
  69. func TestRestoreMTU(t *testing.T) {
  70. const mtu = 500
  71. const payload = "hello"
  72. conn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour, mtu)
  73. defer conn.Close()
  74. conn.Restore(make([]byte, mtu-1))
  75. // This WriteTo may use the short slice we just gave to Restore.
  76. conn.WriteTo([]byte(payload), emptyAddr{})
  77. // Read the queued slice and ensure its capacity is at least the MTU.
  78. p := <-conn.OutgoingQueue(emptyAddr{})
  79. if cap(p) != mtu {
  80. t.Fatalf("cap was %v, expected %v", cap(p), mtu)
  81. }
  82. // Check the payload while we're at it.
  83. if !bytes.Equal(p, []byte(payload)) {
  84. t.Fatalf("payload was %+q, expected %+q", p, payload)
  85. }
  86. }
  87. // TestRestoreCap tests that Restore can use slices whose cap is at least the
  88. // MTU, even if the len is shorter.
  89. func TestRestoreCap(t *testing.T) {
  90. const mtu = 500
  91. const payload = "hello"
  92. conn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour, mtu)
  93. defer conn.Close()
  94. conn.Restore(make([]byte, 0, mtu))
  95. conn.WriteTo([]byte(payload), emptyAddr{})
  96. p := <-conn.OutgoingQueue(emptyAddr{})
  97. if !bytes.Equal(p, []byte(payload)) {
  98. t.Fatalf("payload was %+q, expected %+q", p, payload)
  99. }
  100. }
  101. // DiscardPacketConn is a net.PacketConn whose ReadFrom method block forever and
  102. // whose WriteTo method discards whatever it is called with.
  103. type DiscardPacketConn struct{}
  104. func (_ DiscardPacketConn) ReadFrom(_ []byte) (int, net.Addr, error) { select {} } // block forever
  105. func (_ DiscardPacketConn) WriteTo(p []byte, _ net.Addr) (int, error) { return len(p), nil }
  106. func (_ DiscardPacketConn) Close() error { return nil }
  107. func (_ DiscardPacketConn) LocalAddr() net.Addr { return emptyAddr{} }
  108. func (_ DiscardPacketConn) SetDeadline(t time.Time) error { return nil }
  109. func (_ DiscardPacketConn) SetReadDeadline(t time.Time) error { return nil }
  110. func (_ DiscardPacketConn) SetWriteDeadline(t time.Time) error { return nil }
  111. // TranscriptPacketConn keeps a log of the []byte argument to every call to
  112. // WriteTo.
  113. type TranscriptPacketConn struct {
  114. Transcript [][]byte
  115. lock sync.Mutex
  116. net.PacketConn
  117. }
  118. func NewTranscriptPacketConn(inner net.PacketConn) *TranscriptPacketConn {
  119. return &TranscriptPacketConn{
  120. PacketConn: inner,
  121. }
  122. }
  123. func (c *TranscriptPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
  124. c.lock.Lock()
  125. defer c.lock.Unlock()
  126. p2 := make([]byte, len(p))
  127. copy(p2, p)
  128. c.Transcript = append(c.Transcript, p2)
  129. return c.PacketConn.WriteTo(p, addr)
  130. }
  131. // Tests that QueuePacketConn.WriteTo is compatible with the way kcp-go uses
  132. // PacketConn, allocating source buffers in a sync.Pool.
  133. //
  134. // https://bugs.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/40260
  135. func TestQueuePacketConnWriteToKCP(t *testing.T) {
  136. // Start a goroutine to constantly exercise kcp UDPSession.tx, writing
  137. // packets with payload "XXXX".
  138. done := make(chan struct{}, 0)
  139. defer close(done)
  140. ready := make(chan struct{}, 0)
  141. go func() {
  142. var readyClose sync.Once
  143. defer readyClose.Do(func() { close(ready) })
  144. pconn := DiscardPacketConn{}
  145. defer pconn.Close()
  146. loop:
  147. for {
  148. select {
  149. case <-done:
  150. break loop
  151. default:
  152. }
  153. // Create a new UDPSession, send once, then discard the
  154. // UDPSession.
  155. conn, err := kcp.NewConn2(intAddr(2), nil, 0, 0, pconn)
  156. if err != nil {
  157. panic(err)
  158. }
  159. _, err = conn.Write([]byte("XXXX"))
  160. if err != nil {
  161. panic(err)
  162. }
  163. conn.Close()
  164. // Signal the main test to start once we have done one
  165. // iterator of this noisy loop.
  166. readyClose.Do(func() { close(ready) })
  167. }
  168. }()
  169. pconn := NewQueuePacketConn(emptyAddr{}, 1*time.Hour, 500)
  170. defer pconn.Close()
  171. addr1 := intAddr(1)
  172. outgoing := pconn.OutgoingQueue(addr1)
  173. // Once the "XXXX" goroutine is started, repeatedly send a packet, wait,
  174. // then retrieve it and check whether it has changed since being sent.
  175. <-ready
  176. for i := 0; i < 10; i++ {
  177. transcript := NewTranscriptPacketConn(pconn)
  178. conn, err := kcp.NewConn2(addr1, nil, 0, 0, transcript)
  179. if err != nil {
  180. panic(err)
  181. }
  182. _, err = conn.Write([]byte("hello world"))
  183. if err != nil {
  184. panic(err)
  185. }
  186. err = conn.Close()
  187. if err != nil {
  188. panic(err)
  189. }
  190. // A sleep after the Write makes buffer reuse more likely.
  191. time.Sleep(100 * time.Millisecond)
  192. if len(transcript.Transcript) == 0 {
  193. panic("empty transcript")
  194. }
  195. for j, tr := range transcript.Transcript {
  196. p := <-outgoing
  197. // This test is meant to detect unsynchronized memory
  198. // changes, so freeze the slice we just read.
  199. p2 := make([]byte, len(p))
  200. copy(p2, p)
  201. if !bytes.Equal(p2, tr) {
  202. t.Fatalf("%d %d packet changed between send and recv\nsend: %+q\nrecv: %+q", i, j, tr, p2)
  203. }
  204. }
  205. }
  206. }