broker.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. /*
  2. Broker acts as the HTTP signaling channel.
  3. It matches clients and snowflake proxies by passing corresponding
  4. SessionDescriptions in order to negotiate a WebRTC connection.
  5. */
  6. package main
  7. import (
  8. "container/heap"
  9. "flag"
  10. "io"
  11. "log"
  12. "net"
  13. "net/rpc"
  14. "os"
  15. "os/signal"
  16. "sync"
  17. "syscall"
  18. "time"
  19. "git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
  20. // "github.com/prometheus/client_golang/prometheus"
  21. )
  22. type BrokerContext struct {
  23. snowflakes *SnowflakeHeap
  24. restrictedSnowflakes *SnowflakeHeap
  25. // Maps keeping track of snowflakeIDs required to match SDP answers from
  26. // the second http POST. Restricted snowflakes can only be matched up with
  27. // clients behind an unrestricted NAT.
  28. idToSnowflake map[string]*Snowflake
  29. // Synchronization for the snowflake map and heap
  30. snowflakeLock sync.Mutex
  31. proxyPolls chan *ProxyPoll
  32. metrics *Metrics
  33. }
  34. func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
  35. snowflakes := new(SnowflakeHeap)
  36. heap.Init(snowflakes)
  37. rSnowflakes := new(SnowflakeHeap)
  38. heap.Init(rSnowflakes)
  39. metrics, err := NewMetrics(metricsLogger)
  40. if err != nil {
  41. panic(err.Error())
  42. }
  43. if metrics == nil {
  44. panic("Failed to create metrics")
  45. }
  46. return &BrokerContext{
  47. snowflakes: snowflakes,
  48. restrictedSnowflakes: rSnowflakes,
  49. idToSnowflake: make(map[string]*Snowflake),
  50. proxyPolls: make(chan *ProxyPoll),
  51. metrics: metrics,
  52. }
  53. }
  54. // Proxies may poll for client offers concurrently.
  55. type ProxyPoll struct {
  56. id string
  57. proxyType string
  58. natType string
  59. offerChannel chan *ClientOffer
  60. }
  61. // Registers a Snowflake and waits for some Client to send an offer,
  62. // as part of the polling logic of the proxy handler.
  63. func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType string) *ClientOffer {
  64. request := new(ProxyPoll)
  65. request.id = id
  66. request.proxyType = proxyType
  67. request.natType = natType
  68. request.offerChannel = make(chan *ClientOffer)
  69. ctx.proxyPolls <- request
  70. // Block until an offer is available, or timeout which sends a nil offer.
  71. offer := <-request.offerChannel
  72. return offer
  73. }
  74. // goroutine which matches clients to proxies and sends SDP offers along.
  75. // Safely processes proxy requests, responding to them with either an available
  76. // client offer or nil on timeout / none are available.
  77. func (ctx *BrokerContext) Broker() {
  78. for request := range ctx.proxyPolls {
  79. snowflake := ctx.AddSnowflake(request.id, request.proxyType, request.natType)
  80. // Wait for a client to avail an offer to the snowflake.
  81. go func(request *ProxyPoll) {
  82. select {
  83. case offer := <-snowflake.offerChannel:
  84. request.offerChannel <- offer
  85. case <-time.After(time.Second * ProxyTimeout):
  86. // This snowflake is no longer available to serve clients.
  87. ctx.snowflakeLock.Lock()
  88. defer ctx.snowflakeLock.Unlock()
  89. if snowflake.index != -1 {
  90. if request.natType == NATUnrestricted {
  91. heap.Remove(ctx.snowflakes, snowflake.index)
  92. } else {
  93. heap.Remove(ctx.restrictedSnowflakes, snowflake.index)
  94. }
  95. // ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": request.natType, "type": request.proxyType}).Dec()
  96. delete(ctx.idToSnowflake, snowflake.id)
  97. close(request.offerChannel)
  98. }
  99. }
  100. }(request)
  101. }
  102. }
  103. // Create and add a Snowflake to the heap.
  104. // Required to keep track of proxies between providing them
  105. // with an offer and awaiting their second POST with an answer.
  106. func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType string) *Snowflake {
  107. snowflake := new(Snowflake)
  108. snowflake.id = id
  109. snowflake.clients = 0
  110. snowflake.proxyType = proxyType
  111. snowflake.natType = natType
  112. snowflake.offerChannel = make(chan *ClientOffer)
  113. snowflake.answerChannel = make(chan string)
  114. ctx.snowflakeLock.Lock()
  115. if natType == NATUnrestricted {
  116. heap.Push(ctx.snowflakes, snowflake)
  117. } else {
  118. heap.Push(ctx.restrictedSnowflakes, snowflake)
  119. }
  120. // ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": natType, "type": proxyType}).Inc()
  121. ctx.snowflakeLock.Unlock()
  122. ctx.idToSnowflake[id] = snowflake
  123. return snowflake
  124. }
  125. // Client offer contains an SDP and the NAT type of the client
  126. type ClientOffer struct {
  127. natType string
  128. sdp []byte
  129. }
  130. func main() {
  131. var geoipDatabase string
  132. var geoip6Database string
  133. var disableGeoip bool
  134. var metricsFilename string
  135. var unsafeLogging bool
  136. var socket string
  137. flag.StringVar(&geoipDatabase, "geoipdb", "/usr/share/tor/geoip", "path to correctly formatted geoip database mapping IPv4 address ranges to country codes")
  138. flag.StringVar(&geoip6Database, "geoip6db", "/usr/share/tor/geoip6", "path to correctly formatted geoip database mapping IPv6 address ranges to country codes")
  139. flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
  140. flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
  141. flag.BoolVar(&unsafeLogging, "unsafe-logging", false, "prevent logs from being scrubbed")
  142. flag.StringVar(&socket, "socket", "/tmp/broker.sock", "path to ipc socket")
  143. flag.Parse()
  144. var err error
  145. var metricsFile io.Writer
  146. var logOutput io.Writer = os.Stderr
  147. if unsafeLogging {
  148. log.SetOutput(logOutput)
  149. } else {
  150. // We want to send the log output through our scrubber first
  151. log.SetOutput(&safelog.LogScrubber{Output: logOutput})
  152. }
  153. log.SetFlags(log.LstdFlags | log.LUTC)
  154. if metricsFilename != "" {
  155. metricsFile, err = os.OpenFile(metricsFilename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  156. if err != nil {
  157. log.Fatal(err.Error())
  158. }
  159. } else {
  160. metricsFile = os.Stdout
  161. }
  162. metricsLogger := log.New(metricsFile, "", 0)
  163. ctx := NewBrokerContext(metricsLogger)
  164. if !disableGeoip {
  165. err = ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
  166. if err != nil {
  167. log.Fatal(err.Error())
  168. }
  169. }
  170. go ctx.Broker()
  171. sigChan := make(chan os.Signal, 1)
  172. signal.Notify(sigChan, syscall.SIGHUP)
  173. // go routine to handle a SIGHUP signal to allow the broker operator to send
  174. // a SIGHUP signal when the geoip database files are updated, without requiring
  175. // a restart of the broker
  176. go func() {
  177. for {
  178. signal := <-sigChan
  179. log.Printf("Received signal: %s. Reloading geoip databases.", signal)
  180. if err = ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database); err != nil {
  181. log.Fatalf("reload of Geo IP databases on signal %s returned error: %v", signal, err)
  182. }
  183. }
  184. }()
  185. // if err := os.RemoveAll(socket); err != nil {
  186. // log.Fatal(err)
  187. // }
  188. ipc := &IPC{ctx}
  189. rpc.Register(ipc)
  190. l, err := net.Listen("unix", socket)
  191. if err != nil {
  192. log.Fatal(err)
  193. }
  194. defer l.Close()
  195. rpc.Accept(l)
  196. }