shared_buffer_test.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package h2mux
  2. import (
  3. "bytes"
  4. "io"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/stretchr/testify/assert"
  9. )
  10. func AssertIOReturnIsGood(t *testing.T, expected int) func(int, error) {
  11. return func(actual int, err error) {
  12. if expected != actual {
  13. t.Fatalf("Expected %d bytes, got %d", expected, actual)
  14. }
  15. if err != nil {
  16. t.Fatalf("Unexpected error %s", err)
  17. }
  18. }
  19. }
  20. func TestSharedBuffer(t *testing.T) {
  21. b := NewSharedBuffer()
  22. testData := []byte("Hello world")
  23. AssertIOReturnIsGood(t, len(testData))(b.Write(testData))
  24. bytesRead := make([]byte, len(testData))
  25. AssertIOReturnIsGood(t, len(testData))(b.Read(bytesRead))
  26. }
  27. func TestSharedBufferBlockingRead(t *testing.T) {
  28. b := NewSharedBuffer()
  29. testData1 := []byte("Hello")
  30. testData2 := []byte(" world")
  31. result := make(chan []byte)
  32. go func() {
  33. bytesRead := make([]byte, len(testData1)+len(testData2))
  34. nRead, err := b.Read(bytesRead)
  35. AssertIOReturnIsGood(t, len(testData1))(nRead, err)
  36. result <- bytesRead[:nRead]
  37. nRead, err = b.Read(bytesRead)
  38. AssertIOReturnIsGood(t, len(testData2))(nRead, err)
  39. result <- bytesRead[:nRead]
  40. }()
  41. time.Sleep(time.Millisecond * 250)
  42. select {
  43. case <-result:
  44. t.Fatalf("read returned early")
  45. default:
  46. }
  47. AssertIOReturnIsGood(t, len(testData1))(b.Write([]byte(testData1)))
  48. select {
  49. case r := <-result:
  50. assert.Equal(t, testData1, r)
  51. case <-time.After(time.Second):
  52. t.Fatalf("read timed out")
  53. }
  54. AssertIOReturnIsGood(t, len(testData2))(b.Write([]byte(testData2)))
  55. select {
  56. case r := <-result:
  57. assert.Equal(t, testData2, r)
  58. case <-time.After(time.Second):
  59. t.Fatalf("read timed out")
  60. }
  61. }
  62. // This is quite slow under the race detector
  63. func TestSharedBufferConcurrentReadWrite(t *testing.T) {
  64. b := NewSharedBuffer()
  65. var expectedResult, actualResult bytes.Buffer
  66. var wg sync.WaitGroup
  67. wg.Add(2)
  68. go func() {
  69. block := make([]byte, 256)
  70. for i := range block {
  71. block[i] = byte(i)
  72. }
  73. for blockSize := 1; blockSize <= 256; blockSize++ {
  74. for i := 0; i < 256; i++ {
  75. expectedResult.Write(block[:blockSize])
  76. n, err := b.Write(block[:blockSize])
  77. if n != blockSize || err != nil {
  78. t.Fatalf("write error: %d %s", n, err)
  79. }
  80. }
  81. }
  82. wg.Done()
  83. }()
  84. go func() {
  85. block := make([]byte, 256)
  86. // Change block sizes in opposition to the write thread, to test blocking for new data.
  87. for blockSize := 256; blockSize > 0; blockSize-- {
  88. for i := 0; i < 256; i++ {
  89. n, err := io.ReadFull(b, block[:blockSize])
  90. if n != blockSize || err != nil {
  91. t.Fatalf("read error: %d %s", n, err)
  92. }
  93. actualResult.Write(block[:blockSize])
  94. }
  95. }
  96. wg.Done()
  97. }()
  98. wg.Wait()
  99. if bytes.Compare(expectedResult.Bytes(), actualResult.Bytes()) != 0 {
  100. t.Fatal("Result diverged")
  101. }
  102. }
  103. func TestSharedBufferClose(t *testing.T) {
  104. b := NewSharedBuffer()
  105. testData := []byte("Hello world")
  106. AssertIOReturnIsGood(t, len(testData))(b.Write(testData))
  107. err := b.Close()
  108. if err != nil {
  109. t.Fatalf("unexpected error from Close: %s", err)
  110. }
  111. bytesRead := make([]byte, len(testData))
  112. AssertIOReturnIsGood(t, len(testData))(b.Read(bytesRead))
  113. n, err := b.Read(bytesRead)
  114. if n != 0 {
  115. t.Fatalf("extra bytes received: %d", n)
  116. }
  117. if err != io.EOF {
  118. t.Fatalf("expected EOF, got %s", err)
  119. }
  120. }