limiter.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package flow
  2. import (
  3. "errors"
  4. "sync"
  5. )
  6. const (
  7. unlimitedActiveFlows = 0
  8. )
  9. var (
  10. ErrTooManyActiveFlows = errors.New("too many active flows")
  11. )
  12. type Limiter interface {
  13. // Acquire tries to acquire a free slot for a flow, if the value of flows is already above
  14. // the maximum it returns ErrTooManyActiveFlows.
  15. Acquire(flowType string) error
  16. // Release releases a slot for a flow.
  17. Release()
  18. // SetLimit allows to hot swap the limit value of the limiter.
  19. SetLimit(uint64)
  20. }
  21. type flowLimiter struct {
  22. limiterLock sync.Mutex
  23. activeFlowsCounter uint64
  24. maxActiveFlows uint64
  25. unlimited bool
  26. }
  27. func NewLimiter(maxActiveFlows uint64) Limiter {
  28. flowLimiter := &flowLimiter{
  29. maxActiveFlows: maxActiveFlows,
  30. unlimited: isUnlimited(maxActiveFlows),
  31. }
  32. return flowLimiter
  33. }
  34. func (s *flowLimiter) Acquire(flowType string) error {
  35. s.limiterLock.Lock()
  36. defer s.limiterLock.Unlock()
  37. if !s.unlimited && s.activeFlowsCounter >= s.maxActiveFlows {
  38. flowRegistrationsDropped.WithLabelValues(flowType).Inc()
  39. return ErrTooManyActiveFlows
  40. }
  41. s.activeFlowsCounter++
  42. return nil
  43. }
  44. func (s *flowLimiter) Release() {
  45. s.limiterLock.Lock()
  46. defer s.limiterLock.Unlock()
  47. if s.activeFlowsCounter <= 0 {
  48. return
  49. }
  50. s.activeFlowsCounter--
  51. }
  52. func (s *flowLimiter) SetLimit(newMaxActiveFlows uint64) {
  53. s.limiterLock.Lock()
  54. defer s.limiterLock.Unlock()
  55. s.maxActiveFlows = newMaxActiveFlows
  56. s.unlimited = isUnlimited(newMaxActiveFlows)
  57. }
  58. // isUnlimited checks if the value received matches the configuration for the unlimited flow limiter.
  59. func isUnlimited(value uint64) bool {
  60. return value == unlimitedActiveFlows
  61. }