clientmap.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package turbotunnel
  2. import (
  3. "container/heap"
  4. "net"
  5. "sync"
  6. "time"
  7. )
  8. // clientRecord is a record of a recently seen client, with the time it was last
  9. // seen and a send queue.
  10. type clientRecord struct {
  11. Addr net.Addr
  12. LastSeen time.Time
  13. SendQueue chan []byte
  14. }
  15. // ClientMap manages a mapping of live clients (keyed by address, which will be
  16. // a ClientID) to their respective send queues. ClientMap's functions are safe
  17. // to call from multiple goroutines.
  18. type ClientMap struct {
  19. // We use an inner structure to avoid exposing public heap.Interface
  20. // functions to users of clientMap.
  21. inner clientMapInner
  22. // Synchronizes access to inner.
  23. lock sync.Mutex
  24. }
  25. // NewClientMap creates a ClientMap that expires clients after a timeout.
  26. //
  27. // The timeout does not have to be kept in sync with smux's internal idle
  28. // timeout. If a client is removed from the client map while the smux session is
  29. // still live, the worst that can happen is a loss of whatever packets were in
  30. // the send queue at the time. If smux later decides to send more packets to the
  31. // same client, we'll instantiate a new send queue, and if the client ever
  32. // connects again with the proper client ID, we'll deliver them.
  33. func NewClientMap(timeout time.Duration) *ClientMap {
  34. m := &ClientMap{
  35. inner: clientMapInner{
  36. byAge: make([]*clientRecord, 0),
  37. byAddr: make(map[net.Addr]int),
  38. },
  39. }
  40. go func() {
  41. for {
  42. time.Sleep(timeout / 2)
  43. now := time.Now()
  44. m.lock.Lock()
  45. m.inner.removeExpired(now, timeout)
  46. m.lock.Unlock()
  47. }
  48. }()
  49. return m
  50. }
  51. // SendQueue returns the send queue corresponding to addr, creating it if
  52. // necessary.
  53. func (m *ClientMap) SendQueue(addr net.Addr) chan []byte {
  54. m.lock.Lock()
  55. queue := m.inner.SendQueue(addr, time.Now())
  56. m.lock.Unlock()
  57. return queue
  58. }
  59. // clientMapInner is the inner type of ClientMap, implementing heap.Interface.
  60. // byAge is the backing store, a heap ordered by LastSeen time, to facilitate
  61. // expiring old client records. byAddr is a map from addresses (i.e., ClientIDs)
  62. // to heap indices, to allow looking up by address. Unlike ClientMap,
  63. // clientMapInner requires external synchonization.
  64. type clientMapInner struct {
  65. byAge []*clientRecord
  66. byAddr map[net.Addr]int
  67. }
  68. // removeExpired removes all client records whose LastSeen timestamp is more
  69. // than timeout in the past.
  70. func (inner *clientMapInner) removeExpired(now time.Time, timeout time.Duration) {
  71. for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
  72. heap.Pop(inner)
  73. }
  74. }
  75. // SendQueue finds the existing client record corresponding to addr, or creates
  76. // a new one if none exists yet. It updates the client record's LastSeen time
  77. // and returns its SendQueue.
  78. func (inner *clientMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
  79. var record *clientRecord
  80. i, ok := inner.byAddr[addr]
  81. if ok {
  82. // Found one, update its LastSeen.
  83. record = inner.byAge[i]
  84. record.LastSeen = now
  85. heap.Fix(inner, i)
  86. } else {
  87. // Not found, create a new one.
  88. record = &clientRecord{
  89. Addr: addr,
  90. LastSeen: now,
  91. SendQueue: make(chan []byte, queueSize),
  92. }
  93. heap.Push(inner, record)
  94. }
  95. return record.SendQueue
  96. }
  97. // heap.Interface for clientMapInner.
  98. func (inner *clientMapInner) Len() int {
  99. if len(inner.byAge) != len(inner.byAddr) {
  100. panic("inconsistent clientMap")
  101. }
  102. return len(inner.byAge)
  103. }
  104. func (inner *clientMapInner) Less(i, j int) bool {
  105. return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
  106. }
  107. func (inner *clientMapInner) Swap(i, j int) {
  108. inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
  109. inner.byAddr[inner.byAge[i].Addr] = i
  110. inner.byAddr[inner.byAge[j].Addr] = j
  111. }
  112. func (inner *clientMapInner) Push(x interface{}) {
  113. record := x.(*clientRecord)
  114. if _, ok := inner.byAddr[record.Addr]; ok {
  115. panic("duplicate address in clientMap")
  116. }
  117. // Insert into byAddr map.
  118. inner.byAddr[record.Addr] = len(inner.byAge)
  119. // Insert into byAge slice.
  120. inner.byAge = append(inner.byAge, record)
  121. }
  122. func (inner *clientMapInner) Pop() interface{} {
  123. n := len(inner.byAddr)
  124. // Remove from byAge slice.
  125. record := inner.byAge[n-1]
  126. inner.byAge[n-1] = nil
  127. inner.byAge = inner.byAge[:n-1]
  128. // Remove from byAddr map.
  129. delete(inner.byAddr, record.Addr)
  130. close(record.SendQueue)
  131. return record
  132. }