peers.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package lib
  2. import (
  3. "container/list"
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sync"
  8. )
  9. // Container which keeps track of multiple WebRTC remote peers.
  10. // Implements |SnowflakeCollector|.
  11. //
  12. // Maintaining a set of pre-connected Peers with fresh but inactive datachannels
  13. // allows allows rapid recovery when the current WebRTC Peer disconnects.
  14. //
  15. // Note: For now, only one remote can be active at any given moment.
  16. // This is a property of Tor circuits & its current multiplexing constraints,
  17. // but could be updated if that changes.
  18. // (Also, this constraint does not necessarily apply to the more generic PT
  19. // version of Snowflake)
  20. type Peers struct {
  21. Tongue
  22. BytesLogger BytesLogger
  23. snowflakeChan chan *WebRTCPeer
  24. activePeers *list.List
  25. melt chan struct{}
  26. melted bool
  27. collection sync.WaitGroup
  28. }
  29. // Construct a fresh container of remote peers.
  30. func NewPeers(tongue Tongue) (*Peers, error) {
  31. p := &Peers{}
  32. // Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
  33. if tongue == nil {
  34. return nil, errors.New("missing Tongue to catch Snowflakes with")
  35. }
  36. p.snowflakeChan = make(chan *WebRTCPeer, tongue.GetMax())
  37. p.activePeers = list.New()
  38. p.melt = make(chan struct{})
  39. p.Tongue = tongue
  40. return p, nil
  41. }
  42. // As part of |SnowflakeCollector| interface.
  43. func (p *Peers) Collect() (*WebRTCPeer, error) {
  44. // Engage the Snowflake Catching interface, which must be available.
  45. p.collection.Add(1)
  46. defer p.collection.Done()
  47. if p.melted {
  48. return nil, fmt.Errorf("Snowflakes have melted")
  49. }
  50. if nil == p.Tongue {
  51. return nil, errors.New("missing Tongue to catch Snowflakes with")
  52. }
  53. cnt := p.Count()
  54. capacity := p.Tongue.GetMax()
  55. s := fmt.Sprintf("Currently at [%d/%d]", cnt, capacity)
  56. if cnt >= capacity {
  57. return nil, fmt.Errorf("At capacity [%d/%d]", cnt, capacity)
  58. }
  59. log.Println("WebRTC: Collecting a new Snowflake.", s)
  60. // BUG: some broker conflict here.
  61. connection, err := p.Tongue.Catch()
  62. if nil != err {
  63. return nil, err
  64. }
  65. // Track new valid Snowflake in internal collection and pass along.
  66. p.activePeers.PushBack(connection)
  67. p.snowflakeChan <- connection
  68. return connection, nil
  69. }
  70. // Pop blocks until an available, valid snowflake appears. Returns nil after End
  71. // has been called.
  72. func (p *Peers) Pop() *WebRTCPeer {
  73. for {
  74. snowflake, ok := <-p.snowflakeChan
  75. if !ok {
  76. return nil
  77. }
  78. if snowflake.closed {
  79. continue
  80. }
  81. // Set to use the same rate-limited traffic logger to keep consistency.
  82. snowflake.BytesLogger = p.BytesLogger
  83. return snowflake
  84. }
  85. }
  86. // As part of |SnowflakeCollector| interface.
  87. func (p *Peers) Melted() <-chan struct{} {
  88. return p.melt
  89. }
  90. // Returns total available Snowflakes (including the active one)
  91. // The count only reduces when connections themselves close, rather than when
  92. // they are popped.
  93. func (p *Peers) Count() int {
  94. p.purgeClosedPeers()
  95. return p.activePeers.Len()
  96. }
  97. func (p *Peers) purgeClosedPeers() {
  98. for e := p.activePeers.Front(); e != nil; {
  99. next := e.Next()
  100. conn := e.Value.(*WebRTCPeer)
  101. // Purge those marked for deletion.
  102. if conn.closed {
  103. p.activePeers.Remove(e)
  104. }
  105. e = next
  106. }
  107. }
  108. // Close all Peers contained here.
  109. func (p *Peers) End() {
  110. close(p.melt)
  111. p.melted = true
  112. p.collection.Wait()
  113. close(p.snowflakeChan)
  114. cnt := p.Count()
  115. for e := p.activePeers.Front(); e != nil; {
  116. next := e.Next()
  117. conn := e.Value.(*WebRTCPeer)
  118. conn.Close()
  119. p.activePeers.Remove(e)
  120. e = next
  121. }
  122. log.Printf("WebRTC: melted all %d snowflakes.", cnt)
  123. }