123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- package management
- import (
- "context"
- "errors"
- "fmt"
- "io"
- jsoniter "github.com/json-iterator/go"
- "github.com/rs/zerolog"
- "nhooyr.io/websocket"
- )
// errInvalidMessageType is the sentinel error returned when a websocket
// message is not a text message, or when a decoded event carries an empty
// (unknown) event type.
var errInvalidMessageType = errors.New("invalid message type was provided")
type (
	// ServerEventType names the kind of event sent from the server to the client.
	ServerEventType string

	// ClientEventType names the kind of event sent from the client to the server.
	ClientEventType string
)

// Event types that a client may send.
const (
	// UnknownClientEventType is the zero value: no client event type was provided.
	UnknownClientEventType ClientEventType = ""
	// StartStreaming identifies an EventStartStreaming message.
	StartStreaming ClientEventType = "start_streaming"
	// StopStreaming identifies an EventStopStreaming message.
	StopStreaming ClientEventType = "stop_streaming"
)

// Event types that the server may send.
const (
	// UnknownServerEventType is the zero value: no server event type was provided.
	UnknownServerEventType ServerEventType = ""
	// Logs identifies an EventLog message.
	Logs ServerEventType = "logs"
)
- // ServerEvent is the base struct that informs, based of the Type field, which Event type was provided from the server.
- type ServerEvent struct {
- Type ServerEventType `json:"type,omitempty"`
- // The raw json message is provided to allow better deserialization once the type is known
- event jsoniter.RawMessage
- }
- // ClientEvent is the base struct that informs, based of the Type field, which Event type was provided from the client.
- type ClientEvent struct {
- Type ClientEventType `json:"type,omitempty"`
- // The raw json message is provided to allow better deserialization once the type is known
- event jsoniter.RawMessage
- }
- // EventStartStreaming signifies that the client wishes to start receiving log events.
- // Additional filters can be provided to augment the log events requested.
- type EventStartStreaming struct {
- ClientEvent
- Filters *StreamingFilters `json:"filters,omitempty"`
- }
- type StreamingFilters struct {
- Events []LogEventType `json:"events,omitempty"`
- Level *LogLevel `json:"level,omitempty"`
- Sampling float64 `json:"sampling,omitempty"`
- }
- // EventStopStreaming signifies that the client wishes to halt receiving log events.
- type EventStopStreaming struct {
- ClientEvent
- }
- // EventLog is the event that the server sends to the client with the log events.
- type EventLog struct {
- ServerEvent
- Logs []*Log `json:"logs"`
- }
// LogEventType is the category attached to a log message so that clients can
// filter on it.
// Example: assigning Cloudflared to a zerolog event allows the client to
// filter for only the cloudflared-related events.
type LogEventType int8

const (
	// Cloudflared events are significant to cloudflared operations like connection state changes.
	// Cloudflared is also the default event type for any events that haven't been separated into a proper event type.
	Cloudflared LogEventType = iota
	// HTTP is the event type whose wire name is "http".
	HTTP
	// TCP is the event type whose wire name is "tcp".
	TCP
	// UDP is the event type whose wire name is "udp".
	UDP
)

// ParseLogEventType converts a wire name ("cloudflared", "http", "tcp" or
// "udp") into its LogEventType. The match is exact and case-sensitive. For any
// other input it returns (-1, false).
func ParseLogEventType(s string) (LogEventType, bool) {
	switch s {
	case "cloudflared":
		return Cloudflared, true
	case "http":
		return HTTP, true
	case "tcp":
		return TCP, true
	case "udp":
		return UDP, true
	}
	return -1, false
}

// String returns the wire name of the event type, or the empty string when the
// value is not one of the declared constants.
func (l LogEventType) String() string {
	switch l {
	case Cloudflared:
		return "cloudflared"
	case HTTP:
		return "http"
	case TCP:
		return "tcp"
	case UDP:
		return "udp"
	default:
		return ""
	}
}

// MarshalJSON encodes the event type as a JSON string holding its wire name.
// Values outside the declared constants are encoded as the empty string.
func (l LogEventType) MarshalJSON() ([]byte, error) {
	return json.Marshal(l.String())
}

// UnmarshalJSON decodes a JSON string holding a wire name into the receiver.
// It returns an error, leaving the receiver untouched, when data is not a JSON
// string (the decoder's error is wrapped) or when the name is unknown (the
// rejected name is included in the message).
func (l *LogEventType) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		// Keep the decoder's error in the chain so callers can see why decoding failed.
		return fmt.Errorf("unable to unmarshal LogEventType string: %w", err)
	}
	event, ok := ParseLogEventType(s)
	if !ok {
		return fmt.Errorf("unable to unmarshal LogEventType: unknown value %q", s)
	}
	*l = event
	return nil
}
// LogLevel corresponds to the zerolog logging levels.
// "panic", "fatal", and "trace" are exempt from this list as they are rarely
// used and, at least the first two, are limited to failure conditions that
// lead to cloudflared shutting down.
type LogLevel int8

const (
	// Debug is the level whose wire name is "debug".
	Debug LogLevel = 0
	// Info is the level whose wire name is "info".
	Info LogLevel = 1
	// Warn is the level whose wire name is "warn".
	Warn LogLevel = 2
	// Error is the level whose wire name is "error".
	Error LogLevel = 3
)

// ParseLogLevel converts a wire name ("debug", "info", "warn" or "error") into
// its LogLevel. The match is exact and case-sensitive. For any other input it
// returns (-1, false).
func ParseLogLevel(l string) (LogLevel, bool) {
	switch l {
	case "debug":
		return Debug, true
	case "info":
		return Info, true
	case "warn":
		return Warn, true
	case "error":
		return Error, true
	}
	return -1, false
}

