http.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package snowflake_server
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/hmac"
  6. "crypto/rand"
  7. "crypto/sha256"
  8. "encoding/binary"
  9. "fmt"
  10. "io"
  11. "log"
  12. "net"
  13. "net/http"
  14. "sync"
  15. "time"
  16. "github.com/gorilla/websocket"
  17. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/encapsulation"
  18. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/turbotunnel"
  19. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/websocketconn"
  20. )
  21. const requestTimeout = 10 * time.Second
  22. // How long to remember outgoing packets for a client, when we don't currently
  23. // have an active WebSocket connection corresponding to that client. Because a
  24. // client session may span multiple WebSocket connections, we keep packets we
  25. // aren't able to send immediately in memory, for a little while but not
  26. // indefinitely.
  27. const clientMapTimeout = 1 * time.Minute
  28. // How big to make the map of ClientIDs to IP addresses. The map is used in
  29. // turbotunnelMode to store a reasonable IP address for a client session that
  30. // may outlive any single WebSocket connection.
  31. const clientIDAddrMapCapacity = 98304
  32. // How long to wait for ListenAndServe or ListenAndServeTLS to return an error
  33. // before deciding that it's not going to return.
  34. const listenAndServeErrorTimeout = 100 * time.Millisecond
  35. var upgrader = websocket.Upgrader{
  36. CheckOrigin: func(r *http.Request) bool { return true },
  37. }
  38. // clientIDAddrMap stores short-term mappings from ClientIDs to IP addresses.
  39. // When we call pt.DialOr, tor wants us to provide a USERADDR string that
  40. // represents the remote IP address of the client (for metrics purposes, etc.).
  41. // This data structure bridges the gap between ServeHTTP, which knows about IP
  42. // addresses, and handleStream, which is what calls pt.DialOr. The common piece
  43. // of information linking both ends of the chain is the ClientID, which is
  44. // attached to the WebSocket connection and every session.
  45. var clientIDAddrMap = newClientIDMap(clientIDAddrMapCapacity)
