funnel.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package packet
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net/netip"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. var (
  12. ErrFunnelNotFound = errors.New("funnel not found")
  13. )
  14. // Funnel is an abstraction to pipe from 1 src to 1 or more destinations
  15. type Funnel interface {
  16. // Updates the last time traffic went through this funnel
  17. UpdateLastActive()
  18. // LastActive returns the last time there is traffic through this funnel
  19. LastActive() time.Time
  20. // Close closes the funnel. Further call to SendToDst or ReturnToSrc should return an error
  21. Close() error
  22. // Equal compares if 2 funnels are equivalent
  23. Equal(other Funnel) bool
  24. }
  25. // FunnelUniPipe is a unidirectional pipe for sending raw packets
  26. type FunnelUniPipe interface {
  27. // SendPacket sends a packet to/from the funnel. It must not modify the packet,
  28. // and after return it must not read the packet
  29. SendPacket(dst netip.Addr, pk RawPacket) error
  30. Close() error
  31. }
  32. type ActivityTracker struct {
  33. // last active unix time. Unit is seconds
  34. lastActive int64
  35. }
  36. func NewActivityTracker() *ActivityTracker {
  37. return &ActivityTracker{
  38. lastActive: time.Now().Unix(),
  39. }
  40. }
  41. func (at *ActivityTracker) UpdateLastActive() {
  42. atomic.StoreInt64(&at.lastActive, time.Now().Unix())
  43. }
  44. func (at *ActivityTracker) LastActive() time.Time {
  45. lastActive := atomic.LoadInt64(&at.lastActive)
  46. return time.Unix(lastActive, 0)
  47. }
  48. // FunnelID represents a key type that can be used by FunnelTracker
  49. type FunnelID interface {
  50. // Type returns the name of the type that implements the FunnelID
  51. Type() string
  52. fmt.Stringer
  53. }
  54. // FunnelTracker tracks funnel from the perspective of eyeball to origin
  55. type FunnelTracker struct {
  56. lock sync.RWMutex
  57. funnels map[FunnelID]Funnel
  58. }
  59. func NewFunnelTracker() *FunnelTracker {
  60. return &FunnelTracker{
  61. funnels: make(map[FunnelID]Funnel),
  62. }
  63. }
  64. func (ft *FunnelTracker) ScheduleCleanup(ctx context.Context, idleTimeout time.Duration) {
  65. checkIdleTicker := time.NewTicker(idleTimeout)
  66. defer checkIdleTicker.Stop()
  67. for {
  68. select {
  69. case <-ctx.Done():
  70. return
  71. case <-checkIdleTicker.C:
  72. ft.cleanup(idleTimeout)
  73. }
  74. }
  75. }
  76. func (ft *FunnelTracker) cleanup(idleTimeout time.Duration) {
  77. ft.lock.Lock()
  78. defer ft.lock.Unlock()
  79. now := time.Now()
  80. for id, funnel := range ft.funnels {
  81. lastActive := funnel.LastActive()
  82. if now.After(lastActive.Add(idleTimeout)) {
  83. funnel.Close()
  84. delete(ft.funnels, id)
  85. }
  86. }
  87. }
  88. func (ft *FunnelTracker) Get(id FunnelID) (Funnel, bool) {
  89. ft.lock.RLock()
  90. defer ft.lock.RUnlock()
  91. funnel, ok := ft.funnels[id]
  92. return funnel, ok
  93. }
  94. // Registers a funnel. If the `id` is already registered and `shouldReplaceFunc` returns true, it closes and replaces
  95. // the current funnel. If `newFunnelFunc` returns an error, the `id` will remain unregistered, even if it was registered
  96. // when calling this function.
  97. func (ft *FunnelTracker) GetOrRegister(
  98. id FunnelID,
  99. shouldReplaceFunc func(Funnel) bool,
  100. newFunnelFunc func() (Funnel, error),
  101. ) (funnel Funnel, new bool, err error) {
  102. ft.lock.Lock()
  103. defer ft.lock.Unlock()
  104. currentFunnel, exists := ft.funnels[id]
  105. if exists {
  106. if !shouldReplaceFunc(currentFunnel) {
  107. return currentFunnel, false, nil
  108. }
  109. currentFunnel.Close()
  110. delete(ft.funnels, id)
  111. }
  112. newFunnel, err := newFunnelFunc()
  113. if err != nil {
  114. return nil, false, err
  115. }
  116. ft.funnels[id] = newFunnel
  117. return newFunnel, true, nil
  118. }
  119. // Unregisters and closes a funnel if the funnel equals to the current funnel
  120. func (ft *FunnelTracker) Unregister(id FunnelID, funnel Funnel) (deleted bool) {
  121. ft.lock.Lock()
  122. defer ft.lock.Unlock()
  123. currentFunnel, exists := ft.funnels[id]
  124. if !exists {
  125. return true
  126. }
  127. if currentFunnel.Equal(funnel) {
  128. currentFunnel.Close()
  129. delete(ft.funnels, id)
  130. return true
  131. }
  132. return false
  133. }