service_test.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package management
  2. import (
  3. "context"
  4. "io"
  5. "net/http"
  6. "net/http/httptest"
  7. "strings"
  8. "testing"
  9. "time"
  10. "github.com/google/uuid"
  11. "github.com/rs/zerolog"
  12. "github.com/stretchr/testify/assert"
  13. "github.com/stretchr/testify/require"
  14. "nhooyr.io/websocket"
  15. "github.com/cloudflare/cloudflared/internal/test"
  16. )
  17. var (
  18. noopLogger = zerolog.New(io.Discard)
  19. managementHostname = "https://management.argotunnel.com"
  20. )
  21. func TestDisableDiagnosticRoutes(t *testing.T) {
  22. mgmt := New("management.argotunnel.com", false, "1.1.1.1:80", uuid.Nil, "", &noopLogger, nil)
  23. for _, path := range []string{"/metrics", "/debug/pprof/goroutine", "/debug/pprof/heap"} {
  24. t.Run(strings.Replace(path, "/", "_", -1), func(t *testing.T) {
  25. req := httptest.NewRequest("GET", managementHostname+path+"?access_token="+validToken, nil)
  26. recorder := httptest.NewRecorder()
  27. mgmt.ServeHTTP(recorder, req)
  28. resp := recorder.Result()
  29. require.Equal(t, http.StatusNotFound, resp.StatusCode)
  30. })
  31. }
  32. }
  33. func TestReadEventsLoop(t *testing.T) {
  34. sentEvent := EventStartStreaming{
  35. ClientEvent: ClientEvent{Type: StartStreaming},
  36. }
  37. client, server := test.WSPipe(nil, nil)
  38. client.CloseRead(context.Background())
  39. defer func() {
  40. client.Close(websocket.StatusInternalError, "")
  41. }()
  42. go func() {
  43. err := WriteEvent(client, context.Background(), &sentEvent)
  44. require.NoError(t, err)
  45. }()
  46. m := ManagementService{
  47. log: &noopLogger,
  48. }
  49. events := make(chan *ClientEvent)
  50. go m.readEvents(server, context.Background(), events)
  51. event := <-events
  52. require.Equal(t, sentEvent.Type, event.Type)
  53. server.Close(websocket.StatusInternalError, "")
  54. }
  55. func TestReadEventsLoop_ContextCancelled(t *testing.T) {
  56. client, server := test.WSPipe(nil, nil)
  57. ctx, cancel := context.WithCancel(context.Background())
  58. client.CloseRead(ctx)
  59. defer func() {
  60. client.Close(websocket.StatusInternalError, "")
  61. }()
  62. m := ManagementService{
  63. log: &noopLogger,
  64. }
  65. events := make(chan *ClientEvent)
  66. go func() {
  67. time.Sleep(time.Second)
  68. cancel()
  69. }()
  70. // Want to make sure this function returns when context is cancelled
  71. m.readEvents(server, ctx, events)
  72. server.Close(websocket.StatusInternalError, "")
  73. }
  74. func TestCanStartStream_NoSessions(t *testing.T) {
  75. m := ManagementService{
  76. log: &noopLogger,
  77. logger: &Logger{
  78. Log: &noopLogger,
  79. },
  80. }
  81. _, cancel := context.WithCancel(context.Background())
  82. session := newSession(0, actor{}, cancel)
  83. assert.True(t, m.canStartStream(session))
  84. }
  85. func TestCanStartStream_ExistingSessionDifferentActor(t *testing.T) {
  86. m := ManagementService{
  87. log: &noopLogger,
  88. logger: &Logger{
  89. Log: &noopLogger,
  90. },
  91. }
  92. _, cancel := context.WithCancel(context.Background())
  93. session1 := newSession(0, actor{ID: "test"}, cancel)
  94. assert.True(t, m.canStartStream(session1))
  95. m.logger.Listen(session1)
  96. assert.True(t, session1.Active())
  97. // Try another session
  98. session2 := newSession(0, actor{ID: "test2"}, cancel)
  99. assert.Equal(t, 1, m.logger.ActiveSessions())
  100. assert.False(t, m.canStartStream(session2))
  101. // Close session1
  102. m.logger.Remove(session1)
  103. assert.True(t, session1.Active()) // Remove doesn't stop a session
  104. session1.Stop()
  105. assert.False(t, session1.Active())
  106. assert.Equal(t, 0, m.logger.ActiveSessions())
  107. // Try session2 again
  108. assert.True(t, m.canStartStream(session2))
  109. }
  110. func TestCanStartStream_ExistingSessionSameActor(t *testing.T) {
  111. m := ManagementService{
  112. log: &noopLogger,
  113. logger: &Logger{
  114. Log: &noopLogger,
  115. },
  116. }
  117. actor := actor{ID: "test"}
  118. _, cancel := context.WithCancel(context.Background())
  119. session1 := newSession(0, actor, cancel)
  120. assert.True(t, m.canStartStream(session1))
  121. m.logger.Listen(session1)
  122. assert.True(t, session1.Active())
  123. // Try another session
  124. session2 := newSession(0, actor, cancel)
  125. assert.Equal(t, 1, m.logger.ActiveSessions())
  126. assert.True(t, m.canStartStream(session2))
  127. // session1 is removed and stopped
  128. assert.Equal(t, 0, m.logger.ActiveSessions())
  129. assert.False(t, session1.Active())
  130. }