h2mux_test.go 33 KB


  1. package h2mux
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "math/rand"
  9. "net"
  10. "os"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "testing"
  15. "time"
  16. "github.com/pkg/errors"
  17. "github.com/stretchr/testify/assert"
  18. "golang.org/x/sync/errgroup"
  19. "github.com/cloudflare/cloudflared/logger"
  20. )
  21. const (
  22. testOpenStreamTimeout = time.Millisecond * 5000
  23. testHandshakeTimeout = time.Millisecond * 1000
  24. )
  25. func TestMain(m *testing.M) {
  26. if os.Getenv("VERBOSE") == "1" {
  27. //TODO: set log level
  28. }
  29. os.Exit(m.Run())
  30. }
  31. type DefaultMuxerPair struct {
  32. OriginMuxConfig MuxerConfig
  33. OriginMux *Muxer
  34. OriginConn net.Conn
  35. EdgeMuxConfig MuxerConfig
  36. EdgeMux *Muxer
  37. EdgeConn net.Conn
  38. doneC chan struct{}
  39. }
  40. func NewDefaultMuxerPair(t assert.TestingT, testName string, f MuxedStreamFunc) *DefaultMuxerPair {
  41. origin, edge := net.Pipe()
  42. p := &DefaultMuxerPair{
  43. OriginMuxConfig: MuxerConfig{
  44. Timeout: testHandshakeTimeout,
  45. Handler: f,
  46. IsClient: true,
  47. Name: "origin",
  48. Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
  49. DefaultWindowSize: (1 << 8) - 1,
  50. MaxWindowSize: (1 << 15) - 1,
  51. StreamWriteBufferMaxLen: 1024,
  52. HeartbeatInterval: defaultTimeout,
  53. MaxHeartbeats: defaultRetries,
  54. },
  55. OriginConn: origin,
  56. EdgeMuxConfig: MuxerConfig{
  57. Timeout: testHandshakeTimeout,
  58. IsClient: false,
  59. Name: "edge",
  60. Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
  61. DefaultWindowSize: (1 << 8) - 1,
  62. MaxWindowSize: (1 << 15) - 1,
  63. StreamWriteBufferMaxLen: 1024,
  64. HeartbeatInterval: defaultTimeout,
  65. MaxHeartbeats: defaultRetries,
  66. },
  67. EdgeConn: edge,
  68. doneC: make(chan struct{}),
  69. }
  70. assert.NoError(t, p.Handshake(testName))
  71. return p
  72. }
  73. func NewCompressedMuxerPair(t assert.TestingT, testName string, quality CompressionSetting, f MuxedStreamFunc) *DefaultMuxerPair {
  74. origin, edge := net.Pipe()
  75. p := &DefaultMuxerPair{
  76. OriginMuxConfig: MuxerConfig{
  77. Timeout: time.Second,
  78. Handler: f,
  79. IsClient: true,
  80. Name: "origin",
  81. CompressionQuality: quality,
  82. Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
  83. HeartbeatInterval: defaultTimeout,
  84. MaxHeartbeats: defaultRetries,
  85. },
  86. OriginConn: origin,
  87. EdgeMuxConfig: MuxerConfig{
  88. Timeout: time.Second,
  89. IsClient: false,
  90. Name: "edge",
  91. CompressionQuality: quality,
  92. Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
  93. HeartbeatInterval: defaultTimeout,
  94. MaxHeartbeats: defaultRetries,
  95. },
  96. EdgeConn: edge,
  97. doneC: make(chan struct{}),
  98. }
  99. assert.NoError(t, p.Handshake(testName))
  100. return p
  101. }
  102. func (p *DefaultMuxerPair) Handshake(testName string) error {
  103. ctx, cancel := context.WithTimeout(context.Background(), testHandshakeTimeout)
  104. defer cancel()
  105. errGroup, _ := errgroup.WithContext(ctx)
  106. errGroup.Go(func() (err error) {
  107. p.EdgeMux, err = Handshake(p.EdgeConn, p.EdgeConn, p.EdgeMuxConfig, NewActiveStreamsMetrics(testName, "edge"))
  108. return errors.Wrap(err, "edge handshake failure")
  109. })
  110. errGroup.Go(func() (err error) {
  111. p.OriginMux, err = Handshake(p.OriginConn, p.OriginConn, p.OriginMuxConfig, NewActiveStreamsMetrics(testName, "origin"))
  112. return errors.Wrap(err, "origin handshake failure")
  113. })
  114. return errGroup.Wait()
  115. }
  116. func (p *DefaultMuxerPair) Serve(t assert.TestingT) {
  117. ctx := context.Background()
  118. var wg sync.WaitGroup
  119. wg.Add(2)
  120. go func() {
  121. err := p.EdgeMux.Serve(ctx)
  122. if err != nil && err != io.EOF && err != io.ErrClosedPipe {
  123. t.Errorf("error in edge muxer Serve(): %s", err)
  124. }
  125. p.OriginMux.Shutdown()
  126. wg.Done()
  127. }()
  128. go func() {
  129. err := p.OriginMux.Serve(ctx)
  130. if err != nil && err != io.EOF && err != io.ErrClosedPipe {
  131. t.Errorf("error in origin muxer Serve(): %s", err)
  132. }
  133. p.EdgeMux.Shutdown()
  134. wg.Done()
  135. }()
  136. go func() {
  137. // notify when both muxes have stopped serving
  138. wg.Wait()
  139. close(p.doneC)
  140. }()
  141. }
  142. func (p *DefaultMuxerPair) Wait(t *testing.T) {
  143. select {
  144. case <-p.doneC:
  145. return
  146. case <-time.After(5 * time.Second):
  147. t.Fatal("timeout waiting for shutdown")
  148. }
  149. }
  150. func (p *DefaultMuxerPair) OpenEdgeMuxStream(headers []Header, body io.Reader) (*MuxedStream, error) {
  151. ctx, cancel := context.WithTimeout(context.Background(), testOpenStreamTimeout)
  152. defer cancel()
  153. return p.EdgeMux.OpenStream(ctx, headers, body)
  154. }
  155. func TestHandshake(t *testing.T) {
  156. f := func(stream *MuxedStream) error {
  157. return nil
  158. }
  159. muxPair := NewDefaultMuxerPair(t, t.Name(), f)
  160. AssertIfPipeReadable(t, muxPair.OriginConn)
  161. AssertIfPipeReadable(t, muxPair.EdgeConn)
  162. }
  163. func TestSingleStream(t *testing.T) {
  164. f := MuxedStreamFunc(func(stream *MuxedStream) error {
  165. if len(stream.Headers) != 1 {
  166. t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
  167. }
  168. if stream.Headers[0].Name != "test-header" {
  169. t.Fatalf("expected header name %s, got %s", "test-header", stream.Headers[0].Name)
  170. }
  171. if stream.Headers[0].Value != "headerValue" {
  172. t.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
  173. }
  174. stream.WriteHeaders([]Header{
  175. {Name: "response-header", Value: "responseValue"},
  176. })
  177. buf := []byte("Hello world")
  178. stream.Write(buf)
  179. n, err := io.ReadFull(stream, buf)
  180. if n > 0 {
  181. t.Fatalf("read %d bytes after EOF", n)
  182. }
  183. if err != io.EOF {
  184. t.Fatalf("expected EOF, got %s", err)
  185. }
  186. return nil
  187. })
  188. muxPair := NewDefaultMuxerPair(t, t.Name(), f)
  189. muxPair.Serve(t)
  190. stream, err := muxPair.OpenEdgeMuxStream(
  191. []Header{{Name: "test-header", Value: "headerValue"}},
  192. nil,
  193. )
  194. if err != nil {
  195. t.Fatalf("error in OpenStream: %s", err)
  196. }
  197. if len(stream.Headers) != 1 {
  198. t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
  199. }
  200. if stream.Headers[0].Name != "response-header" {
  201. t.Fatalf("expected header name %s, got %s", "response-header", stream.Headers[0].Name)
  202. }
  203. if stream.Headers[0].Value != "responseValue" {
  204. t.Fatalf("expected header value %s, got %s", "responseValue", stream.Headers[0].Value)
  205. }
  206. responseBody := make([]byte, 11)
  207. n, err := io.ReadFull(stream, responseBody)
  208. if err != nil {
  209. t.Fatalf("error from (*MuxedStream).Read: %s", err)
  210. }
  211. if n != len(responseBody) {
  212. t.Fatalf("expected response body to have %d bytes, got %d", len(responseBody), n)
  213. }
  214. if string(responseBody) != "Hello world" {
  215. t.Fatalf("expected response body %s, got %s", "Hello world", responseBody)
  216. }
  217. stream.Close()
  218. n, err = stream.Write([]byte("aaaaa"))
  219. if n > 0 {
  220. t.Fatalf("wrote %d bytes after EOF", n)
  221. }
  222. if err != io.EOF {
  223. t.Fatalf("expected EOF, got %s", err)
  224. }
  225. }
  226. func TestSingleStreamLargeResponseBody(t *testing.T) {
  227. bodySize := 1 << 24
  228. f := MuxedStreamFunc(func(stream *MuxedStream) error {
  229. if len(stream.Headers) != 1 {
  230. t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
  231. }
  232. if stream.Headers[0].Name != "test-header" {
  233. t.Fatalf("expected header name %s, got %s", "test-header", stream.Headers[0].Name)
  234. }
  235. if stream.Headers[0].Value != "headerValue" {
  236. t.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
  237. }
  238. stream.WriteHeaders([]Header{
  239. {Name: "response-header", Value: "responseValue"},
  240. })
  241. payload := make([]byte, bodySize)
  242. for i := range payload {
  243. payload[i] = byte(i % 256)
  244. }
  245. t.Log("Writing payload...")
  246. n, err := stream.Write(payload)
  247. t.Logf("Wrote %d bytes into the stream", n)
  248. if err != nil {
  249. t.Fatalf("origin write error: %s", err)
  250. }
  251. if n != len(payload) {
  252. t.Fatalf("origin short write: %d/%d bytes", n, len(payload))
  253. }
  254. return nil
  255. })
  256. muxPair := NewDefaultMuxerPair(t, t.Name(), f)
  257. muxPair.Serve(t)
  258. stream, err := muxPair.OpenEdgeMuxStream(
  259. []Header{{Name: "test-header", Value: "headerValue"}},
  260. nil,
  261. )
  262. if err != nil {
  263. t.Fatalf("error in OpenStream: %s", err)
  264. }
  265. if len(stream.Headers) != 1 {
  266. t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
  267. }
  268. if stream.Headers[0].Name != "response-header" {
  269. t.Fatalf("expected header name %s, got %s", "response-header", stream.Headers[0].Name)
  270. }
  271. if stream.Headers[0].Value != "responseValue" {
  272. t.Fatalf("expected header value %s, got %s", "responseValue", stream.Headers[0].Value)
  273. }
  274. responseBody := make([]byte, bodySize)
  275. n, err := io.ReadFull(stream, responseBody)
  276. if err != nil {
  277. t.Fatalf("error from (*MuxedStream).Read: %s", err)
  278. }
  279. if n != len(responseBody) {
  280. t.Fatalf("expected response body to have %d bytes, got %d", len(responseBody), n)
  281. }
  282. }
  283. func TestMultipleStreams(t *testing.T) {
  284. l := logger.NewOutputWriter(logger.NewMockWriteManager())
  285. f := MuxedStreamFunc(func(stream *MuxedStream) error {
  286. if len(stream.Headers) != 1 {
  287. t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
  288. }
  289. if stream.Headers[0].Name != "client-token" {
  290. t.Fatalf("expected header name %s, got %s", "client-token", stream.Headers[0].Name)
  291. }
  292. l.Debugf("Got request for stream %s", stream.Headers[0].Value)
  293. stream.WriteHeaders([]Header{
  294. {Name: "response-token", Value: stream.Headers[0].Value},
  295. })
  296. l.Debugf("Wrote headers for stream %s", stream.Headers[0].Value)
  297. stream.Write([]byte("OK"))
  298. l.Debugf("Wrote body for stream %s", stream.Headers[0].Value)
  299. return nil
  300. })
  301. muxPair := NewDefaultMuxerPair(t, t.Name(), f)
  302. muxPair.Serve(t)
  303. maxStreams := 64
  304. errorsC := make(chan error, maxStreams)
  305. var wg sync.WaitGroup
  306. wg.Add(maxStreams)
  307. for i := 0; i < maxStreams; i++ {
  308. go func(tokenId int) {
  309. defer wg.Done()
  310. tokenString := fmt.Sprintf("%d", tokenId)
  311. stream, err := muxPair.OpenEdgeMuxStream(
  312. []Header{{Name: "client-token", Value: tokenString}},
  313. nil,
  314. )
  315. l.Debugf("Got headers for stream %d", tokenId)
  316. if err != nil {
  317. errorsC <- err
  318. return
  319. }
  320. if len(stream.Headers) != 1 {
  321. errorsC <- fmt.Errorf("stream %d has error: expected %d headers, got %d", stream.streamID, 1, len(stream.Headers))
  322. return
  323. }
  324. if stream.Headers[0].Name != "response-token" {
  325. errorsC <- fmt.Errorf("stream %d has error: expected header name %s, got %s", stream.streamID, "response-token", stream.Headers[0].Name)
  326. return
  327. }
  328. if stream.Headers[0].Value != tokenString {
  329. errorsC <- fmt.Errorf("stream %d has error: expected header value %s, got %s", stream.streamID, tokenString, stream.Headers[0].Value)
  330. return
  331. }
  332. responseBody := make([]byte, 2)
  333. n, err := io.ReadFull(stream, responseBody)
  334. if err != nil {
  335. errorsC <- fmt.Errorf("stream %d has error: error from (*MuxedStream).Read: %s", stream.streamID, err)
  336. return
  337. }
  338. if n != len(responseBody) {
  339. errorsC <- fmt.Errorf("stream %d has error: expected response body to have %d bytes, got %d", stream.streamID, len(responseBody), n)
  340. return
  341. }
  342. if string(responseBody) != "OK" {
  343. errorsC <- fmt.Errorf("stream %d has error: expected response body %s, got %s", stream.streamID, "OK", responseBody)
  344. return
  345. }
  346. }(i)
  347. }
  348. wg.Wait()
  349. close(errorsC)
  350. testFail := false
  351. for err := range errorsC {
  352. testFail = true
  353. l.Errorf("%s", err)
  354. }
  355. if testFail {
  356. t.Fatalf("TestMultipleStreams failed")
  357. }
  358. }
  359. func TestMultipleStreamsFlowControl(t *testing.T) {
  360. maxStreams := 32
  361. responseSizes := make([]int32, maxStreams)
  362. for i := 0; i < maxStreams; i++ {
  363. responseSizes[i] = rand.Int31n(int32(defaultWindowSize << 4))
  364. }
  365. f := MuxedStreamFunc(func(stream *MuxedStream) error {
  366. if len(stream.Headers) != 1 {
  367. t.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
  368. }
  369. if stream.Headers[0].Name != "test-header" {
  370. t.Fatalf("expected header name %s, got %s", "test-header", stream.Headers[0].Name)
  371. }
  372. if stream.Headers[0].Value != "headerValue" {
  373. t.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
  374. }
  375. stream.WriteHeaders([]Header{
  376. {Name: "response-header", Value: "responseValue"},
  377. })
  378. payload := make([]byte, responseSizes[(stream.streamID-2)/2])
  379. for i := range payload {
  380. payload[i] = byte(i % 256)
  381. }
  382. n, err := stream.Write(payload)
  383. if err != nil {
  384. t.Fatalf("origin write error: %s", err)
  385. }
  386. if n != len(payload) {
  387. t.Fatalf("origin short write: %d/%d bytes", n, len(payload))
  388. }
  389. return nil
  390. })
  391. muxPair := NewDefaultMuxerPair(t, t.Name(), f)
  392. muxPair.Serve(t)
  393. errGroup, _ := errgroup.WithContext(context.Background())
  394. for i := 0; i < maxStreams; i++ {
  395. errGroup.Go(func() error {
  396. stream, err := muxPair.OpenEdgeMuxStream(
  397. []Header{{Name: "test-header", Value: "headerValue"}},
  398. nil,
  399. )
  400. if err != nil {
  401. return fmt.Errorf("error in OpenStream: %d %s", stream.streamID, err)
  402. }
  403. if len(stream.Headers) != 1 {
  404. return fmt.Errorf("stream %d expected %d headers, got %d", stream.streamID, 1, len(stream.Headers))
  405. }
  406. if stream.Headers[0].Name != "response-header" {
  407. return fmt.Errorf("stream %d expected header name %s, got %s", stream.streamID, "response-header", stream.Headers[0].Name)
  408. }
  409. if stream.Headers[0].Value != "responseValue" {
  410. return fmt.Errorf("stream %d expected header value %s, got %s", stream.streamID, "responseValue", stream.Headers[0].Value)
  411. }
  412. responseBody := make([]byte, responseSizes[(stream.streamID-2)/2])
  413. n, err := io.ReadFull(stream, responseBody)
  414. if err != nil {
  415. return fmt.Errorf("stream %d error from (*MuxedStream).Read: %s", stream.streamID, err)
  416. }
  417. if n != len(responseBody) {
  418. return fmt.Errorf("stream %d expected response body to have %d bytes, got %d", stream.streamID, len(responseBody), n)
  419. }
  420. return nil
  421. })
  422. }
  423. assert.NoError(t, errGroup.Wait())
  424. }
  425. func TestGracefulShutdown(t *testing.T) {
  426. l := logger.NewOutputWriter(logger.NewMockWriteManager())
  427. sendC := make(chan struct{})
  428. responseBuf := bytes.Repeat([]byte("Hello world"), 65536)
  429. f := MuxedStreamFunc(func(stream *MuxedStream) error {
  430. stream.WriteHeaders([]Header{
  431. {Name: "response-header", Value: "responseValue"},
  432. })
  433. <-sendC
  434. l.Debugf("Writing %d bytes", len(responseBuf))
  435. stream.Write(responseBuf)
  436. stream.CloseWrite()
  437. l.Debugf("Wrote %d bytes", len(responseBuf))
  438. // Reading from the stream will block until the edge closes its end of the stream.
  439. // Otherwise, we'll close the whole connection before receiving the 'stream closed'
  440. // message from the edge.
  441. // Graceful shutdown works if you omit this, it just gives spurious errors for now -
  442. // TODO ignore errors when writing 'stream closed' and we're shutting down.
  443. stream.Read([]byte{0})
  444. l.Debugf("Handler ends")
  445. return nil
  446. })
  447. muxPair := NewDefaultMuxerPair(t, t.Name(), f)
  448. muxPair.Serve(t)
  449. stream, err := muxPair.OpenEdgeMuxStream(
  450. []Header{{Name: "test-header", Value: "headerValue"}},
  451. nil,
  452. )
  453. if err != nil {
  454. t.Fatalf("error in OpenStream: %s", err)
  455. }
  456. // Start graceful shutdown of the edge mux - this should also close the origin mux when done
  457. muxPair.EdgeMux.Shutdown()
  458. close(sendC)
  459. responseBody := make([]byte, len(responseBuf))
  460. l.Debugf("Waiting for %d bytes", len(responseBuf))
  461. n, err := io.ReadFull(stream, responseBody)
  462. if err != nil {
  463. t.Fatalf("error from (*MuxedStream).Read with %d bytes read: %s", n, err)
  464. }
  465. if n != len(responseBody) {
  466. t.Fatalf("expected response body to have %d bytes, got %d", len(responseBody), n)
  467. }
  468. if !bytes.Equal(responseBuf, responseBody) {
  469. t.Fatalf("response body mismatch")
  470. }
  471. stream.Close()
  472. muxPair.Wait(t)
  473. }
  474. func TestUnexpectedShutdown(t *testing.T) {
  475. sendC := make(chan struct{})
  476. handlerFinishC := make(chan struct{})
  477. responseBuf := bytes.Repeat([]byte("Hello world"), 65536)
  478. f := MuxedStreamFunc(func(stream *MuxedStream) error {
  479. defer close(handlerFinishC)
  480. stream.WriteHeaders([]Header{
  481. {Name: "response-header", Value: "responseValue"},
  482. })
  483. <-sendC
  484. n, err := stream.Read([]byte{0})
  485. if err != io.EOF {
  486. t.Fatalf("unexpected error from (*MuxedStream).Read: %s", err)
  487. }
  488. if n != 0 {
  489. t.Fatalf("expected empty read, got %d bytes", n)
  490. }
  491. // Write comes after read, because write buffers data before it is flushed. It wouldn't know about EOF
  492. // until some time later. Calling read first forces it to know about EOF now.
  493. _, err = stream.Write(responseBuf)
  494. if err != io.EOF {
  495. t.Fatalf("unexpected error from (*MuxedStream).Write: %s", err)
  496. }
  497. return nil
  498. })
  499. muxPair := NewDefaultMuxerPair(t, t.Name(), f)
  500. muxPair.Serve(t)
  501. stream, err := muxPair.OpenEdgeMuxStream(
  502. []Header{{Name: "test-header", Value: "headerValue"}},
  503. nil,
  504. )
  505. // Close the underlying connection before telling the origin to write.
  506. muxPair.EdgeConn.Close()
  507. close(sendC)
  508. if err != nil {
  509. t.Fatalf("error in OpenStream: %s", err)
  510. }
  511. responseBody := make([]byte, len(responseBuf))
  512. n, err := io.ReadFull(stream, responseBody)
  513. if err != io.EOF {
  514. t.Fatalf("unexpected error from (*MuxedStream).Read: %s", err)
  515. }
  516. if n != 0 {
  517. t.Fatalf("expected response body to have %d bytes, got %d", 0, n)
  518. }
  519. // The write ordering requirement explained in the origin handler applies here too.
  520. _, err = stream.Write(responseBuf)
  521. if err != io.EOF {
  522. t.Fatalf("unexpected error from (*MuxedStream).Write: %s", err)
  523. }
  524. <-handlerFinishC
  525. }
  526. func EchoHandler(stream *MuxedStream) error {
  527. var buf bytes.Buffer
  528. fmt.Fprintf(&buf, "Hello, world!\n\n# REQUEST HEADERS:\n\n")
  529. for _, header := range stream.Headers {
  530. fmt.Fprintf(&buf, "[%s] = %s\n", header.Name, header.Value)
  531. }
  532. stream.WriteHeaders([]Header{
  533. {Name: ":status", Value: "200"},
  534. {Name: "server", Value: "Echo-server/1.0"},
  535. {Name: "date", Value: time.Now().Format(time.RFC850)},
  536. {Name: "content-type", Value: "text/html; charset=utf-8"},
  537. {Name: "content-length", Value: strconv.Itoa(buf.Len())},
  538. })
  539. buf.WriteTo(stream)
  540. return nil
  541. }
  542. func TestOpenAfterDisconnect(t *testing.T) {
  543. for i := 0; i < 3; i++ {
  544. muxPair := NewDefaultMuxerPair(t, fmt.Sprintf("%s_%d", t.Name(), i), EchoHandler)
  545. muxPair.Serve(t)
  546. switch i {
  547. case 0:
  548. // Close both directions of the connection to cause EOF on both peers.
  549. muxPair.OriginConn.Close()
  550. muxPair.EdgeConn.Close()
  551. case 1:
  552. // Close origin conn to cause EOF on origin first.
  553. muxPair.OriginConn.Close()
  554. case 2:
  555. // Close edge conn to cause EOF on edge first.
  556. muxPair.EdgeConn.Close()
  557. }
  558. _, err := muxPair.OpenEdgeMuxStream(
  559. []Header{{Name: "test-header", Value: "headerValue"}},
  560. nil,
  561. )
  562. if err != ErrStreamRequestConnectionClosed && err != ErrResponseHeadersConnectionClosed {
  563. t.Fatalf("case %v: unexpected error in OpenStream: %v", i, err)
  564. }
  565. }
  566. }
  567. func TestHPACK(t *testing.T) {
  568. muxPair := NewDefaultMuxerPair(t, t.Name(), EchoHandler)
  569. muxPair.Serve(t)
  570. stream, err := muxPair.OpenEdgeMuxStream(
  571. []Header{
  572. {Name: ":method", Value: "RPC"},
  573. {Name: ":scheme", Value: "capnp"},
  574. {Name: ":path", Value: "*"},
  575. },
  576. nil,
  577. )
  578. if err != nil {
  579. t.Fatalf("error in OpenStream: %s", err)
  580. }
  581. stream.Close()
  582. for i := 0; i < 3; i++ {
  583. stream, err := muxPair.OpenEdgeMuxStream(
  584. []Header{
  585. {Name: ":method", Value: "GET"},
  586. {Name: ":scheme", Value: "https"},
  587. {Name: ":authority", Value: "tunnel.otterlyadorable.co.uk"},
  588. {Name: ":path", Value: "/get"},
  589. {Name: "accept-encoding", Value: "gzip"},
  590. {Name: "cf-ray", Value: "378948953f044408-SFO-DOG"},
  591. {Name: "cf-visitor", Value: "{\"scheme\":\"https\"}"},
  592. {Name: "cf-connecting-ip", Value: "2400:cb00:0025:010d:0000:0000:0000:0001"},
  593. {Name: "x-forwarded-for", Value: "2400:cb00:0025:010d:0000:0000:0000:0001"},
  594. {Name: "x-forwarded-proto", Value: "https"},
  595. {Name: "accept-language", Value: "en-gb"},
  596. {Name: "referer", Value: "https://tunnel.otterlyadorable.co.uk/"},
  597. {Name: "cookie", Value: "__cfduid=d4555095065f92daedc059490771967d81493032162"},
  598. {Name: "connection", Value: "Keep-Alive"},
  599. {Name: "cf-ipcountry", Value: "US"},
  600. {Name: "accept", Value: "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"},
  601. {Name: "user-agent", Value: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/603.2.4 (KHTML, like Gecko) Version/10.1.1 Safari/603.2.4"},
  602. },
  603. nil,
  604. )
  605. if err != nil {
  606. t.Fatalf("error in OpenStream: %s", err)
  607. }
  608. if len(stream.Headers) == 0 {
  609. t.Fatal("response has no headers")
  610. }
  611. if stream.Headers[0].Name != ":status" {
  612. t.Fatalf("first header should be status, found %s instead", stream.Headers[0].Name)
  613. }
  614. if stream.Headers[0].Value != "200" {
  615. t.Fatalf("expected status 200, got %s", stream.Headers[0].Value)
  616. }
  617. ioutil.ReadAll(stream)
  618. stream.Close()
  619. }
  620. }
  621. func AssertIfPipeReadable(t *testing.T, pipe io.ReadCloser) {
  622. errC := make(chan error)
  623. go func() {
  624. b := []byte{0}
  625. n, err := pipe.Read(b)
  626. if n > 0 {
  627. t.Fatalf("read pipe was not empty")
  628. }
  629. errC <- err
  630. }()
  631. select {
  632. case err := <-errC:
  633. if err != nil {
  634. t.Fatalf("read error: %s", err)
  635. }
  636. case <-time.After(100 * time.Millisecond):
  637. // nothing to read
  638. }
  639. }
  640. func TestMultipleStreamsWithDictionaries(t *testing.T) {
  641. l := logger.NewOutputWriter(logger.NewMockWriteManager())
  642. for q := CompressionNone; q <= CompressionMax; q++ {
  643. htmlBody := `<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"` +
  644. `"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">` +
  645. `<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">` +
  646. `<head>` +
  647. ` <title>Your page title here</title>` +
  648. `</head>` +
  649. `<body>` +
  650. `<h1>Your major heading here</h1>` +
  651. `<p>` +
  652. `This is a regular text paragraph.` +
  653. `</p>` +
  654. `<ul>` +
  655. ` <li>` +
  656. ` First bullet of a bullet list.` +
  657. ` </li>` +
  658. ` <li>` +
  659. ` This is the <em>second</em> bullet.` +
  660. ` </li>` +
  661. `</ul>` +
  662. `</body>` +
  663. `</html>`
  664. f := MuxedStreamFunc(func(stream *MuxedStream) error {
  665. var contentType string
  666. var pathHeader Header
  667. for _, h := range stream.Headers {
  668. if h.Name == ":path" {
  669. pathHeader = h
  670. break
  671. }
  672. }
  673. if pathHeader.Name != ":path" {
  674. panic("Couldn't find :path header in test")
  675. }
  676. if strings.Contains(pathHeader.Value, "html") {
  677. contentType = "text/html; charset=utf-8"
  678. } else if strings.Contains(pathHeader.Value, "js") {
  679. contentType = "application/javascript"
  680. } else if strings.Contains(pathHeader.Value, "css") {
  681. contentType = "text/css"
  682. } else {
  683. contentType = "img/gif"
  684. }
  685. stream.WriteHeaders([]Header{
  686. Header{Name: "content-type", Value: contentType},
  687. })
  688. stream.Write([]byte(strings.Replace(htmlBody, "paragraph", pathHeader.Value, 1) + stream.Headers[5].Value))
  689. return nil
  690. })
  691. muxPair := NewCompressedMuxerPair(t, fmt.Sprintf("%s_%d", t.Name(), q), q, f)
  692. muxPair.Serve(t)
  693. var wg sync.WaitGroup
  694. paths := []string{
  695. "/html1",
  696. "/html2?sa:ds",
  697. "/html3",
  698. "/css1",
  699. "/html1",
  700. "/html2?sa:ds",
  701. "/html3",
  702. "/css1",
  703. "/css2",
  704. "/css3",
  705. "/js",
  706. "/js",
  707. "/js",
  708. "/js2",
  709. "/img2",
  710. "/html1",
  711. "/html2?sa:ds",
  712. "/html3",
  713. "/css1",
  714. "/css2",
  715. "/css3",
  716. "/js",
  717. "/js",
  718. "/js",
  719. "/js2",
  720. "/img1",
  721. }
  722. wg.Add(len(paths))
  723. errorsC := make(chan error, len(paths))
  724. for i, s := range paths {
  725. go func(index int, path string) {
  726. defer wg.Done()
  727. stream, err := muxPair.OpenEdgeMuxStream(
  728. []Header{
  729. {Name: ":method", Value: "GET"},
  730. {Name: ":scheme", Value: "https"},
  731. {Name: ":authority", Value: "tunnel.otterlyadorable.co.uk"},
  732. {Name: ":path", Value: path},
  733. {Name: "cf-ray", Value: "378948953f044408-SFO-DOG"},
  734. {Name: "idx", Value: strconv.Itoa(index)},
  735. {Name: "accept-encoding", Value: "gzip, br"},
  736. },
  737. nil,
  738. )
  739. if err != nil {
  740. errorsC <- fmt.Errorf("error in OpenStream: %v", err)
  741. return
  742. }
  743. expectBody := strings.Replace(htmlBody, "paragraph", path, 1) + strconv.Itoa(index)
  744. responseBody := make([]byte, len(expectBody)*2)
  745. n, err := stream.Read(responseBody)
  746. if err != nil {
  747. errorsC <- fmt.Errorf("stream %d error from (*MuxedStream).Read: %s", stream.streamID, err)
  748. return
  749. }
  750. if n != len(expectBody) {
  751. errorsC <- fmt.Errorf("stream %d expected response body to have %d bytes, got %d", stream.streamID, len(expectBody), n)
  752. return
  753. }
  754. if string(responseBody[:n]) != expectBody {
  755. errorsC <- fmt.Errorf("stream %d expected response body %s, got %s", stream.streamID, expectBody, responseBody[:n])
  756. return
  757. }
  758. }(i, s)
  759. }
  760. wg.Wait()
  761. close(errorsC)
  762. testFail := false
  763. for err := range errorsC {
  764. testFail = true
  765. l.Errorf("%s", err)
  766. }
  767. if testFail {
  768. t.Fatalf("TestMultipleStreams failed")
  769. }
  770. originMuxMetrics := muxPair.OriginMux.Metrics()
  771. if q > CompressionNone && originMuxMetrics.CompBytesBefore.Value() <= 10*originMuxMetrics.CompBytesAfter.Value() {
  772. t.Fatalf("Cross-stream compression is expected to give a better compression ratio")
  773. }
  774. }
  775. }
  776. func sampleSiteHandler(files map[string][]byte) MuxedStreamFunc {
  777. l := logger.NewOutputWriter(logger.NewMockWriteManager())
  778. return func(stream *MuxedStream) error {
  779. var contentType string
  780. var pathHeader Header
  781. for _, h := range stream.Headers {
  782. if h.Name == ":path" {
  783. pathHeader = h
  784. break
  785. }
  786. }
  787. if pathHeader.Name != ":path" {
  788. return fmt.Errorf("Couldn't find :path header in test")
  789. }
  790. if strings.Contains(pathHeader.Value, "html") {
  791. contentType = "text/html; charset=utf-8"
  792. } else if strings.Contains(pathHeader.Value, "js") {
  793. contentType = "application/javascript"
  794. } else if strings.Contains(pathHeader.Value, "css") {
  795. contentType = "text/css"
  796. } else {
  797. contentType = "img/gif"
  798. }
  799. stream.WriteHeaders([]Header{
  800. Header{Name: "content-type", Value: contentType},
  801. })
  802. l.Debugf("Wrote headers for stream %s", pathHeader.Value)
  803. file, ok := files[pathHeader.Value]
  804. if !ok {
  805. return fmt.Errorf("%s content is not preloaded", pathHeader.Value)
  806. }
  807. stream.Write(file)
  808. l.Debugf("Wrote body for stream %s", pathHeader.Value)
  809. return nil
  810. }
  811. }
  812. func sampleSiteTest(muxPair *DefaultMuxerPair, path string, files map[string][]byte) error {
  813. stream, err := muxPair.OpenEdgeMuxStream(
  814. []Header{
  815. {Name: ":method", Value: "GET"},
  816. {Name: ":scheme", Value: "https"},
  817. {Name: ":authority", Value: "tunnel.otterlyadorable.co.uk"},
  818. {Name: ":path", Value: path},
  819. {Name: "accept-encoding", Value: "br, gzip"},
  820. {Name: "cf-ray", Value: "378948953f044408-SFO-DOG"},
  821. },
  822. nil,
  823. )
  824. if err != nil {
  825. return fmt.Errorf("error in OpenStream: %v", err)
  826. }
  827. file, ok := files[path]
  828. if !ok {
  829. return fmt.Errorf("%s content is not preloaded", path)
  830. }
  831. responseBody := make([]byte, len(file))
  832. n, err := io.ReadFull(stream, responseBody)
  833. if err != nil {
  834. return fmt.Errorf("error from (*MuxedStream).Read: %v", err)
  835. }
  836. if n != len(file) {
  837. return fmt.Errorf("expected response body to have %d bytes, got %d", len(file), n)
  838. }
  839. if string(responseBody[:n]) != string(file) {
  840. return fmt.Errorf("expected response body %s, got %s", file, responseBody[:n])
  841. }
  842. return nil
  843. }
  844. func loadSampleFiles(paths []string) (map[string][]byte, error) {
  845. files := make(map[string][]byte)
  846. for _, path := range paths {
  847. if _, ok := files[path]; !ok {
  848. expectBody, err := ioutil.ReadFile(path)
  849. if err != nil {
  850. return nil, err
  851. }
  852. files[path] = expectBody
  853. }
  854. }
  855. return files, nil
  856. }
  857. func TestSampleSiteWithDictionaries(t *testing.T) {
  858. paths := []string{
  859. "./sample/index.html",
  860. "./sample/index2.html",
  861. "./sample/index1.html",
  862. "./sample/ghost-url.min.js",
  863. "./sample/jquery.fitvids.js",
  864. "./sample/index1.html",
  865. "./sample/index2.html",
  866. "./sample/index.html",
  867. }
  868. files, err := loadSampleFiles(paths)
  869. assert.NoError(t, err)
  870. for q := CompressionNone; q <= CompressionMax; q++ {
  871. muxPair := NewCompressedMuxerPair(t, fmt.Sprintf("%s_%d", t.Name(), q), q, sampleSiteHandler(files))
  872. muxPair.Serve(t)
  873. var wg sync.WaitGroup
  874. errC := make(chan error, len(paths))
  875. wg.Add(len(paths))
  876. for _, s := range paths {
  877. go func(path string) {
  878. defer wg.Done()
  879. errC <- sampleSiteTest(muxPair, path, files)
  880. }(s)
  881. }
  882. wg.Wait()
  883. close(errC)
  884. for err := range errC {
  885. assert.NoError(t, err)
  886. }
  887. originMuxMetrics := muxPair.OriginMux.Metrics()
  888. if q > CompressionNone && originMuxMetrics.CompBytesBefore.Value() <= 10*originMuxMetrics.CompBytesAfter.Value() {
  889. t.Fatalf("Cross-stream compression is expected to give a better compression ratio")
  890. }
  891. }
  892. }
  893. func TestLongSiteWithDictionaries(t *testing.T) {
  894. paths := []string{
  895. "./sample/index.html",
  896. "./sample/index1.html",
  897. "./sample/index2.html",
  898. "./sample/ghost-url.min.js",
  899. "./sample/jquery.fitvids.js",
  900. }
  901. files, err := loadSampleFiles(paths)
  902. assert.NoError(t, err)
  903. for q := CompressionNone; q <= CompressionMedium; q++ {
  904. muxPair := NewCompressedMuxerPair(t, fmt.Sprintf("%s_%d", t.Name(), q), q, sampleSiteHandler(files))
  905. muxPair.Serve(t)
  906. rand.Seed(time.Now().Unix())
  907. tstLen := 500
  908. errGroup, _ := errgroup.WithContext(context.Background())
  909. for i := 0; i < tstLen; i++ {
  910. errGroup.Go(func() error {
  911. path := paths[rand.Int()%len(paths)]
  912. return sampleSiteTest(muxPair, path, files)
  913. })
  914. }
  915. assert.NoError(t, errGroup.Wait())
  916. originMuxMetrics := muxPair.OriginMux.Metrics()
  917. if q > CompressionNone && originMuxMetrics.CompBytesBefore.Value() <= 10*originMuxMetrics.CompBytesAfter.Value() {
  918. t.Fatalf("Cross-stream compression is expected to give a better compression ratio")
  919. }
  920. }
  921. }
  922. func BenchmarkOpenStream(b *testing.B) {
  923. const streams = 5000
  924. for i := 0; i < b.N; i++ {
  925. b.StopTimer()
  926. f := MuxedStreamFunc(func(stream *MuxedStream) error {
  927. if len(stream.Headers) != 1 {
  928. b.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
  929. }
  930. if stream.Headers[0].Name != "test-header" {
  931. b.Fatalf("expected header name %s, got %s", "test-header", stream.Headers[0].Name)
  932. }
  933. if stream.Headers[0].Value != "headerValue" {
  934. b.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
  935. }
  936. stream.WriteHeaders([]Header{
  937. {Name: "response-header", Value: "responseValue"},
  938. })
  939. return nil
  940. })
  941. muxPair := NewDefaultMuxerPair(b, fmt.Sprintf("%s_%d", b.Name(), i), f)
  942. muxPair.Serve(b)
  943. b.StartTimer()
  944. openStreams(b, muxPair, streams)
  945. }
  946. }
  947. func openStreams(b *testing.B, muxPair *DefaultMuxerPair, n int) {
  948. errGroup, _ := errgroup.WithContext(context.Background())
  949. for i := 0; i < n; i++ {
  950. errGroup.Go(func() error {
  951. _, err := muxPair.OpenEdgeMuxStream(
  952. []Header{{Name: "test-header", Value: "headerValue"}},
  953. nil,
  954. )
  955. return err
  956. })
  957. }
  958. assert.NoError(b, errGroup.Wait())
  959. }
  960. func BenchmarkSingleStreamLargeResponseBody(b *testing.B) {
  961. const bodySize = 1 << 24
  962. const writeBufferSize = 16 << 10
  963. const writeN = bodySize / writeBufferSize
  964. payload := make([]byte, writeBufferSize)
  965. for i := range payload {
  966. payload[i] = byte(i % 256)
  967. }
  968. const readBufferSize = 16 << 10
  969. const readN = bodySize / readBufferSize
  970. responseBody := make([]byte, readBufferSize)
  971. f := MuxedStreamFunc(func(stream *MuxedStream) error {
  972. if len(stream.Headers) != 1 {
  973. b.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
  974. }
  975. if stream.Headers[0].Name != "test-header" {
  976. b.Fatalf("expected header name %s, got %s", "test-header", stream.Headers[0].Name)
  977. }
  978. if stream.Headers[0].Value != "headerValue" {
  979. b.Fatalf("expected header value %s, got %s", "headerValue", stream.Headers[0].Value)
  980. }
  981. stream.WriteHeaders([]Header{
  982. {Name: "response-header", Value: "responseValue"},
  983. })
  984. for i := 0; i < writeN; i++ {
  985. n, err := stream.Write(payload)
  986. if err != nil {
  987. b.Fatalf("origin write error: %s", err)
  988. }
  989. if n != len(payload) {
  990. b.Fatalf("origin short write: %d/%d bytes", n, len(payload))
  991. }
  992. }
  993. return nil
  994. })
  995. name := fmt.Sprintf("%s_%d", b.Name(), rand.Int())
  996. origin, edge := net.Pipe()
  997. muxPair := &DefaultMuxerPair{
  998. OriginMuxConfig: MuxerConfig{
  999. Timeout: testHandshakeTimeout,
  1000. Handler: f,
  1001. IsClient: true,
  1002. Name: "origin",
  1003. Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
  1004. DefaultWindowSize: defaultWindowSize,
  1005. MaxWindowSize: maxWindowSize,
  1006. StreamWriteBufferMaxLen: defaultWriteBufferMaxLen,
  1007. HeartbeatInterval: defaultTimeout,
  1008. MaxHeartbeats: defaultRetries,
  1009. },
  1010. OriginConn: origin,
  1011. EdgeMuxConfig: MuxerConfig{
  1012. Timeout: testHandshakeTimeout,
  1013. IsClient: false,
  1014. Name: "edge",
  1015. Logger: logger.NewOutputWriter(logger.NewMockWriteManager()),
  1016. DefaultWindowSize: defaultWindowSize,
  1017. MaxWindowSize: maxWindowSize,
  1018. StreamWriteBufferMaxLen: defaultWriteBufferMaxLen,
  1019. HeartbeatInterval: defaultTimeout,
  1020. MaxHeartbeats: defaultRetries,
  1021. },
  1022. EdgeConn: edge,
  1023. doneC: make(chan struct{}),
  1024. }
  1025. assert.NoError(b, muxPair.Handshake(name))
  1026. muxPair.Serve(b)
  1027. b.ReportAllocs()
  1028. for i := 0; i < b.N; i++ {
  1029. stream, err := muxPair.OpenEdgeMuxStream(
  1030. []Header{{Name: "test-header", Value: "headerValue"}},
  1031. nil,
  1032. )
  1033. if err != nil {
  1034. b.Fatalf("error in OpenStream: %s", err)
  1035. }
  1036. if len(stream.Headers) != 1 {
  1037. b.Fatalf("expected %d headers, got %d", 1, len(stream.Headers))
  1038. }
  1039. if stream.Headers[0].Name != "response-header" {
  1040. b.Fatalf("expected header name %s, got %s", "response-header", stream.Headers[0].Name)
  1041. }
  1042. if stream.Headers[0].Value != "responseValue" {
  1043. b.Fatalf("expected header value %s, got %s", "responseValue", stream.Headers[0].Value)
  1044. }
  1045. for k := 0; k < readN; k++ {
  1046. n, err := io.ReadFull(stream, responseBody)
  1047. if err != nil {
  1048. b.Fatalf("error from (*MuxedStream).Read: %s", err)
  1049. }
  1050. if n != len(responseBody) {
  1051. b.Fatalf("expected response body to have %d bytes, got %d", len(responseBody), n)
  1052. }
  1053. }
  1054. }
  1055. }