events_test.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package management
  2. import (
  3. "context"
  4. "testing"
  5. "time"
  6. "github.com/stretchr/testify/require"
  7. "nhooyr.io/websocket"
  8. "github.com/cloudflare/cloudflared/internal/test"
  9. )
  10. var (
  11. debugLevel *LogLevel
  12. infoLevel *LogLevel
  13. warnLevel *LogLevel
  14. errorLevel *LogLevel
  15. )
  16. func init() {
  17. // created here because we can't do a reference to a const enum, i.e. &Info
  18. debugLevel := new(LogLevel)
  19. *debugLevel = Debug
  20. infoLevel := new(LogLevel)
  21. *infoLevel = Info
  22. warnLevel := new(LogLevel)
  23. *warnLevel = Warn
  24. errorLevel := new(LogLevel)
  25. *errorLevel = Error
  26. }
  27. func TestIntoClientEvent_StartStreaming(t *testing.T) {
  28. for _, test := range []struct {
  29. name string
  30. expected EventStartStreaming
  31. }{
  32. {
  33. name: "no filters",
  34. expected: EventStartStreaming{ClientEvent: ClientEvent{Type: StartStreaming}},
  35. },
  36. {
  37. name: "level filter",
  38. expected: EventStartStreaming{
  39. ClientEvent: ClientEvent{Type: StartStreaming},
  40. Filters: &StreamingFilters{
  41. Level: infoLevel,
  42. },
  43. },
  44. },
  45. {
  46. name: "events filter",
  47. expected: EventStartStreaming{
  48. ClientEvent: ClientEvent{Type: StartStreaming},
  49. Filters: &StreamingFilters{
  50. Events: []LogEventType{Cloudflared, HTTP},
  51. },
  52. },
  53. },
  54. {
  55. name: "sampling filter",
  56. expected: EventStartStreaming{
  57. ClientEvent: ClientEvent{Type: StartStreaming},
  58. Filters: &StreamingFilters{
  59. Sampling: 0.5,
  60. },
  61. },
  62. },
  63. {
  64. name: "level and events filters",
  65. expected: EventStartStreaming{
  66. ClientEvent: ClientEvent{Type: StartStreaming},
  67. Filters: &StreamingFilters{
  68. Level: infoLevel,
  69. Events: []LogEventType{Cloudflared},
  70. Sampling: 0.5,
  71. },
  72. },
  73. },
  74. } {
  75. t.Run(test.name, func(t *testing.T) {
  76. data, err := json.Marshal(test.expected)
  77. require.NoError(t, err)
  78. event := ClientEvent{}
  79. err = json.Unmarshal(data, &event)
  80. require.NoError(t, err)
  81. event.event = data
  82. ce, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming)
  83. require.True(t, ok)
  84. require.Equal(t, test.expected.ClientEvent, ce.ClientEvent)
  85. if test.expected.Filters != nil {
  86. f := ce.Filters
  87. ef := test.expected.Filters
  88. if ef.Level != nil {
  89. require.Equal(t, *ef.Level, *f.Level)
  90. }
  91. require.ElementsMatch(t, ef.Events, f.Events)
  92. }
  93. })
  94. }
  95. }
  96. func TestIntoClientEvent_StopStreaming(t *testing.T) {
  97. event := ClientEvent{
  98. Type: StopStreaming,
  99. event: []byte(`{"type": "stop_streaming"}`),
  100. }
  101. ce, ok := IntoClientEvent[EventStopStreaming](&event, StopStreaming)
  102. require.True(t, ok)
  103. require.Equal(t, EventStopStreaming{ClientEvent: ClientEvent{Type: StopStreaming}}, *ce)
  104. }
  105. func TestIntoClientEvent_Invalid(t *testing.T) {
  106. event := ClientEvent{
  107. Type: UnknownClientEventType,
  108. event: []byte(`{"type": "invalid"}`),
  109. }
  110. _, ok := IntoClientEvent[EventStartStreaming](&event, StartStreaming)
  111. require.False(t, ok)
  112. }
  113. func TestIntoServerEvent_Logs(t *testing.T) {
  114. event := ServerEvent{
  115. Type: Logs,
  116. event: []byte(`{"type": "logs"}`),
  117. }
  118. ce, ok := IntoServerEvent(&event, Logs)
  119. require.True(t, ok)
  120. require.Equal(t, EventLog{ServerEvent: ServerEvent{Type: Logs}}, *ce)
  121. }
  122. func TestIntoServerEvent_Invalid(t *testing.T) {
  123. event := ServerEvent{
  124. Type: UnknownServerEventType,
  125. event: []byte(`{"type": "invalid"}`),
  126. }
  127. _, ok := IntoServerEvent(&event, Logs)
  128. require.False(t, ok)
  129. }
  130. func TestReadServerEvent(t *testing.T) {
  131. sentEvent := EventLog{
  132. ServerEvent: ServerEvent{Type: Logs},
  133. Logs: []*Log{
  134. {
  135. Time: time.Now().UTC().Format(time.RFC3339),
  136. Event: HTTP,
  137. Level: Info,
  138. Message: "test",
  139. },
  140. },
  141. }
  142. client, server := test.WSPipe(nil, nil)
  143. server.CloseRead(context.Background())
  144. defer func() {
  145. server.Close(websocket.StatusInternalError, "")
  146. }()
  147. go func() {
  148. err := WriteEvent(server, context.Background(), &sentEvent)
  149. require.NoError(t, err)
  150. }()
  151. event, err := ReadServerEvent(client, context.Background())
  152. require.NoError(t, err)
  153. require.Equal(t, sentEvent.Type, event.Type)
  154. client.Close(websocket.StatusInternalError, "")
  155. }
  156. func TestReadServerEvent_InvalidWebSocketMessageType(t *testing.T) {
  157. client, server := test.WSPipe(nil, nil)
  158. server.CloseRead(context.Background())
  159. defer func() {
  160. server.Close(websocket.StatusInternalError, "")
  161. }()
  162. go func() {
  163. err := server.Write(context.Background(), websocket.MessageBinary, []byte("test1234"))
  164. require.NoError(t, err)
  165. }()
  166. _, err := ReadServerEvent(client, context.Background())
  167. require.Error(t, err)
  168. client.Close(websocket.StatusInternalError, "")
  169. }
  170. func TestReadServerEvent_InvalidMessageType(t *testing.T) {
  171. sentEvent := ClientEvent{Type: ClientEventType(UnknownServerEventType)}
  172. client, server := test.WSPipe(nil, nil)
  173. server.CloseRead(context.Background())
  174. defer func() {
  175. server.Close(websocket.StatusInternalError, "")
  176. }()
  177. go func() {
  178. err := WriteEvent(server, context.Background(), &sentEvent)
  179. require.NoError(t, err)
  180. }()
  181. _, err := ReadServerEvent(client, context.Background())
  182. require.ErrorIs(t, err, errInvalidMessageType)
  183. client.Close(websocket.StatusInternalError, "")
  184. }
  185. func TestReadClientEvent(t *testing.T) {
  186. sentEvent := EventStartStreaming{
  187. ClientEvent: ClientEvent{Type: StartStreaming},
  188. }
  189. client, server := test.WSPipe(nil, nil)
  190. client.CloseRead(context.Background())
  191. defer func() {
  192. client.Close(websocket.StatusInternalError, "")
  193. }()
  194. go func() {
  195. err := WriteEvent(client, context.Background(), &sentEvent)
  196. require.NoError(t, err)
  197. }()
  198. event, err := ReadClientEvent(server, context.Background())
  199. require.NoError(t, err)
  200. require.Equal(t, sentEvent.Type, event.Type)
  201. server.Close(websocket.StatusInternalError, "")
  202. }
  203. func TestReadClientEvent_InvalidWebSocketMessageType(t *testing.T) {
  204. client, server := test.WSPipe(nil, nil)
  205. client.CloseRead(context.Background())
  206. defer func() {
  207. client.Close(websocket.StatusInternalError, "")
  208. }()
  209. go func() {
  210. err := client.Write(context.Background(), websocket.MessageBinary, []byte("test1234"))
  211. require.NoError(t, err)
  212. }()
  213. _, err := ReadClientEvent(server, context.Background())
  214. require.Error(t, err)
  215. server.Close(websocket.StatusInternalError, "")
  216. }
  217. func TestReadClientEvent_InvalidMessageType(t *testing.T) {
  218. sentEvent := ClientEvent{Type: UnknownClientEventType}
  219. client, server := test.WSPipe(nil, nil)
  220. client.CloseRead(context.Background())
  221. defer func() {
  222. client.Close(websocket.StatusInternalError, "")
  223. }()
  224. go func() {
  225. err := WriteEvent(client, context.Background(), &sentEvent)
  226. require.NoError(t, err)
  227. }()
  228. _, err := ReadClientEvent(server, context.Background())
  229. require.ErrorIs(t, err, errInvalidMessageType)
  230. server.Close(websocket.StatusInternalError, "")
  231. }