webrtc.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. package snowflake_client
  2. import (
  3. "crypto/rand"
  4. "encoding/hex"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "log"
  9. "net/url"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/pion/ice/v2"
  14. "github.com/pion/transport/v2"
  15. "github.com/pion/transport/v2/stdnet"
  16. "github.com/pion/webrtc/v3"
  17. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/event"
  18. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/v2/common/proxy"
  19. )
  20. // WebRTCPeer represents a WebRTC connection to a remote snowflake proxy.
  21. //
  22. // Each WebRTCPeer only ever has one DataChannel that is used as the peer's transport.
  23. type WebRTCPeer struct {
  24. id string
  25. pc *webrtc.PeerConnection
  26. transport *webrtc.DataChannel
  27. recvPipe *io.PipeReader
  28. writePipe *io.PipeWriter
  29. mu sync.Mutex // protects the following:
  30. lastReceive time.Time
  31. open chan struct{} // Channel to notify when datachannel opens
  32. closed chan struct{}
  33. once sync.Once // Synchronization for PeerConnection destruction
  34. bytesLogger bytesLogger
  35. eventsLogger event.SnowflakeEventReceiver
  36. proxy *url.URL
  37. }
  38. // Deprecated: Use NewWebRTCPeerWithEventsAndProxy Instead.
  39. func NewWebRTCPeer(
  40. config *webrtc.Configuration, broker *BrokerChannel,
  41. ) (*WebRTCPeer, error) {
  42. return NewWebRTCPeerWithEventsAndProxy(config, broker, nil, nil)
  43. }
  44. // Deprecated: Use NewWebRTCPeerWithEventsAndProxy Instead.
  45. func NewWebRTCPeerWithEvents(
  46. config *webrtc.Configuration, broker *BrokerChannel,
  47. eventsLogger event.SnowflakeEventReceiver,
  48. ) (*WebRTCPeer, error) {
  49. return NewWebRTCPeerWithEventsAndProxy(config, broker, eventsLogger, nil)
  50. }
  51. // NewWebRTCPeerWithEventsAndProxy constructs a WebRTC PeerConnection to a snowflake proxy.
  52. //
  53. // The creation of the peer handles the signaling to the Snowflake broker, including
  54. // the exchange of SDP information, the creation of a PeerConnection, and the establishment
  55. // of a DataChannel to the Snowflake proxy.
  56. func NewWebRTCPeerWithEventsAndProxy(
  57. config *webrtc.Configuration, broker *BrokerChannel,
  58. eventsLogger event.SnowflakeEventReceiver, proxy *url.URL,
  59. ) (*WebRTCPeer, error) {
  60. if eventsLogger == nil {
  61. eventsLogger = event.NewSnowflakeEventDispatcher()
  62. }
  63. connection := new(WebRTCPeer)
  64. {
  65. var buf [8]byte
  66. if _, err := rand.Read(buf[:]); err != nil {
  67. panic(err)
  68. }
  69. connection.id = "snowflake-" + hex.EncodeToString(buf[:])
  70. }
  71. connection.closed = make(chan struct{})
  72. // Override with something that's not NullLogger to have real logging.
  73. connection.bytesLogger = &bytesNullLogger{}
  74. // Pipes remain the same even when DataChannel gets switched.
  75. connection.recvPipe, connection.writePipe = io.Pipe()
  76. connection.eventsLogger = eventsLogger
  77. connection.proxy = proxy
  78. err := connection.connect(config, broker)
  79. if err != nil {
  80. connection.Close()
  81. return nil, err
  82. }
  83. return connection, nil
  84. }
  85. // Read bytes from local SOCKS.
  86. // As part of |io.ReadWriter|
  87. func (c *WebRTCPeer) Read(b []byte) (int, error) {
  88. return c.recvPipe.Read(b)
  89. }
  90. // Writes bytes out to remote WebRTC.
  91. // As part of |io.ReadWriter|
  92. func (c *WebRTCPeer) Write(b []byte) (int, error) {
  93. err := c.transport.Send(b)
  94. if err != nil {
  95. return 0, err
  96. }
  97. c.bytesLogger.addOutbound(int64(len(b)))
  98. return len(b), nil
  99. }
  100. // Closed returns a boolean indicated whether the peer is closed.
  101. func (c *WebRTCPeer) Closed() bool {
  102. select {
  103. case <-c.closed:
  104. return true
  105. default:
  106. }
  107. return false
  108. }
  109. // Close closes the connection the snowflake proxy.
  110. func (c *WebRTCPeer) Close() error {
  111. c.once.Do(func() {
  112. close(c.closed)
  113. c.cleanup()
  114. log.Printf("WebRTC: Closing")
  115. })
  116. return nil
  117. }
  118. // Prevent long-lived broken remotes.
  119. // Should also update the DataChannel in underlying go-webrtc's to make Closes
  120. // more immediate / responsive.
  121. func (c *WebRTCPeer) checkForStaleness(timeout time.Duration) {
  122. c.mu.Lock()
  123. c.lastReceive = time.Now()
  124. c.mu.Unlock()
  125. for {
  126. c.mu.Lock()
  127. lastReceive := c.lastReceive
  128. c.mu.Unlock()
  129. if time.Since(lastReceive) > timeout {
  130. log.Printf("WebRTC: No messages received for %v -- closing stale connection.",
  131. timeout)
  132. err := errors.New("no messages received, closing stale connection")
  133. c.eventsLogger.OnNewSnowflakeEvent(event.EventOnSnowflakeConnectionFailed{Error: err})
  134. c.Close()
  135. return
  136. }
  137. select {
  138. case <-c.closed:
  139. return
  140. case <-time.After(time.Second):
  141. }
  142. }
  143. }
  144. // connect does the bulk of the work: gather ICE candidates, send the SDP offer to broker,
  145. // receive an answer from broker, and wait for data channel to open
  146. func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel) error {
  147. log.Println(c.id, " connecting...")
  148. err := c.preparePeerConnection(config)
  149. localDescription := c.pc.LocalDescription()
  150. c.eventsLogger.OnNewSnowflakeEvent(event.EventOnOfferCreated{
  151. WebRTCLocalDescription: localDescription,
  152. Error: err,
  153. })
  154. if err != nil {
  155. return err
  156. }
  157. answer, err := broker.Negotiate(localDescription)
  158. c.eventsLogger.OnNewSnowflakeEvent(event.EventOnBrokerRendezvous{
  159. WebRTCRemoteDescription: answer,
  160. Error: err,
  161. })
  162. if err != nil {
  163. return err
  164. }
  165. log.Printf("Received Answer.\n")
  166. err = c.pc.SetRemoteDescription(*answer)
  167. if nil != err {
  168. log.Println("WebRTC: Unable to SetRemoteDescription:", err)
  169. return err
  170. }
  171. // Wait for the datachannel to open or time out
  172. select {
  173. case <-c.open:
  174. case <-time.After(DataChannelTimeout):
  175. c.transport.Close()
  176. err = errors.New("timeout waiting for DataChannel.OnOpen")
  177. c.eventsLogger.OnNewSnowflakeEvent(event.EventOnSnowflakeConnectionFailed{Error: err})
  178. return err
  179. }
  180. go c.checkForStaleness(SnowflakeTimeout)
  181. return nil
  182. }
  183. // preparePeerConnection creates a new WebRTC PeerConnection and returns it
  184. // after non-trickle ICE candidate gathering is complete.
  185. func (c *WebRTCPeer) preparePeerConnection(config *webrtc.Configuration) error {
  186. var err error
  187. s := webrtc.SettingEngine{}
  188. s.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled)
  189. // Use the SetNet setting https://pkg.go.dev/github.com/pion/webrtc/v3#SettingEngine.SetNet
  190. // to get snowflake working in shadow (where the AF_NETLINK family is not implemented).
  191. // These two lines of code functionally revert a new change in pion by silently ignoring
  192. // when net.Interfaces() fails, rather than throwing an error
  193. var vnet transport.Net
  194. vnet, _ = stdnet.NewNet()
  195. if c.proxy != nil {
  196. if err = proxy.CheckProxyProtocolSupport(c.proxy); err != nil {
  197. return err
  198. }
  199. socksClient := proxy.NewSocks5UDPClient(c.proxy)
  200. vnet = proxy.NewTransportWrapper(&socksClient, vnet)
  201. }
  202. s.SetNet(vnet)
  203. api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
  204. c.pc, err = api.NewPeerConnection(*config)
  205. if err != nil {
  206. log.Printf("NewPeerConnection ERROR: %s", err)
  207. return err
  208. }
  209. ordered := true
  210. dataChannelOptions := &webrtc.DataChannelInit{
  211. Ordered: &ordered,
  212. }
  213. // We must create the data channel before creating an offer
  214. // https://github.com/pion/webrtc/wiki/Release-WebRTC@v3.0.0#a-data-channel-is-no-longer-implicitly-created-with-a-peerconnection
  215. dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
  216. if err != nil {
  217. log.Printf("CreateDataChannel ERROR: %s", err)
  218. return err
  219. }
  220. dc.OnOpen(func() {
  221. c.eventsLogger.OnNewSnowflakeEvent(event.EventOnSnowflakeConnected{})
  222. log.Println("WebRTC: DataChannel.OnOpen")
  223. close(c.open)
  224. })
  225. dc.OnClose(func() {
  226. log.Println("WebRTC: DataChannel.OnClose")
  227. c.Close()
  228. })
  229. dc.OnError(func(err error) {
  230. c.eventsLogger.OnNewSnowflakeEvent(event.EventOnSnowflakeConnectionFailed{Error: err})
  231. })
  232. dc.OnMessage(func(msg webrtc.DataChannelMessage) {
  233. if len(msg.Data) <= 0 {
  234. log.Println("0 length message---")
  235. }
  236. n, err := c.writePipe.Write(msg.Data)
  237. c.bytesLogger.addInbound(int64(n))
  238. if err != nil {
  239. // TODO: Maybe shouldn't actually close.
  240. log.Println("Error writing to SOCKS pipe")
  241. if inerr := c.writePipe.CloseWithError(err); inerr != nil {
  242. log.Printf("c.writePipe.CloseWithError returned error: %v", inerr)
  243. }
  244. }
  245. c.mu.Lock()
  246. c.lastReceive = time.Now()
  247. c.mu.Unlock()
  248. })
  249. c.transport = dc
  250. c.open = make(chan struct{})
  251. log.Println("WebRTC: DataChannel created")
  252. offer, err := c.pc.CreateOffer(nil)
  253. // TODO: Potentially timeout and retry if ICE isn't working.
  254. if err != nil {
  255. log.Println("Failed to prepare offer", err)
  256. c.pc.Close()
  257. return err
  258. }
  259. log.Println("WebRTC: Created offer")
  260. // Allow candidates to accumulate until ICEGatheringStateComplete.
  261. done := webrtc.GatheringCompletePromise(c.pc)
  262. // Start gathering candidates
  263. err = c.pc.SetLocalDescription(offer)
  264. if err != nil {
  265. log.Println("Failed to apply offer", err)
  266. c.pc.Close()
  267. return err
  268. }
  269. log.Println("WebRTC: Set local description")
  270. <-done // Wait for ICE candidate gathering to complete.
  271. if !strings.Contains(c.pc.LocalDescription().SDP, "\na=candidate:") {
  272. return fmt.Errorf("SDP offer contains no candidate")
  273. }
  274. return nil
  275. }
  276. // cleanup closes all channels and transports
  277. func (c *WebRTCPeer) cleanup() {
  278. // Close this side of the SOCKS pipe.
  279. if c.writePipe != nil { // c.writePipe can be nil in tests.
  280. c.writePipe.Close()
  281. }
  282. if nil != c.transport {
  283. log.Printf("WebRTC: closing DataChannel")
  284. c.transport.Close()
  285. }
  286. if nil != c.pc {
  287. log.Printf("WebRTC: closing PeerConnection")
  288. err := c.pc.Close()
  289. if nil != err {
  290. log.Printf("Error closing peerconnection...")
  291. }
  292. }
  293. }