session_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package management
  2. import (
  3. "context"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/assert"
  7. "github.com/stretchr/testify/require"
  8. )
  9. // Validate the active states of the session
  10. func TestSession_ActiveControl(t *testing.T) {
  11. _, cancel := context.WithCancel(context.Background())
  12. defer cancel()
  13. session := newSession(4, actor{}, cancel)
  14. // session starts out not active
  15. assert.False(t, session.Active())
  16. session.active.Store(true)
  17. assert.True(t, session.Active())
  18. session.Stop()
  19. assert.False(t, session.Active())
  20. }
  21. // Validate that the session filters events
  22. func TestSession_Insert(t *testing.T) {
  23. _, cancel := context.WithCancel(context.Background())
  24. defer cancel()
  25. infoLevel := new(LogLevel)
  26. *infoLevel = Info
  27. warnLevel := new(LogLevel)
  28. *warnLevel = Warn
  29. for _, test := range []struct {
  30. name string
  31. filters StreamingFilters
  32. expectLog bool
  33. }{
  34. {
  35. name: "none",
  36. expectLog: true,
  37. },
  38. {
  39. name: "level",
  40. filters: StreamingFilters{
  41. Level: infoLevel,
  42. },
  43. expectLog: true,
  44. },
  45. {
  46. name: "filtered out level",
  47. filters: StreamingFilters{
  48. Level: warnLevel,
  49. },
  50. expectLog: false,
  51. },
  52. {
  53. name: "events",
  54. filters: StreamingFilters{
  55. Events: []LogEventType{HTTP},
  56. },
  57. expectLog: true,
  58. },
  59. {
  60. name: "filtered out event",
  61. filters: StreamingFilters{
  62. Events: []LogEventType{Cloudflared},
  63. },
  64. expectLog: false,
  65. },
  66. {
  67. name: "sampling",
  68. filters: StreamingFilters{
  69. Sampling: 0.9999999,
  70. },
  71. expectLog: true,
  72. },
  73. {
  74. name: "sampling (invalid negative)",
  75. filters: StreamingFilters{
  76. Sampling: -1.0,
  77. },
  78. expectLog: true,
  79. },
  80. {
  81. name: "sampling (invalid too large)",
  82. filters: StreamingFilters{
  83. Sampling: 2.0,
  84. },
  85. expectLog: true,
  86. },
  87. {
  88. name: "filter and event",
  89. filters: StreamingFilters{
  90. Level: infoLevel,
  91. Events: []LogEventType{HTTP},
  92. },
  93. expectLog: true,
  94. },
  95. } {
  96. t.Run(test.name, func(t *testing.T) {
  97. session := newSession(4, actor{}, cancel)
  98. session.Filters(&test.filters)
  99. log := Log{
  100. Time: time.Now().UTC().Format(time.RFC3339),
  101. Event: HTTP,
  102. Level: Info,
  103. Message: "test",
  104. }
  105. session.Insert(&log)
  106. select {
  107. case <-session.listener:
  108. require.True(t, test.expectLog)
  109. default:
  110. require.False(t, test.expectLog)
  111. }
  112. })
  113. }
  114. }
  115. // Validate that the session has a max amount of events to hold
  116. func TestSession_InsertOverflow(t *testing.T) {
  117. _, cancel := context.WithCancel(context.Background())
  118. defer cancel()
  119. session := newSession(1, actor{}, cancel)
  120. log := Log{
  121. Time: time.Now().UTC().Format(time.RFC3339),
  122. Event: HTTP,
  123. Level: Info,
  124. Message: "test",
  125. }
  126. // Insert 2 but only max channel size for 1
  127. session.Insert(&log)
  128. session.Insert(&log)
  129. select {
  130. case <-session.listener:
  131. // pass
  132. default:
  133. require.Fail(t, "expected one log event")
  134. }
  135. // Second dequeue should fail
  136. select {
  137. case <-session.listener:
  138. require.Fail(t, "expected no more remaining log events")
  139. default:
  140. // pass
  141. }
  142. }