// String returns the wire name of the level, or the empty string when the
// value is not one of the declared constants.
func (l LogLevel) String() string {
	switch l {
	case Debug:
		return "debug"
	case Info:
		return "info"
	case Warn:
		return "warn"
	case Error:
		return "error"
	default:
		return ""
	}
}

// MarshalJSON encodes the level as a JSON string holding its wire name.
// Values outside the declared constants are encoded as the empty string.
func (l LogLevel) MarshalJSON() ([]byte, error) {
	return json.Marshal(l.String())
}

// UnmarshalJSON decodes a JSON string holding a wire name into the receiver.
// It returns an error, leaving the receiver untouched, when data is not a JSON
// string (the decoder's error is wrapped) or when the name is unknown (the
// rejected name is included in the message).
func (l *LogLevel) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		// Keep the decoder's error in the chain so callers can see why decoding failed.
		return fmt.Errorf("unable to unmarshal LogLevel string: %w", err)
	}
	level, ok := ParseLogLevel(s)
	if !ok {
		return fmt.Errorf("unable to unmarshal LogLevel: unknown value %q", s)
	}
	*l = level
	return nil
}
// JSON keys used for the fields of a log event.
const (
	// TimeKey aligns with the zerolog.TimeFieldName.
	TimeKey = "time"

	// LevelKey aligns with the zerolog.LevelFieldName.
	LevelKey = "level"

	// MessageKey aligns with the zerolog.MessageFieldName.
	MessageKey = "message"

	// EventTypeKey is the custom JSON key under which the LogEventType is stored.
	EventTypeKey = "event"

	// FieldsKey is the custom JSON key that collects every other key of a zerolog event.
	FieldsKey = "fields"
)
- // Log is the basic structure of the events that are sent to the client.
- type Log struct {
- Time string `json:"time,omitempty"`
- Level LogLevel `json:"level,omitempty"`
- Message string `json:"message,omitempty"`
- Event LogEventType `json:"event,omitempty"`
- Fields map[string]interface{} `json:"fields,omitempty"`
- }
- // IntoClientEvent unmarshals the provided ClientEvent into the proper type.
- func IntoClientEvent[T EventStartStreaming | EventStopStreaming](e *ClientEvent, eventType ClientEventType) (*T, bool) {
- if e.Type != eventType {
- return nil, false
- }
- event := new(T)
- err := json.Unmarshal(e.event, event)
- if err != nil {
- return nil, false
- }
- return event, true
- }
- // IntoServerEvent unmarshals the provided ServerEvent into the proper type.
- func IntoServerEvent[T EventLog](e *ServerEvent, eventType ServerEventType) (*T, bool) {
- if e.Type != eventType {
- return nil, false
- }
- event := new(T)
- err := json.Unmarshal(e.event, event)
- if err != nil {
- return nil, false
- }
- return event, true
- }
- // ReadEvent will read a message from the websocket connection and parse it into a valid ServerEvent.
- func ReadServerEvent(c *websocket.Conn, ctx context.Context) (*ServerEvent, error) {
- message, err := readMessage(c, ctx)
- if err != nil {
- return nil, err
- }
- event := ServerEvent{}
- if err := json.Unmarshal(message, &event); err != nil {
- return nil, err
- }
- switch event.Type {
- case Logs:
- event.event = message
- return &event, nil
- case UnknownServerEventType:
- return nil, errInvalidMessageType
- default:
- return nil, fmt.Errorf("invalid server message type was provided: %s", event.Type)
- }
- }
- // ReadEvent will read a message from the websocket connection and parse it into a valid ClientEvent.
- func ReadClientEvent(c *websocket.Conn, ctx context.Context) (*ClientEvent, error) {
- message, err := readMessage(c, ctx)
- if err != nil {
- return nil, err
- }
- event := ClientEvent{}
- if err := json.Unmarshal(message, &event); err != nil {
- return nil, err
- }
- switch event.Type {
- case StartStreaming, StopStreaming:
- event.event = message
- return &event, nil
- case UnknownClientEventType:
- return nil, errInvalidMessageType
- default:
- return nil, fmt.Errorf("invalid client message type was provided: %s", event.Type)
- }
- }
- // readMessage will read a message from the websocket connection and return the payload.
- func readMessage(c *websocket.Conn, ctx context.Context) ([]byte, error) {
- messageType, reader, err := c.Reader(ctx)
- if err != nil {
- return nil, err
- }
- if messageType != websocket.MessageText {
- return nil, errInvalidMessageType
- }
- return io.ReadAll(reader)
- }
- // WriteEvent will write a Event type message to the websocket connection.
- func WriteEvent(c *websocket.Conn, ctx context.Context, event any) error {
- payload, err := json.Marshal(event)
- if err != nil {
- return err
- }
- return c.Write(ctx, websocket.MessageText, payload)
- }
- // IsClosed returns true if the websocket error is a websocket.CloseError; returns false if not a
- // websocket.CloseError
- func IsClosed(err error, log *zerolog.Logger) bool {
- var closeErr websocket.CloseError
- if errors.As(err, &closeErr) {
- if closeErr.Code != websocket.StatusNormalClosure {
- log.Debug().Msgf("connection is already closed: (%d) %s", closeErr.Code, closeErr.Reason)
- }
- return true
- }
- return false
- }
- func AsClosed(err error) *websocket.CloseError {
- var closeErr websocket.CloseError
- if errors.As(err, &closeErr) {
- return &closeErr
- }
- return nil
- }
|