activestreammap_test.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package h2mux
  2. import (
  3. "sync"
  4. "testing"
  5. "github.com/stretchr/testify/assert"
  6. )
  7. func TestShutdown(t *testing.T) {
  8. const numStreams = 1000
  9. m := newActiveStreamMap(true, NewActiveStreamsMetrics("test", t.Name()))
  10. // Add all the streams
  11. {
  12. var wg sync.WaitGroup
  13. wg.Add(numStreams)
  14. for i := 0; i < numStreams; i++ {
  15. go func(streamID int) {
  16. defer wg.Done()
  17. stream := &MuxedStream{streamID: uint32(streamID)}
  18. ok := m.Set(stream)
  19. assert.True(t, ok)
  20. }(i)
  21. }
  22. wg.Wait()
  23. }
  24. assert.Equal(t, numStreams, m.Len(), "All the streams should have been added")
  25. shutdownChan, alreadyInProgress := m.Shutdown()
  26. select {
  27. case <-shutdownChan:
  28. assert.Fail(t, "before Shutdown(), shutdownChan shouldn't be closed")
  29. default:
  30. }
  31. assert.False(t, alreadyInProgress)
  32. shutdownChan2, alreadyInProgress2 := m.Shutdown()
  33. assert.Equal(t, shutdownChan, shutdownChan2, "repeated calls to Shutdown() should return the same channel")
  34. assert.True(t, alreadyInProgress2, "repeated calls to Shutdown() should return true for 'in progress'")
  35. // Delete all the streams
  36. {
  37. var wg sync.WaitGroup
  38. wg.Add(numStreams)
  39. for i := 0; i < numStreams; i++ {
  40. go func(streamID int) {
  41. defer wg.Done()
  42. m.Delete(uint32(streamID))
  43. }(i)
  44. }
  45. wg.Wait()
  46. }
  47. assert.Equal(t, 0, m.Len(), "All the streams should have been deleted")
  48. select {
  49. case <-shutdownChan:
  50. default:
  51. assert.Fail(t, "After all the streams are deleted, shutdownChan should have been closed")
  52. }
  53. }
  54. func TestEmptyBeforeShutdown(t *testing.T) {
  55. const numStreams = 1000
  56. m := newActiveStreamMap(true, NewActiveStreamsMetrics("test", t.Name()))
  57. // Add all the streams
  58. {
  59. var wg sync.WaitGroup
  60. wg.Add(numStreams)
  61. for i := 0; i < numStreams; i++ {
  62. go func(streamID int) {
  63. defer wg.Done()
  64. stream := &MuxedStream{streamID: uint32(streamID)}
  65. ok := m.Set(stream)
  66. assert.True(t, ok)
  67. }(i)
  68. }
  69. wg.Wait()
  70. }
  71. assert.Equal(t, numStreams, m.Len(), "All the streams should have been added")
  72. // Delete all the streams, bringing m to size 0
  73. {
  74. var wg sync.WaitGroup
  75. wg.Add(numStreams)
  76. for i := 0; i < numStreams; i++ {
  77. go func(streamID int) {
  78. defer wg.Done()
  79. m.Delete(uint32(streamID))
  80. }(i)
  81. }
  82. wg.Wait()
  83. }
  84. assert.Equal(t, 0, m.Len(), "All the streams should have been deleted")
  85. // Add one stream back
  86. const soloStreamID = uint32(0)
  87. ok := m.Set(&MuxedStream{streamID: soloStreamID})
  88. assert.True(t, ok)
  89. shutdownChan, alreadyInProgress := m.Shutdown()
  90. select {
  91. case <-shutdownChan:
  92. assert.Fail(t, "before Shutdown(), shutdownChan shouldn't be closed")
  93. default:
  94. }
  95. assert.False(t, alreadyInProgress)
  96. shutdownChan2, alreadyInProgress2 := m.Shutdown()
  97. assert.Equal(t, shutdownChan, shutdownChan2, "repeated calls to Shutdown() should return the same channel")
  98. assert.True(t, alreadyInProgress2, "repeated calls to Shutdown() should return true for 'in progress'")
  99. // Remove the remaining stream
  100. m.Delete(soloStreamID)
  101. select {
  102. case <-shutdownChan:
  103. default:
  104. assert.Fail(t, "After all the streams are deleted, shutdownChan should have been closed")
  105. }
  106. }
  107. type noopBuffer struct {
  108. isClosed bool
  109. }
  110. func (t *noopBuffer) Read(p []byte) (n int, err error) { return len(p), nil }
  111. func (t *noopBuffer) Write(p []byte) (n int, err error) { return len(p), nil }
  112. func (t *noopBuffer) Reset() {}
  113. func (t *noopBuffer) Len() int { return 0 }
  114. func (t *noopBuffer) Close() error { t.isClosed = true; return nil }
  115. func (t *noopBuffer) Closed() bool { return t.isClosed }
  116. type noopReadyList struct{}
  117. func (_ *noopReadyList) Signal(streamID uint32) {}
  118. func TestAbort(t *testing.T) {
  119. const numStreams = 1000
  120. m := newActiveStreamMap(true, NewActiveStreamsMetrics("test", t.Name()))
  121. var openedStreams sync.Map
  122. // Add all the streams
  123. {
  124. var wg sync.WaitGroup
  125. wg.Add(numStreams)
  126. for i := 0; i < numStreams; i++ {
  127. go func(streamID int) {
  128. defer wg.Done()
  129. stream := &MuxedStream{
  130. streamID: uint32(streamID),
  131. readBuffer: &noopBuffer{},
  132. writeBuffer: &noopBuffer{},
  133. readyList: &noopReadyList{},
  134. }
  135. ok := m.Set(stream)
  136. assert.True(t, ok)
  137. openedStreams.Store(stream.streamID, stream)
  138. }(i)
  139. }
  140. wg.Wait()
  141. }
  142. assert.Equal(t, numStreams, m.Len(), "All the streams should have been added")
  143. shutdownChan, alreadyInProgress := m.Shutdown()
  144. select {
  145. case <-shutdownChan:
  146. assert.Fail(t, "before Abort(), shutdownChan shouldn't be closed")
  147. default:
  148. }
  149. assert.False(t, alreadyInProgress)
  150. m.Abort()
  151. assert.Equal(t, numStreams, m.Len(), "Abort() shouldn't delete any streams")
  152. openedStreams.Range(func(key interface{}, value interface{}) bool {
  153. stream := value.(*MuxedStream)
  154. readBuffer := stream.readBuffer.(*noopBuffer)
  155. writeBuffer := stream.writeBuffer.(*noopBuffer)
  156. return assert.True(t, readBuffer.isClosed && writeBuffer.isClosed, "Abort() should have closed all the streams")
  157. })
  158. select {
  159. case <-shutdownChan:
  160. default:
  161. assert.Fail(t, "after Abort(), shutdownChan should have been closed")
  162. }
  163. // multiple aborts shouldn't cause any issues
  164. m.Abort()
  165. m.Abort()
  166. m.Abort()
  167. }