logger.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package management
  2. import (
  3. "os"
  4. "sync"
  5. "time"
  6. jsoniter "github.com/json-iterator/go"
  7. "github.com/rs/zerolog"
  8. )
  9. var json = jsoniter.ConfigFastest
  10. // Logger manages the number of management streaming log sessions
  11. type Logger struct {
  12. sessions []*session
  13. mu sync.RWMutex
  14. // Unique logger that isn't a io.Writer of the list of zerolog writers. This helps prevent management log
  15. // statements from creating infinite recursion to export messages to a session and allows basic debugging and
  16. // error statements to be issued in the management code itself.
  17. Log *zerolog.Logger
  18. }
  19. func NewLogger() *Logger {
  20. log := zerolog.New(zerolog.ConsoleWriter{
  21. Out: os.Stdout,
  22. TimeFormat: time.RFC3339,
  23. }).With().Timestamp().Logger().Level(zerolog.InfoLevel)
  24. return &Logger{
  25. Log: &log,
  26. }
  27. }
  28. type LoggerListener interface {
  29. // ActiveSession returns the first active session for the requested actor.
  30. ActiveSession(actor) *session
  31. // ActiveSession returns the count of active sessions.
  32. ActiveSessions() int
  33. // Listen appends the session to the list of sessions that receive log events.
  34. Listen(*session)
  35. // Remove a session from the available sessions that were receiving log events.
  36. Remove(*session)
  37. }
  38. func (l *Logger) ActiveSession(actor actor) *session {
  39. l.mu.RLock()
  40. defer l.mu.RUnlock()
  41. for _, session := range l.sessions {
  42. if session.actor.ID == actor.ID && session.active.Load() {
  43. return session
  44. }
  45. }
  46. return nil
  47. }
  48. func (l *Logger) ActiveSessions() int {
  49. l.mu.RLock()
  50. defer l.mu.RUnlock()
  51. count := 0
  52. for _, session := range l.sessions {
  53. if session.active.Load() {
  54. count += 1
  55. }
  56. }
  57. return count
  58. }
  59. func (l *Logger) Listen(session *session) {
  60. l.mu.Lock()
  61. defer l.mu.Unlock()
  62. session.active.Store(true)
  63. l.sessions = append(l.sessions, session)
  64. }
  65. func (l *Logger) Remove(session *session) {
  66. l.mu.Lock()
  67. defer l.mu.Unlock()
  68. index := -1
  69. for i, v := range l.sessions {
  70. if v == session {
  71. index = i
  72. break
  73. }
  74. }
  75. if index == -1 {
  76. // Not found
  77. return
  78. }
  79. copy(l.sessions[index:], l.sessions[index+1:])
  80. l.sessions = l.sessions[:len(l.sessions)-1]
  81. }
  82. // Write will write the log event to all sessions that have available capacity. For those that are full, the message
  83. // will be dropped.
  84. // This function is the interface that zerolog expects to call when a log event is to be written out.
  85. func (l *Logger) Write(p []byte) (int, error) {
  86. l.mu.RLock()
  87. defer l.mu.RUnlock()
  88. // return early if no active sessions
  89. if len(l.sessions) == 0 {
  90. return len(p), nil
  91. }
  92. event, err := parseZerologEvent(p)
  93. // drop event if unable to parse properly
  94. if err != nil {
  95. l.Log.Debug().Msg("unable to parse log event")
  96. return len(p), nil
  97. }
  98. for _, session := range l.sessions {
  99. session.Insert(event)
  100. }
  101. return len(p), nil
  102. }
  103. func (l *Logger) WriteLevel(level zerolog.Level, p []byte) (n int, err error) {
  104. return l.Write(p)
  105. }
  106. func parseZerologEvent(p []byte) (*Log, error) {
  107. var fields map[string]interface{}
  108. iter := json.BorrowIterator(p)
  109. defer json.ReturnIterator(iter)
  110. iter.ReadVal(&fields)
  111. if iter.Error != nil {
  112. return nil, iter.Error
  113. }
  114. logTime := time.Now().UTC().Format(zerolog.TimeFieldFormat)
  115. if t, ok := fields[TimeKey]; ok {
  116. if t, ok := t.(string); ok {
  117. logTime = t
  118. }
  119. }
  120. logLevel := Debug
  121. // A zerolog Debug event can be created and then an error can be added
  122. // via .Err(error), if so, we upgrade the level to error.
  123. if _, hasError := fields["error"]; hasError {
  124. logLevel = Error
  125. } else {
  126. if level, ok := fields[LevelKey]; ok {
  127. if level, ok := level.(string); ok {
  128. if logLevel, ok = ParseLogLevel(level); !ok {
  129. logLevel = Debug
  130. }
  131. }
  132. }
  133. }
  134. // Assume the event type is Cloudflared if unable to parse/find. This could be from log events that haven't
  135. // yet been tagged with the appropriate EventType yet.
  136. logEvent := Cloudflared
  137. e := fields[EventTypeKey]
  138. if e != nil {
  139. if eventNumber, ok := e.(float64); ok {
  140. logEvent = LogEventType(eventNumber)
  141. }
  142. }
  143. logMessage := ""
  144. if m, ok := fields[MessageKey]; ok {
  145. if m, ok := m.(string); ok {
  146. logMessage = m
  147. }
  148. }
  149. event := Log{
  150. Time: logTime,
  151. Level: logLevel,
  152. Event: logEvent,
  153. Message: logMessage,
  154. }
  155. // Remove the keys that have top level keys on Log
  156. delete(fields, TimeKey)
  157. delete(fields, LevelKey)
  158. delete(fields, EventTypeKey)
  159. delete(fields, MessageKey)
  160. // The rest of the keys go into the Fields
  161. event.Fields = fields
  162. return &event, nil
  163. }