snowflake.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package lib
  2. import (
  3. "context"
  4. "errors"
  5. "log"
  6. "math/rand"
  7. "net"
  8. "strings"
  9. "time"
  10. "git.torproject.org/pluggable-transports/snowflake.git/common/nat"
  11. "git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
  12. "github.com/pion/webrtc/v3"
  13. "github.com/xtaci/kcp-go/v5"
  14. "github.com/xtaci/smux"
  15. )
  // Timeouts used by the Snowflake client.
  const (
  	// ReconnectTimeout is the minimum spacing between the starts of two
  	// successive collection attempts in connectLoop.
  	ReconnectTimeout = 10 * time.Second

  	// SnowflakeTimeout is not referenced in this part of the file.
  	// NOTE(review): presumably the idle limit applied to a single snowflake
  	// connection — confirm against its users.
  	SnowflakeTimeout = 20 * time.Second

  	// DataChannelTimeout is how long to wait for the OnOpen callback on a
  	// DataChannel.
  	DataChannelTimeout = 10 * time.Second
  )
  // dummyAddr is a placeholder address value. It carries no state; both of its
  // methods report the fixed string "dummy". It is passed wherever this file
  // has to supply a local or remote address but has no real one to give.
  type dummyAddr struct{}

  // Network returns the placeholder network name "dummy".
  func (dummyAddr) Network() string { return "dummy" }

  // String returns the placeholder address text "dummy".
  func (dummyAddr) String() string { return "dummy" }
  25. // Transport is a structure with methods that conform to the Go PT v2.1 API
  26. // https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/master/releases/PTSpecV2.1/Pluggable%20Transport%20Specification%20v2.1%20-%20Go%20Transport%20API.pdf
  27. type Transport struct {
  28. dialer *WebRTCDialer
  29. }
  30. // Create a new Snowflake transport client that can spawn multiple Snowflake connections.
  31. // brokerURL and frontDomain are the urls for the broker host and domain fronting host
  32. // iceAddresses are the STUN/TURN urls needed for WebRTC negotiation
  33. // keepLocalAddresses is a flag to enable sending local network addresses (for testing purposes)
  34. // max is the maximum number of snowflakes the client should gather for each SOCKS connection
  35. func NewSnowflakeClient(brokerURL, frontDomain string, iceAddresses []string, keepLocalAddresses bool, max int) (*Transport, error) {
  36. log.Println("\n\n\n --- Starting Snowflake Client ---")
  37. iceServers := parseIceServers(iceAddresses)
  38. // chooses a random subset of servers from inputs
  39. rand.Seed(time.Now().UnixNano())
  40. rand.Shuffle(len(iceServers), func(i, j int) {
  41. iceServers[i], iceServers[j] = iceServers[j], iceServers[i]
  42. })
  43. if len(iceServers) > 2 {
  44. iceServers = iceServers[:(len(iceServers)+1)/2]
  45. }
  46. log.Printf("Using ICE servers:")
  47. for _, server := range iceServers {
  48. log.Printf("url: %v", strings.Join(server.URLs, " "))
  49. }
  50. // Use potentially domain-fronting broker to rendezvous.
  51. broker, err := NewBrokerChannel(
  52. brokerURL, frontDomain, CreateBrokerTransport(),
  53. keepLocalAddresses)
  54. if err != nil {
  55. return nil, err
  56. }
  57. go updateNATType(iceServers, broker)
  58. transport := &Transport{dialer: NewWebRTCDialer(broker, iceServers, max)}
  59. return transport, nil
  60. }
  61. // Create a new Snowflake connection. Starts the collection of snowflakes and returns a
  62. // smux Stream.
  63. func (t *Transport) Dial() (net.Conn, error) {
  64. // Cleanup functions to run before returning, in case of an error.
  65. var cleanup []func()
  66. defer func() {
  67. // Run cleanup in reverse order, as defer does.
  68. for i := len(cleanup) - 1; i >= 0; i-- {
  69. cleanup[i]()
  70. }
  71. }()
  72. // Prepare to collect remote WebRTC peers.
  73. snowflakes, err := NewPeers(t.dialer)
  74. if err != nil {
  75. return nil, err
  76. }
  77. cleanup = append(cleanup, func() { snowflakes.End() })
  78. // Use a real logger to periodically output how much traffic is happening.
  79. snowflakes.BytesLogger = NewBytesSyncLogger()
  80. log.Printf("---- SnowflakeConn: begin collecting snowflakes ---")
  81. go connectLoop(snowflakes)
  82. // Create a new smux session
  83. log.Printf("---- SnowflakeConn: starting a new session ---")
  84. pconn, sess, err := newSession(snowflakes)
  85. if err != nil {
  86. return nil, err
  87. }
  88. cleanup = append(cleanup, func() {
  89. pconn.Close()
  90. sess.Close()
  91. })
  92. // On the smux session we overlay a stream.
  93. stream, err := sess.OpenStream()
  94. if err != nil {
  95. return nil, err
  96. }
  97. // Begin exchanging data.
  98. log.Printf("---- SnowflakeConn: begin stream %v ---", stream.ID())
  99. cleanup = append(cleanup, func() { stream.Close() })
  100. // All good, clear the cleanup list.
  101. cleanup = nil
  102. return &SnowflakeConn{Stream: stream, sess: sess, pconn: pconn, snowflakes: snowflakes}, nil
  103. }
  104. type SnowflakeConn struct {
  105. *smux.Stream
  106. sess *smux.Session
  107. pconn net.PacketConn
  108. snowflakes *Peers
  109. }
  110. func (conn *SnowflakeConn) Close() error {
  111. log.Printf("---- SnowflakeConn: closed stream %v ---", conn.ID())
  112. conn.Stream.Close()
  113. log.Printf("---- SnowflakeConn: end collecting snowflakes ---")
  114. conn.snowflakes.End()
  115. conn.pconn.Close()
  116. log.Printf("---- SnowflakeConn: discarding finished session ---")
  117. conn.sess.Close()
  118. return nil //TODO: return errors if any of the above do
  119. }
  120. // loop through all provided STUN servers until we exhaust the list or find
  121. // one that is compatable with RFC 5780
  122. func updateNATType(servers []webrtc.ICEServer, broker *BrokerChannel) {
  123. var restrictedNAT bool
  124. var err error
  125. for _, server := range servers {
  126. addr := strings.TrimPrefix(server.URLs[0], "stun:")
  127. restrictedNAT, err = nat.CheckIfRestrictedNAT(addr)
  128. if err == nil {
  129. if restrictedNAT {
  130. broker.SetNATType(nat.NATRestricted)
  131. } else {
  132. broker.SetNATType(nat.NATUnrestricted)
  133. }
  134. break
  135. }
  136. }
  137. if err != nil {
  138. broker.SetNATType(nat.NATUnknown)
  139. }
  140. }
  141. // Returns a slice of webrtc.ICEServer given a slice of addresses
  142. func parseIceServers(addresses []string) []webrtc.ICEServer {
  143. var servers []webrtc.ICEServer
  144. if len(addresses) == 0 {
  145. return nil
  146. }
  147. for _, url := range addresses {
  148. url = strings.TrimSpace(url)
  149. servers = append(servers, webrtc.ICEServer{
  150. URLs: []string{url},
  151. })
  152. }
  153. return servers
  154. }
  155. // newSession returns a new smux.Session and the net.PacketConn it is running
  156. // over. The net.PacketConn successively connects through Snowflake proxies
  157. // pulled from snowflakes.
  158. func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
  159. clientID := turbotunnel.NewClientID()
  160. // We build a persistent KCP session on a sequence of ephemeral WebRTC
  161. // connections. This dialContext tells RedialPacketConn how to get a new
  162. // WebRTC connection when the previous one dies. Inside each WebRTC
  163. // connection, we use EncapsulationPacketConn to encode packets into a
  164. // stream.
  165. dialContext := func(ctx context.Context) (net.PacketConn, error) {
  166. log.Printf("redialing on same connection")
  167. // Obtain an available WebRTC remote. May block.
  168. conn := snowflakes.Pop()
  169. if conn == nil {
  170. return nil, errors.New("handler: Received invalid Snowflake")
  171. }
  172. log.Println("---- Handler: snowflake assigned ----")
  173. // Send the magic Turbo Tunnel token.
  174. _, err := conn.Write(turbotunnel.Token[:])
  175. if err != nil {
  176. return nil, err
  177. }
  178. // Send ClientID prefix.
  179. _, err = conn.Write(clientID[:])
  180. if err != nil {
  181. return nil, err
  182. }
  183. return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
  184. }
  185. pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)
  186. // conn is built on the underlying RedialPacketConn—when one WebRTC
  187. // connection dies, another one will be found to take its place. The
  188. // sequence of packets across multiple WebRTC connections drives the KCP
  189. // engine.
  190. conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
  191. if err != nil {
  192. pconn.Close()
  193. return nil, nil, err
  194. }
  195. // Permit coalescing the payloads of consecutive sends.
  196. conn.SetStreamMode(true)
  197. // Set the maximum send and receive window sizes to a high number
  198. // Removes KCP bottlenecks: https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40026
  199. conn.SetWindowSize(65535, 65535)
  200. // Disable the dynamic congestion window (limit only by the
  201. // maximum of local and remote static windows).
  202. conn.SetNoDelay(
  203. 0, // default nodelay
  204. 0, // default interval
  205. 0, // default resend
  206. 1, // nc=1 => congestion window off
  207. )
  208. // On the KCP connection we overlay an smux session and stream.
  209. smuxConfig := smux.DefaultConfig()
  210. smuxConfig.Version = 2
  211. smuxConfig.KeepAliveTimeout = 10 * time.Minute
  212. sess, err := smux.Client(conn, smuxConfig)
  213. if err != nil {
  214. conn.Close()
  215. pconn.Close()
  216. return nil, nil, err
  217. }
  218. return pconn, sess, err
  219. }
  220. // Maintain |SnowflakeCapacity| number of available WebRTC connections, to
  221. // transfer to the Tor SOCKS handler when needed.
  222. func connectLoop(snowflakes SnowflakeCollector) {
  223. for {
  224. timer := time.After(ReconnectTimeout)
  225. _, err := snowflakes.Collect()
  226. if err != nil {
  227. log.Printf("WebRTC: %v Retrying...", err)
  228. }
  229. select {
  230. case <-timer:
  231. continue
  232. case <-snowflakes.Melted():
  233. log.Println("ConnectLoop: stopped.")
  234. return
  235. }
  236. }
  237. }