// httpHandler is the http.Handler (see ServeHTTP) that exchanges encapsulated
// packets with clients over WebSocket connections.
type httpHandler struct {
	// pconns is the adapter layer between stream-oriented WebSocket
	// connections and the packet-oriented KCP layer. There are multiple of
	// these, corresponding to the multiple kcp.ServeConn in
	// Transport.Listen. Each client is assigned to one particular instance,
	// selected by a hash of its ClientID (see lookupPacketConn), in order
	// to distribute KCP processing load across CPU cores.
	pconns []*turbotunnel.QueuePacketConn
	// clientIDLookupKey is a secret key used to tweak the hash-based
	// assignment of ClientID to pconn, in order to avoid manipulation of
	// hash assignments.
	clientIDLookupKey []byte
}
  59. // newHTTPHandler creates a new http.Handler that exchanges encapsulated packets
  60. // over incoming WebSocket connections.
  61. func newHTTPHandler(localAddr net.Addr, numInstances int, mtu int) *httpHandler {
  62. pconns := make([]*turbotunnel.QueuePacketConn, 0, numInstances)
  63. for i := 0; i < numInstances; i++ {
  64. pconns = append(pconns, turbotunnel.NewQueuePacketConn(localAddr, clientMapTimeout, mtu))
  65. }
  66. clientIDLookupKey := make([]byte, 16)
  67. _, err := rand.Read(clientIDLookupKey)
  68. if err != nil {
  69. panic(err)
  70. }
  71. return &httpHandler{
  72. pconns: pconns,
  73. clientIDLookupKey: clientIDLookupKey,
  74. }
  75. }
  76. // lookupPacketConn returns the element of pconns that corresponds to client ID,
  77. // according to the hash-based mapping.
  78. func (handler *httpHandler) lookupPacketConn(clientID turbotunnel.ClientID) *turbotunnel.QueuePacketConn {
  79. s := hmac.New(sha256.New, handler.clientIDLookupKey).Sum(clientID[:])
  80. return handler.pconns[binary.LittleEndian.Uint64(s)%uint64(len(handler.pconns))]
  81. }
  82. func (handler *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  83. ws, err := upgrader.Upgrade(w, r, nil)
  84. if err != nil {
  85. log.Println(err)
  86. return
  87. }
  88. conn := websocketconn.New(ws)
  89. defer conn.Close()
  90. // Pass the address of client as the remote address of incoming connection
  91. clientIPParam := r.URL.Query().Get("client_ip")
  92. addr := clientAddr(clientIPParam)
  93. var token [len(turbotunnel.Token)]byte
  94. _, err = io.ReadFull(conn, token[:])
  95. if err != nil {
  96. // Don't bother logging EOF: that happens with an unused
  97. // connection, which clients make frequently as they maintain a
  98. // pool of proxies.
  99. if err != io.EOF {
  100. log.Printf("reading token: %v", err)
  101. }
  102. return
  103. }
  104. switch {
  105. case bytes.Equal(token[:], turbotunnel.Token[:]):
  106. err = handler.turbotunnelMode(conn, addr)
  107. default:
  108. // We didn't find a matching token, which means that we are
  109. // dealing with a client that doesn't know about such things.
  110. // Close the conn as we no longer support the old
  111. // one-session-per-WebSocket mode.
  112. log.Println("Received unsupported oneshot connection")
  113. return
  114. }
  115. if err != nil {
  116. log.Println(err)
  117. return
  118. }
  119. }
  120. // turbotunnelMode handles clients that sent turbotunnel.Token at the start of
  121. // their stream. These clients expect to send and receive encapsulated packets,
  122. // with a long-lived session identified by ClientID.
  123. func (handler *httpHandler) turbotunnelMode(conn net.Conn, addr net.Addr) error {
  124. // Read the ClientID prefix. Every packet encapsulated in this WebSocket
  125. // connection pertains to the same ClientID.
  126. var clientID turbotunnel.ClientID
  127. _, err := io.ReadFull(conn, clientID[:])
  128. if err != nil {
  129. return fmt.Errorf("reading ClientID: %w", err)
  130. }
  131. // Store a short-term mapping from the ClientID to the client IP
  132. // address attached to this WebSocket connection. tor will want us to
  133. // provide a client IP address when we call pt.DialOr. But a KCP session
  134. // does not necessarily correspond to any single IP address--it's
  135. // composed of packets that are carried in possibly multiple WebSocket
  136. // streams. We apply the heuristic that the IP address of the most
  137. // recent WebSocket connection that has had to do with a session, at the
  138. // time the session is established, is the IP address that should be
  139. // credited for the entire KCP session.
  140. clientIDAddrMap.Set(clientID, addr)
  141. pconn := handler.lookupPacketConn(clientID)
  142. var wg sync.WaitGroup
  143. wg.Add(2)
  144. done := make(chan struct{})
  145. // The remainder of the WebSocket stream consists of encapsulated
  146. // packets. We read them one by one and feed them into the
  147. // QueuePacketConn on which kcp.ServeConn was set up, which eventually
  148. // leads to KCP-level sessions in the acceptSessions function.
  149. go func() {
  150. defer wg.Done()
  151. defer close(done) // Signal the write loop to finish
  152. var p [2048]byte
  153. for {
  154. n, err := encapsulation.ReadData(conn, p[:])
  155. if err == io.ErrShortBuffer {
  156. err = nil
  157. }
  158. if err != nil {
  159. return
  160. }
  161. pconn.QueueIncoming(p[:n], clientID)
  162. }
  163. }()
  164. // At the same time, grab packets addressed to this ClientID and
  165. // encapsulate them into the downstream.
  166. go func() {
  167. defer wg.Done()
  168. defer conn.Close() // Signal the read loop to finish
  169. // Buffer encapsulation.WriteData operations to keep length
  170. // prefixes in the same send as the data that follows.
  171. bw := bufio.NewWriter(conn)
  172. for {
  173. select {
  174. case <-done:
  175. return
  176. case p, ok := <-pconn.OutgoingQueue(clientID):
  177. if !ok {
  178. return
  179. }
  180. _, err := encapsulation.WriteData(bw, p)
  181. pconn.Restore(p)
  182. if err == nil {
  183. err = bw.Flush()
  184. }
  185. if err != nil {
  186. return
  187. }
  188. }
  189. }
  190. }()
  191. wg.Wait()
  192. return nil
  193. }
  194. // ClientMapAddr is a string that represents a connecting client.
  195. type ClientMapAddr string
  196. func (addr ClientMapAddr) Network() string {
  197. return "snowflake"
  198. }
  199. func (addr ClientMapAddr) String() string {
  200. return string(addr)
  201. }
  202. // Return a client address
  203. func clientAddr(clientIPParam string) net.Addr {
  204. if clientIPParam == "" {
  205. return ClientMapAddr("")
  206. }
  207. // Check if client addr is a valid IP
  208. clientIP := net.ParseIP(clientIPParam)
  209. if clientIP == nil {
  210. return ClientMapAddr("")
  211. }
  212. // Check if client addr is 0.0.0.0 or [::]. Some proxies erroneously
  213. // report an address of 0.0.0.0: https://bugs.torproject.org/33157.
  214. if clientIP.IsUnspecified() {
  215. return ClientMapAddr("")
  216. }
  217. // Add a stub port number. USERADDR requires a port number.
  218. return ClientMapAddr((&net.TCPAddr{IP: clientIP, Port: 1, Zone: ""}).String())
  219. }