session.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package management
  2. import (
  3. "context"
  4. "math/rand"
  5. "sync/atomic"
  6. )
  7. const (
  8. // Indicates how many log messages the listener will hold before dropping.
  9. // Provides a throttling mechanism to drop latest messages if the sender
  10. // can't keep up with the influx of log messages.
  11. logWindow = 30
  12. )
  13. // session captures a streaming logs session for a connection of an actor.
  14. type session struct {
  15. // Indicates if the session is streaming or not. Modifying this will affect the active session.
  16. active atomic.Bool
  17. // Allows the session to control the context of the underlying connection to close it out when done. Mostly
  18. // used by the LoggerListener to close out and cleanup a session.
  19. cancel context.CancelFunc
  20. // Actor who started the session
  21. actor actor
  22. // Buffered channel that holds the recent log events
  23. listener chan *Log
  24. // Types of log events that this session will provide through the listener
  25. filters *StreamingFilters
  26. // Sampling of the log events this session will send (runs after all other filters if available)
  27. sampler *sampler
  28. }
  29. // NewSession creates a new session.
  30. func newSession(size int, actor actor, cancel context.CancelFunc) *session {
  31. s := &session{
  32. active: atomic.Bool{},
  33. cancel: cancel,
  34. actor: actor,
  35. listener: make(chan *Log, size),
  36. filters: &StreamingFilters{},
  37. }
  38. return s
  39. }
  40. // Filters assigns the StreamingFilters to the session
  41. func (s *session) Filters(filters *StreamingFilters) {
  42. if filters != nil {
  43. s.filters = filters
  44. sampling := filters.Sampling
  45. // clamp the sampling values between 0 and 1
  46. if sampling < 0 {
  47. sampling = 0
  48. }
  49. if sampling > 1 {
  50. sampling = 1
  51. }
  52. s.filters.Sampling = sampling
  53. if sampling > 0 && sampling < 1 {
  54. s.sampler = &sampler{
  55. p: int(sampling * 100),
  56. }
  57. }
  58. } else {
  59. s.filters = &StreamingFilters{}
  60. }
  61. }
  62. // Insert attempts to insert the log to the session. If the log event matches the provided session filters, it
  63. // will be applied to the listener.
  64. func (s *session) Insert(log *Log) {
  65. // Level filters are optional
  66. if s.filters.Level != nil {
  67. if *s.filters.Level > log.Level {
  68. return
  69. }
  70. }
  71. // Event filters are optional
  72. if len(s.filters.Events) != 0 && !contains(s.filters.Events, log.Event) {
  73. return
  74. }
  75. // Sampling is also optional
  76. if s.sampler != nil && !s.sampler.Sample() {
  77. return
  78. }
  79. select {
  80. case s.listener <- log:
  81. default:
  82. // buffer is full, discard
  83. }
  84. }
  85. // Active returns if the session is active
  86. func (s *session) Active() bool {
  87. return s.active.Load()
  88. }
  89. // Stop will halt the session
  90. func (s *session) Stop() {
  91. s.active.Store(false)
  92. }
  93. func contains(array []LogEventType, t LogEventType) bool {
  94. for _, v := range array {
  95. if v == t {
  96. return true
  97. }
  98. }
  99. return false
  100. }
  101. // sampler will send approximately every p percentage log events out of 100.
  102. type sampler struct {
  103. p int
  104. }
  105. // Sample returns true if the event should be part of the sample, false if the event should be dropped.
  106. func (s *sampler) Sample() bool {
  107. return rand.Intn(100) <= s.p
  108. }