events.go 8.4 KB


  1. package management
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. jsoniter "github.com/json-iterator/go"
  8. "github.com/rs/zerolog"
  9. "nhooyr.io/websocket"
  10. )
  // errInvalidMessageType is the sentinel error returned when a websocket message
  // is not a text message, or when the decoded event carries an empty "type".
  // It is built with errors.New because the message is a constant string with
  // no formatting verbs.
  var errInvalidMessageType = errors.New("invalid message type was provided")
  // ServerEventType names the kind of event that a server sends to a client.
  type ServerEventType string

  // ClientEventType names the kind of event that a client sends to a server.
  type ClientEventType string

  // Event types that can come from the client.
  const (
  	// UnknownClientEventType is the zero value: no event type was provided.
  	UnknownClientEventType ClientEventType = ""
  	// StartStreaming is the type of the EventStartStreaming event.
  	StartStreaming ClientEventType = "start_streaming"
  	// StopStreaming is the type of the EventStopStreaming event.
  	StopStreaming ClientEventType = "stop_streaming"
  )

  // Event types that can come from the server.
  const (
  	// UnknownServerEventType is the zero value: no event type was provided.
  	UnknownServerEventType ServerEventType = ""
  	// Logs is the type of the EventLog event.
  	Logs ServerEventType = "logs"
  )
  25. // ServerEvent is the base struct that informs, based of the Type field, which Event type was provided from the server.
  26. type ServerEvent struct {
  27. Type ServerEventType `json:"type,omitempty"`
  28. // The raw json message is provided to allow better deserialization once the type is known
  29. event jsoniter.RawMessage
  30. }
  31. // ClientEvent is the base struct that informs, based of the Type field, which Event type was provided from the client.
  32. type ClientEvent struct {
  33. Type ClientEventType `json:"type,omitempty"`
  34. // The raw json message is provided to allow better deserialization once the type is known
  35. event jsoniter.RawMessage
  36. }
  37. // EventStartStreaming signifies that the client wishes to start receiving log events.
  38. // Additional filters can be provided to augment the log events requested.
  39. type EventStartStreaming struct {
  40. ClientEvent
  41. Filters *StreamingFilters `json:"filters,omitempty"`
  42. }
  43. type StreamingFilters struct {
  44. Events []LogEventType `json:"events,omitempty"`
  45. Level *LogLevel `json:"level,omitempty"`
  46. Sampling float64 `json:"sampling,omitempty"`
  47. }
  48. // EventStopStreaming signifies that the client wishes to halt receiving log events.
  49. type EventStopStreaming struct {
  50. ClientEvent
  51. }
  52. // EventLog is the event that the server sends to the client with the log events.
  53. type EventLog struct {
  54. ServerEvent
  55. Logs []*Log `json:"logs"`
  56. }
  // LogEventType is the way that logging messages are able to be filtered.
  // Example: assigning LogEventType.Cloudflared to a zerolog event will allow the client to filter for only
  // the Cloudflared-related events.
  //
  // In JSON a LogEventType is represented by its lowercase name (see String).
  type LogEventType int8

  const (
  	// Cloudflared events are significant to cloudflared operations like connection state changes.
  	// Cloudflared is also the default event type for any events that haven't been separated into a proper event type.
  	Cloudflared LogEventType = iota
  	// HTTP is the event type named "http".
  	HTTP
  	// TCP is the event type named "tcp".
  	TCP
  	// UDP is the event type named "udp".
  	UDP
  )

  // ParseLogEventType converts the name s into its LogEventType. The match is
  // exact (lowercase names only). It returns (-1, false) when s is not one of
  // "cloudflared", "http", "tcp" or "udp".
  func ParseLogEventType(s string) (LogEventType, bool) {
  	switch s {
  	case "cloudflared":
  		return Cloudflared, true
  	case "http":
  		return HTTP, true
  	case "tcp":
  		return TCP, true
  	case "udp":
  		return UDP, true
  	}
  	return -1, false
  }

  // String returns the lowercase name of the event type, or "" when l is not
  // one of the declared constants.
  func (l LogEventType) String() string {
  	switch l {
  	case Cloudflared:
  		return "cloudflared"
  	case HTTP:
  		return "http"
  	case TCP:
  		return "tcp"
  	case UDP:
  		return "udp"
  	default:
  		return ""
  	}
  }

  // MarshalJSON encodes the event type as the JSON string of its name. A value
  // outside the declared constants is encoded as the empty string.
  func (l LogEventType) MarshalJSON() ([]byte, error) {
  	return json.Marshal(l.String())
  }

  // UnmarshalJSON decodes a JSON string holding an event type name into l.
  // It returns an error wrapping the decoder's error when data is not a JSON
  // string, and an error quoting the rejected value when the name is unknown.
  // l is left untouched on failure.
  func (l *LogEventType) UnmarshalJSON(data []byte) error {
  	var s string
  	if err := json.Unmarshal(data, &s); err != nil {
  		// Keep the cause in the chain so callers can inspect it.
  		return fmt.Errorf("unable to unmarshal LogEventType string: %w", err)
  	}
  	event, ok := ParseLogEventType(s)
  	if !ok {
  		return fmt.Errorf("unable to unmarshal LogEventType: unknown value %q", s)
  	}
  	*l = event
  	return nil
  }
  // LogLevel corresponds to the zerolog logging levels.
  // "panic", "fatal", and "trace" are exempt from this list as they are rarely used and, at least
  // the first two, are limited to failure conditions that lead to cloudflared shutting down.
  //
  // In JSON a LogLevel is represented by its lowercase name (see String).
  type LogLevel int8

  const (
  	// Debug is the level named "debug".
  	Debug LogLevel = 0
  	// Info is the level named "info".
  	Info LogLevel = 1
  	// Warn is the level named "warn".
  	Warn LogLevel = 2
  	// Error is the level named "error".
  	Error LogLevel = 3
  )

  // ParseLogLevel converts the name l into its LogLevel. The match is exact
  // (lowercase names only). It returns (-1, false) when l is not one of
  // "debug", "info", "warn" or "error".
  func ParseLogLevel(l string) (LogLevel, bool) {
  	switch l {
  	case "debug":
  		return Debug, true
  	case "info":
  		return Info, true
  	case "warn":
  		return Warn, true
  	case "error":
  		return Error, true
  	}
  	return -1, false
  }

  // String returns the lowercase name of the level, or "" when l is not one of
  // the declared constants.
  func (l LogLevel) String() string {
  	switch l {
  	case Debug:
  		return "debug"
  	case Info:
  		return "info"
  	case Warn:
  		return "warn"
  	case Error:
  		return "error"
  	default:
  		return ""
  	}
  }

  // MarshalJSON encodes the level as the JSON string of its name. A value
  // outside the declared constants is encoded as the empty string.
  func (l LogLevel) MarshalJSON() ([]byte, error) {
  	return json.Marshal(l.String())
  }

  // UnmarshalJSON decodes a JSON string holding a level name into l.
  // It returns an error wrapping the decoder's error when data is not a JSON
  // string, and an error quoting the rejected value when the name is unknown.
  // l is left untouched on failure.
  func (l *LogLevel) UnmarshalJSON(data []byte) error {
  	var s string
  	if err := json.Unmarshal(data, &s); err != nil {
  		// Keep the cause in the chain so callers can inspect it.
  		return fmt.Errorf("unable to unmarshal LogLevel string: %w", err)
  	}
  	level, ok := ParseLogLevel(s)
  	if !ok {
  		return fmt.Errorf("unable to unmarshal LogLevel: unknown value %q", s)
  	}
  	*l = level
  	return nil
  }
  // JSON keys used for the fields of a log event.
  const (
  	// TimeKey aligns with the zerolog.TimeFieldName.
  	TimeKey = "time"
  	// LevelKey aligns with the zerolog.LevelFieldName.
  	LevelKey = "level"
  	// MessageKey aligns with the zerolog.MessageFieldName.
  	MessageKey = "message"
  	// EventTypeKey is the custom JSON key of the LogEventType in ZeroLogEvent.
  	EventTypeKey = "event"
  	// FieldsKey is a custom JSON key to match and store every other key of a zerolog event.
  	FieldsKey = "fields"
  )
  173. // Log is the basic structure of the events that are sent to the client.
  174. type Log struct {
  175. Time string `json:"time,omitempty"`
  176. Level LogLevel `json:"level,omitempty"`
  177. Message string `json:"message,omitempty"`
  178. Event LogEventType `json:"event,omitempty"`
  179. Fields map[string]interface{} `json:"fields,omitempty"`
  180. }
  181. // IntoClientEvent unmarshals the provided ClientEvent into the proper type.
  182. func IntoClientEvent[T EventStartStreaming | EventStopStreaming](e *ClientEvent, eventType ClientEventType) (*T, bool) {
  183. if e.Type != eventType {
  184. return nil, false
  185. }
  186. event := new(T)
  187. err := json.Unmarshal(e.event, event)
  188. if err != nil {
  189. return nil, false
  190. }
  191. return event, true
  192. }
  193. // IntoServerEvent unmarshals the provided ServerEvent into the proper type.
  194. func IntoServerEvent[T EventLog](e *ServerEvent, eventType ServerEventType) (*T, bool) {
  195. if e.Type != eventType {
  196. return nil, false
  197. }
  198. event := new(T)
  199. err := json.Unmarshal(e.event, event)
  200. if err != nil {
  201. return nil, false
  202. }
  203. return event, true
  204. }
  205. // ReadEvent will read a message from the websocket connection and parse it into a valid ServerEvent.
  206. func ReadServerEvent(c *websocket.Conn, ctx context.Context) (*ServerEvent, error) {
  207. message, err := readMessage(c, ctx)
  208. if err != nil {
  209. return nil, err
  210. }
  211. event := ServerEvent{}
  212. if err := json.Unmarshal(message, &event); err != nil {
  213. return nil, err
  214. }
  215. switch event.Type {
  216. case Logs:
  217. event.event = message
  218. return &event, nil
  219. case UnknownServerEventType:
  220. return nil, errInvalidMessageType
  221. default:
  222. return nil, fmt.Errorf("invalid server message type was provided: %s", event.Type)
  223. }
  224. }
  225. // ReadEvent will read a message from the websocket connection and parse it into a valid ClientEvent.
  226. func ReadClientEvent(c *websocket.Conn, ctx context.Context) (*ClientEvent, error) {
  227. message, err := readMessage(c, ctx)
  228. if err != nil {
  229. return nil, err
  230. }
  231. event := ClientEvent{}
  232. if err := json.Unmarshal(message, &event); err != nil {
  233. return nil, err
  234. }
  235. switch event.Type {
  236. case StartStreaming, StopStreaming:
  237. event.event = message
  238. return &event, nil
  239. case UnknownClientEventType:
  240. return nil, errInvalidMessageType
  241. default:
  242. return nil, fmt.Errorf("invalid client message type was provided: %s", event.Type)
  243. }
  244. }
  245. // readMessage will read a message from the websocket connection and return the payload.
  246. func readMessage(c *websocket.Conn, ctx context.Context) ([]byte, error) {
  247. messageType, reader, err := c.Reader(ctx)
  248. if err != nil {
  249. return nil, err
  250. }
  251. if messageType != websocket.MessageText {
  252. return nil, errInvalidMessageType
  253. }
  254. return io.ReadAll(reader)
  255. }
  256. // WriteEvent will write a Event type message to the websocket connection.
  257. func WriteEvent(c *websocket.Conn, ctx context.Context, event any) error {
  258. payload, err := json.Marshal(event)
  259. if err != nil {
  260. return err
  261. }
  262. return c.Write(ctx, websocket.MessageText, payload)
  263. }
  264. // IsClosed returns true if the websocket error is a websocket.CloseError; returns false if not a
  265. // websocket.CloseError
  266. func IsClosed(err error, log *zerolog.Logger) bool {
  267. var closeErr websocket.CloseError
  268. if errors.As(err, &closeErr) {
  269. if closeErr.Code != websocket.StatusNormalClosure {
  270. log.Debug().Msgf("connection is already closed: (%d) %s", closeErr.Code, closeErr.Reason)
  271. }
  272. return true
  273. }
  274. return false
  275. }
  276. func AsClosed(err error) *websocket.CloseError {
  277. var closeErr websocket.CloseError
  278. if errors.As(err, &closeErr) {
  279. return &closeErr
  280. }
  281. return nil
  282. }