123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- /*
- Broker acts as the HTTP signaling channel.
- It matches clients and snowflake proxies by passing corresponding
- SessionDescriptions in order to negotiate a WebRTC connection.
- */
- package main
- import (
- "container/heap"
- "crypto/tls"
- "flag"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "net"
- "net/http"
- "os"
- "os/signal"
- "strings"
- "syscall"
- "time"
- "git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
- "golang.org/x/crypto/acme/autocert"
- )
- const (
- ClientTimeout = 10
- ProxyTimeout = 10
- readLimit = 100000 //Maximum number of bytes to be read from an HTTP request
- )
- type BrokerContext struct {
- snowflakes *SnowflakeHeap
- // Map keeping track of snowflakeIDs required to match SDP answers from
- // the second http POST.
- idToSnowflake map[string]*Snowflake
- proxyPolls chan *ProxyPoll
- metrics *Metrics
- }
- func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
- snowflakes := new(SnowflakeHeap)
- heap.Init(snowflakes)
- metrics, err := NewMetrics(metricsLogger)
- if err != nil {
- panic(err.Error())
- }
- if metrics == nil {
- panic("Failed to create metrics")
- }
- return &BrokerContext{
- snowflakes: snowflakes,
- idToSnowflake: make(map[string]*Snowflake),
- proxyPolls: make(chan *ProxyPoll),
- metrics: metrics,
- }
- }
- // Implements the http.Handler interface
- type SnowflakeHandler struct {
- *BrokerContext
- handle func(*BrokerContext, http.ResponseWriter, *http.Request)
- }
- func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
- // Return early if it's CORS preflight.
- if "OPTIONS" == r.Method {
- return
- }
- sh.handle(sh.BrokerContext, w, r)
- }
- // Proxies may poll for client offers concurrently.
- type ProxyPoll struct {
- id string
- offerChannel chan []byte
- }
- // Registers a Snowflake and waits for some Client to send an offer,
- // as part of the polling logic of the proxy handler.
- func (ctx *BrokerContext) RequestOffer(id string) []byte {
- request := new(ProxyPoll)
- request.id = id
- request.offerChannel = make(chan []byte)
- ctx.proxyPolls <- request
- // Block until an offer is available, or timeout which sends a nil offer.
- offer := <-request.offerChannel
- return offer
- }
- // goroutine which matches clients to proxies and sends SDP offers along.
- // Safely processes proxy requests, responding to them with either an available
- // client offer or nil on timeout / none are available.
- func (ctx *BrokerContext) Broker() {
- for request := range ctx.proxyPolls {
- snowflake := ctx.AddSnowflake(request.id)
- // Wait for a client to avail an offer to the snowflake.
- go func(request *ProxyPoll) {
- select {
- case offer := <-snowflake.offerChannel:
- log.Println("Passing client offer to snowflake proxy.")
- request.offerChannel <- offer
- case <-time.After(time.Second * ProxyTimeout):
- // This snowflake is no longer available to serve clients.
- // TODO: Fix race using a delete channel
- heap.Remove(ctx.snowflakes, snowflake.index)
- delete(ctx.idToSnowflake, snowflake.id)
- request.offerChannel <- nil
- }
- }(request)
- }
- }
- // Create and add a Snowflake to the heap.
- // Required to keep track of proxies between providing them
- // with an offer and awaiting their second POST with an answer.
- func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake {
- snowflake := new(Snowflake)
- snowflake.id = id
- snowflake.clients = 0
- snowflake.offerChannel = make(chan []byte)
- snowflake.answerChannel = make(chan []byte)
- heap.Push(ctx.snowflakes, snowflake)
- ctx.idToSnowflake[id] = snowflake
- return snowflake
- }
- /*
- For snowflake proxies to request a client from the Broker.
- */
- func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
- id := r.Header.Get("X-Session-ID")
- body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
- if nil != err {
- log.Println("Invalid data.")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- if string(body) != id {
- log.Println("Mismatched IDs!")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- log.Println("Received snowflake: ", id)
- // Log geoip stats
- remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
- if err != nil {
- log.Println("Error processing proxy IP: ", err.Error())
- } else {
- ctx.metrics.UpdateCountryStats(remoteIP)
- }
- // Wait for a client to avail an offer to the snowflake, or timeout if nil.
- offer := ctx.RequestOffer(id)
- if nil == offer {
- log.Println("Proxy " + id + " did not receive a Client offer.")
- ctx.metrics.proxyIdleCount++
- w.WriteHeader(http.StatusGatewayTimeout)
- return
- }
- log.Println("Passing client offer to snowflake.")
- w.Write(offer)
- }
- /*
- Expects a WebRTC SDP offer in the Request to give to an assigned
- snowflake proxy, which responds with the SDP answer to be sent in
- the HTTP response back to the client.
- */
- func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
- startTime := time.Now()
- offer, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
- if nil != err {
- log.Println("Invalid data.")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- // Immediately fail if there are no snowflakes available.
- if ctx.snowflakes.Len() <= 0 {
- log.Println("Client: No snowflake proxies available.")
- ctx.metrics.clientDeniedCount++
- w.WriteHeader(http.StatusServiceUnavailable)
- return
- }
- // Otherwise, find the most available snowflake proxy, and pass the offer to it.
- // Delete must be deferred in order to correctly process answer request later.
- snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
- defer delete(ctx.idToSnowflake, snowflake.id)
- snowflake.offerChannel <- offer
- // Wait for the answer to be returned on the channel or timeout.
- select {
- case answer := <-snowflake.answerChannel:
- log.Println("Client: Retrieving answer")
- ctx.metrics.clientProxyMatchCount++
- w.Write(answer)
- // Initial tracking of elapsed time.
- ctx.metrics.clientRoundtripEstimate = time.Since(startTime) /
- time.Millisecond
- case <-time.After(time.Second * ClientTimeout):
- log.Println("Client: Timed out.")
- w.WriteHeader(http.StatusGatewayTimeout)
- w.Write([]byte("timed out waiting for answer!"))
- }
- }
- /*
- Expects snowflake proxes which have previously successfully received
- an offer from proxyHandler to respond with an answer in an HTTP POST,
- which the broker will pass back to the original client.
- */
- func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
- id := r.Header.Get("X-Session-ID")
- snowflake, ok := ctx.idToSnowflake[id]
- if !ok || nil == snowflake {
- // The snowflake took too long to respond with an answer, so its client
- // disappeared / the snowflake is no longer recognized by the Broker.
- w.WriteHeader(http.StatusGone)
- return
- }
- body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
- if nil != err || nil == body || len(body) <= 0 {
- log.Println("Invalid data.")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- log.Println("Received answer.")
- snowflake.answerChannel <- body
- }
- func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
- s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
- for _, snowflake := range ctx.idToSnowflake {
- s += fmt.Sprintf("\nsnowflake %d: %s", snowflake.index, snowflake.id)
- }
- s += fmt.Sprintf("\n\nroundtrip avg: %d", ctx.metrics.clientRoundtripEstimate)
- w.Write([]byte(s))
- }
- func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "text/plain; charset=utf-8")
- w.Write([]byte("User-agent: *\nDisallow: /\n"))
- }
- func main() {
- var acmeEmail string
- var acmeHostnamesCommas string
- var acmeCertCacheDir string
- var addr string
- var geoipDatabase string
- var geoip6Database string
- var disableTLS bool
- var certFilename, keyFilename string
- var disableGeoip bool
- var metricsFilename string
- flag.StringVar(&acmeEmail, "acme-email", "", "optional contact email for Let's Encrypt notifications")
- flag.StringVar(&acmeHostnamesCommas, "acme-hostnames", "", "comma-separated hostnames for TLS certificate")
- flag.StringVar(&certFilename, "cert", "", "TLS certificate file")
- flag.StringVar(&keyFilename, "key", "", "TLS private key file")
- flag.StringVar(&acmeCertCacheDir, "acme-cert-cache", "acme-cert-cache", "directory in which certificates should be cached")
- flag.StringVar(&addr, "addr", ":443", "address to listen on")
- flag.StringVar(&geoipDatabase, "geoipdb", "/usr/share/tor/geoip", "path to correctly formatted geoip database mapping IPv4 address ranges to country codes")
- flag.StringVar(&geoip6Database, "geoip6db", "/usr/share/tor/geoip6", "path to correctly formatted geoip database mapping IPv6 address ranges to country codes")
- flag.BoolVar(&disableTLS, "disable-tls", false, "don't use HTTPS")
- flag.BoolVar(&disableGeoip, "disable-geoip", false, "don't use geoip for stats collection")
- flag.StringVar(&metricsFilename, "metrics-log", "", "path to metrics logging output")
- flag.Parse()
- var err error
- var metricsFile io.Writer = os.Stdout
- var logOutput io.Writer = os.Stderr
- //We want to send the log output through our scrubber first
- log.SetOutput(&safelog.LogScrubber{Output: logOutput})
- log.SetFlags(log.LstdFlags | log.LUTC)
- if metricsFilename != "" {
- metricsFile, err = os.OpenFile(metricsFilename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
- if err != nil {
- log.Fatal(err.Error())
- }
- } else {
- metricsFile = os.Stdout
- }
- metricsLogger := log.New(metricsFile, "", 0)
- ctx := NewBrokerContext(metricsLogger)
- if !disableGeoip {
- err := ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
- if err != nil {
- log.Fatal(err.Error())
- }
- }
- go ctx.Broker()
- http.HandleFunc("/robots.txt", robotsTxtHandler)
- http.Handle("/proxy", SnowflakeHandler{ctx, proxyPolls})
- http.Handle("/client", SnowflakeHandler{ctx, clientOffers})
- http.Handle("/answer", SnowflakeHandler{ctx, proxyAnswers})
- http.Handle("/debug", SnowflakeHandler{ctx, debugHandler})
- server := http.Server{
- Addr: addr,
- }
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGHUP)
- // go routine to handle a SIGHUP signal to allow the broker operator to send
- // a SIGHUP signal when the geoip database files are updated, without requiring
- // a restart of the broker
- go func() {
- for {
- signal := <-sigChan
- log.Println("Received signal:", signal, ". Reloading geoip databases.")
- ctx.metrics.LoadGeoipDatabases(geoipDatabase, geoip6Database)
- }
- }()
- // Handle the various ways of setting up TLS. The legal configurations
- // are:
- // --acme-hostnames (with optional --acme-email and/or --acme-cert-cache)
- // --cert and --key together
- // --disable-tls
- // The outputs of this block of code are the disableTLS,
- // needHTTP01Listener, certManager, and getCertificate variables.
- if acmeHostnamesCommas != "" {
- acmeHostnames := strings.Split(acmeHostnamesCommas, ",")
- log.Printf("ACME hostnames: %q", acmeHostnames)
- var cache autocert.Cache
- if err = os.MkdirAll(acmeCertCacheDir, 0700); err != nil {
- log.Printf("Warning: Couldn't create cache directory %q (reason: %s) so we're *not* using our certificate cache.", acmeCertCacheDir, err)
- } else {
- cache = autocert.DirCache(acmeCertCacheDir)
- }
- certManager := autocert.Manager{
- Cache: cache,
- Prompt: autocert.AcceptTOS,
- HostPolicy: autocert.HostWhitelist(acmeHostnames...),
- Email: acmeEmail,
- }
- go func() {
- log.Printf("Starting HTTP-01 listener")
- log.Fatal(http.ListenAndServe(":80", certManager.HTTPHandler(nil)))
- }()
- server.TLSConfig = &tls.Config{GetCertificate: certManager.GetCertificate}
- err = server.ListenAndServeTLS("", "")
- } else if certFilename != "" && keyFilename != "" {
- if acmeEmail != "" || acmeHostnamesCommas != "" {
- log.Fatalf("The --cert and --key options are not allowed with --acme-email or --acme-hostnames.")
- }
- err = server.ListenAndServeTLS(certFilename, keyFilename)
- } else if disableTLS {
- err = server.ListenAndServe()
- } else {
- log.Fatal("the --acme-hostnames, --cert and --key, or --disable-tls option is required")
- }
- if err != nil {
- log.Fatal(err)
- }
- }
|