muxmetrics_test.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package h2mux
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. "github.com/cloudflare/cloudflared/logger"
  7. "github.com/stretchr/testify/assert"
  8. )
  9. func ave(sum uint64, len int) float64 {
  10. return float64(sum) / float64(len)
  11. }
  12. func TestRTTUpdate(t *testing.T) {
  13. r := newRTTData()
  14. start := time.Now()
  15. // send at 0 ms, receive at 2 ms, RTT = 2ms
  16. m := &roundTripMeasurement{receiveTime: start.Add(2 * time.Millisecond), sendTime: start}
  17. r.update(m)
  18. assert.Equal(t, start, r.lastMeasurementTime)
  19. assert.Equal(t, 2*time.Millisecond, r.rtt)
  20. assert.Equal(t, 2*time.Millisecond, r.rttMin)
  21. assert.Equal(t, 2*time.Millisecond, r.rttMax)
  22. // send at 3 ms, receive at 6 ms, RTT = 3ms
  23. m = &roundTripMeasurement{receiveTime: start.Add(6 * time.Millisecond), sendTime: start.Add(3 * time.Millisecond)}
  24. r.update(m)
  25. assert.Equal(t, start.Add(3*time.Millisecond), r.lastMeasurementTime)
  26. assert.Equal(t, 3*time.Millisecond, r.rtt)
  27. assert.Equal(t, 2*time.Millisecond, r.rttMin)
  28. assert.Equal(t, 3*time.Millisecond, r.rttMax)
  29. // send at 7 ms, receive at 8 ms, RTT = 1ms
  30. m = &roundTripMeasurement{receiveTime: start.Add(8 * time.Millisecond), sendTime: start.Add(7 * time.Millisecond)}
  31. r.update(m)
  32. assert.Equal(t, start.Add(7*time.Millisecond), r.lastMeasurementTime)
  33. assert.Equal(t, 1*time.Millisecond, r.rtt)
  34. assert.Equal(t, 1*time.Millisecond, r.rttMin)
  35. assert.Equal(t, 3*time.Millisecond, r.rttMax)
  36. // send at -4 ms, receive at 0 ms, RTT = 4ms, but this ping is before last measurement
  37. // so it will be discarded
  38. m = &roundTripMeasurement{receiveTime: start, sendTime: start.Add(-2 * time.Millisecond)}
  39. r.update(m)
  40. assert.Equal(t, start.Add(7*time.Millisecond), r.lastMeasurementTime)
  41. assert.Equal(t, 1*time.Millisecond, r.rtt)
  42. assert.Equal(t, 1*time.Millisecond, r.rttMin)
  43. assert.Equal(t, 3*time.Millisecond, r.rttMax)
  44. }
  45. func TestFlowControlDataUpdate(t *testing.T) {
  46. f := newFlowControlData()
  47. assert.Equal(t, 0, f.queue.Len())
  48. assert.Equal(t, float64(0), f.ave())
  49. var sum uint64
  50. min := maxWindowSize - dataPoints
  51. max := maxWindowSize
  52. for i := 1; i <= dataPoints; i++ {
  53. size := maxWindowSize - uint32(i)
  54. f.update(size)
  55. assert.Equal(t, max-uint32(1), f.max)
  56. assert.Equal(t, size, f.min)
  57. assert.Equal(t, i, f.queue.Len())
  58. sum += uint64(size)
  59. assert.Equal(t, sum, f.sum)
  60. assert.Equal(t, ave(sum, f.queue.Len()), f.ave())
  61. }
  62. // queue is full, should start to dequeue first element
  63. for i := 1; i <= dataPoints; i++ {
  64. f.update(max)
  65. assert.Equal(t, max, f.max)
  66. assert.Equal(t, min, f.min)
  67. assert.Equal(t, dataPoints, f.queue.Len())
  68. sum += uint64(i)
  69. assert.Equal(t, sum, f.sum)
  70. assert.Equal(t, ave(sum, dataPoints), f.ave())
  71. }
  72. }
  73. func TestMuxMetricsUpdater(t *testing.T) {
  74. t.Skip("Inherently racy test due to muxMetricsUpdaterImpl.run()")
  75. errChan := make(chan error)
  76. abortChan := make(chan struct{})
  77. compBefore, compAfter := NewAtomicCounter(0), NewAtomicCounter(0)
  78. m := newMuxMetricsUpdater(abortChan, compBefore, compAfter)
  79. logger := logger.NewOutputWriter(logger.NewMockWriteManager())
  80. go func() {
  81. errChan <- m.run(logger)
  82. }()
  83. var wg sync.WaitGroup
  84. wg.Add(2)
  85. // mock muxReader
  86. readerStart := time.Now()
  87. rm := &roundTripMeasurement{receiveTime: readerStart, sendTime: readerStart}
  88. m.updateRTT(rm)
  89. go func() {
  90. defer wg.Done()
  91. assert.Equal(t, 0, dataPoints%4,
  92. "dataPoints is not divisible by 4; this test should be adjusted accordingly")
  93. readerSend := readerStart.Add(time.Millisecond)
  94. for i := 1; i <= dataPoints/4; i++ {
  95. readerReceive := readerSend.Add(time.Duration(i) * time.Millisecond)
  96. rm := &roundTripMeasurement{receiveTime: readerReceive, sendTime: readerSend}
  97. m.updateRTT(rm)
  98. readerSend = readerReceive.Add(time.Millisecond)
  99. m.updateReceiveWindow(uint32(i))
  100. m.updateSendWindow(uint32(i))
  101. m.updateInBoundBytes(uint64(i))
  102. }
  103. }()
  104. // mock muxWriter
  105. go func() {
  106. defer wg.Done()
  107. assert.Equal(t, 0, dataPoints%4,
  108. "dataPoints is not divisible by 4; this test should be adjusted accordingly")
  109. for j := dataPoints/4 + 1; j <= dataPoints/2; j++ {
  110. m.updateReceiveWindow(uint32(j))
  111. m.updateSendWindow(uint32(j))
  112. // should always be disgarded since the send time is before readerSend
  113. rm := &roundTripMeasurement{receiveTime: readerStart, sendTime: readerStart.Add(-time.Duration(j*dataPoints) * time.Millisecond)}
  114. m.updateRTT(rm)
  115. m.updateOutBoundBytes(uint64(j))
  116. }
  117. }()
  118. wg.Wait()
  119. metrics := m.metrics()
  120. points := dataPoints / 2
  121. assert.Equal(t, time.Millisecond, metrics.RTTMin)
  122. assert.Equal(t, time.Duration(dataPoints/4)*time.Millisecond, metrics.RTTMax)
  123. // sum(1..i) = i*(i+1)/2, ave(1..i) = i*(i+1)/2/i = (i+1)/2
  124. assert.Equal(t, float64(points+1)/float64(2), metrics.ReceiveWindowAve)
  125. assert.Equal(t, uint32(1), metrics.ReceiveWindowMin)
  126. assert.Equal(t, uint32(points), metrics.ReceiveWindowMax)
  127. assert.Equal(t, float64(points+1)/float64(2), metrics.SendWindowAve)
  128. assert.Equal(t, uint32(1), metrics.SendWindowMin)
  129. assert.Equal(t, uint32(points), metrics.SendWindowMax)
  130. assert.Equal(t, uint64(dataPoints/4), metrics.InBoundRateCurr)
  131. assert.Equal(t, uint64(1), metrics.InBoundRateMin)
  132. assert.Equal(t, uint64(dataPoints/4), metrics.InBoundRateMax)
  133. assert.Equal(t, uint64(dataPoints/2), metrics.OutBoundRateCurr)
  134. assert.Equal(t, uint64(dataPoints/4+1), metrics.OutBoundRateMin)
  135. assert.Equal(t, uint64(dataPoints/2), metrics.OutBoundRateMax)
  136. close(abortChan)
  137. assert.Nil(t, <-errChan)
  138. close(errChan)
  139. }