webrtc.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. package lib
  2. import (
  3. "bytes"
  4. "errors"
  5. "io"
  6. "log"
  7. "sync"
  8. "time"
  9. "github.com/dchest/uniuri"
  10. "github.com/pion/webrtc"
  11. )
  12. // Remote WebRTC peer.
  13. // Implements the |Snowflake| interface, which includes
  14. // |io.ReadWriter|, |Resetter|, and |Connector|.
  15. //
  16. // Handles preparation of go-webrtc PeerConnection. Only ever has
  17. // one DataChannel.
  18. type WebRTCPeer struct {
  19. id string
  20. config *webrtc.Configuration
  21. pc *webrtc.PeerConnection
  22. transport SnowflakeDataChannel // Holds the WebRTC DataChannel.
  23. broker *BrokerChannel
  24. offerChannel chan *webrtc.SessionDescription
  25. answerChannel chan *webrtc.SessionDescription
  26. errorChannel chan error
  27. recvPipe *io.PipeReader
  28. writePipe *io.PipeWriter
  29. lastReceive time.Time
  30. buffer bytes.Buffer
  31. reset chan struct{}
  32. closed bool
  33. lock sync.Mutex // Synchronization for DataChannel destruction
  34. once sync.Once // Synchronization for PeerConnection destruction
  35. BytesLogger
  36. }
  37. // Construct a WebRTC PeerConnection.
  38. func NewWebRTCPeer(config *webrtc.Configuration,
  39. broker *BrokerChannel) *WebRTCPeer {
  40. connection := new(WebRTCPeer)
  41. connection.id = "snowflake-" + uniuri.New()
  42. connection.config = config
  43. connection.broker = broker
  44. connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
  45. connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
  46. // Error channel is mostly for reporting during the initial SDP offer
  47. // creation & local description setting, which happens asynchronously.
  48. connection.errorChannel = make(chan error, 1)
  49. connection.reset = make(chan struct{}, 1)
  50. // Override with something that's not NullLogger to have real logging.
  51. connection.BytesLogger = &BytesNullLogger{}
  52. // Pipes remain the same even when DataChannel gets switched.
  53. connection.recvPipe, connection.writePipe = io.Pipe()
  54. return connection
  55. }
  56. // Read bytes from local SOCKS.
  57. // As part of |io.ReadWriter|
  58. func (c *WebRTCPeer) Read(b []byte) (int, error) {
  59. return c.recvPipe.Read(b)
  60. }
  61. // Writes bytes out to remote WebRTC.
  62. // As part of |io.ReadWriter|
  63. func (c *WebRTCPeer) Write(b []byte) (int, error) {
  64. c.lock.Lock()
  65. defer c.lock.Unlock()
  66. c.BytesLogger.AddOutbound(len(b))
  67. // TODO: Buffering could be improved / separated out of WebRTCPeer.
  68. if nil == c.transport {
  69. log.Printf("Buffered %d bytes --> WebRTC", len(b))
  70. c.buffer.Write(b)
  71. } else {
  72. c.transport.Send(b)
  73. }
  74. return len(b), nil
  75. }
  76. // As part of |Snowflake|
  77. func (c *WebRTCPeer) Close() error {
  78. c.once.Do(func() {
  79. c.closed = true
  80. c.cleanup()
  81. c.Reset()
  82. log.Printf("WebRTC: Closing")
  83. })
  84. return nil
  85. }
  86. // As part of |Resetter|
  87. func (c *WebRTCPeer) Reset() {
  88. if nil == c.reset {
  89. return
  90. }
  91. c.reset <- struct{}{}
  92. }
  93. // As part of |Resetter|
  94. func (c *WebRTCPeer) WaitForReset() { <-c.reset }
  95. // Prevent long-lived broken remotes.
  96. // Should also update the DataChannel in underlying go-webrtc's to make Closes
  97. // more immediate / responsive.
  98. func (c *WebRTCPeer) checkForStaleness() {
  99. c.lastReceive = time.Now()
  100. for {
  101. if c.closed {
  102. return
  103. }
  104. if time.Since(c.lastReceive).Seconds() > SnowflakeTimeout {
  105. log.Println("WebRTC: No messages received for", SnowflakeTimeout,
  106. "seconds -- closing stale connection.")
  107. c.Close()
  108. return
  109. }
  110. <-time.After(time.Second)
  111. }
  112. }
  113. // As part of |Connector| interface.
  114. func (c *WebRTCPeer) Connect() error {
  115. log.Println(c.id, " connecting...")
  116. // TODO: When go-webrtc is more stable, it's possible that a new
  117. // PeerConnection won't need to be re-prepared each time.
  118. err := c.preparePeerConnection()
  119. if err != nil {
  120. return err
  121. }
  122. err = c.establishDataChannel()
  123. if err != nil {
  124. // nolint: golint
  125. return errors.New("WebRTC: Could not establish DataChannel")
  126. }
  127. err = c.exchangeSDP()
  128. if err != nil {
  129. return err
  130. }
  131. go c.checkForStaleness()
  132. return nil
  133. }
  134. // Create and prepare callbacks on a new WebRTC PeerConnection.
  135. func (c *WebRTCPeer) preparePeerConnection() error {
  136. if nil != c.pc {
  137. if err := c.pc.Close(); err != nil {
  138. log.Printf("c.pc.Close returned error: %v", err)
  139. }
  140. c.pc = nil
  141. }
  142. s := webrtc.SettingEngine{}
  143. s.SetTrickle(true)
  144. api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
  145. pc, err := api.NewPeerConnection(*c.config)
  146. if err != nil {
  147. log.Printf("NewPeerConnection ERROR: %s", err)
  148. return err
  149. }
  150. // Prepare PeerConnection callbacks.
  151. // Allow candidates to accumulate until ICEGatheringStateComplete.
  152. pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
  153. if candidate == nil {
  154. log.Printf("WebRTC: Done gathering candidates")
  155. } else {
  156. log.Printf("WebRTC: Got ICE candidate: %s", candidate.String())
  157. }
  158. })
  159. pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
  160. if state == webrtc.ICEGathererStateComplete {
  161. log.Println("WebRTC: ICEGatheringStateComplete")
  162. c.offerChannel <- pc.LocalDescription()
  163. }
  164. })
  165. // This callback is not expected, as the Client initiates the creation
  166. // of the data channel, not the remote peer.
  167. pc.OnDataChannel(func(channel *webrtc.DataChannel) {
  168. log.Println("OnDataChannel")
  169. panic("Unexpected OnDataChannel!")
  170. })
  171. c.pc = pc
  172. go func() {
  173. offer, err := pc.CreateOffer(nil)
  174. // TODO: Potentially timeout and retry if ICE isn't working.
  175. if err != nil {
  176. c.errorChannel <- err
  177. return
  178. }
  179. log.Println("WebRTC: Created offer")
  180. err = pc.SetLocalDescription(offer)
  181. if err != nil {
  182. c.errorChannel <- err
  183. return
  184. }
  185. log.Println("WebRTC: Set local description")
  186. }()
  187. log.Println("WebRTC: PeerConnection created.")
  188. return nil
  189. }
  190. // Create a WebRTC DataChannel locally.
  191. func (c *WebRTCPeer) establishDataChannel() error {
  192. c.lock.Lock()
  193. defer c.lock.Unlock()
  194. if c.transport != nil {
  195. panic("Unexpected datachannel already exists!")
  196. }
  197. ordered := true
  198. dataChannelOptions := &webrtc.DataChannelInit{
  199. Ordered: &ordered,
  200. }
  201. dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
  202. // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
  203. // an SDP offer while other goroutines operating on this struct handle the
  204. // signaling. Eventually fires "OnOpen".
  205. if err != nil {
  206. log.Printf("CreateDataChannel ERROR: %s", err)
  207. return err
  208. }
  209. dc.OnOpen(func() {
  210. c.lock.Lock()
  211. defer c.lock.Unlock()
  212. log.Println("WebRTC: DataChannel.OnOpen")
  213. if nil != c.transport {
  214. panic("WebRTC: transport already exists.")
  215. }
  216. // Flush buffered outgoing SOCKS data if necessary.
  217. if c.buffer.Len() > 0 {
  218. dc.Send(c.buffer.Bytes())
  219. log.Println("Flushed", c.buffer.Len(), "bytes.")
  220. c.buffer.Reset()
  221. }
  222. // Then enable the datachannel.
  223. c.transport = dc
  224. })
  225. dc.OnClose(func() {
  226. c.lock.Lock()
  227. // Future writes will go to the buffer until a new DataChannel is available.
  228. if nil == c.transport {
  229. // Closed locally, as part of a reset.
  230. log.Println("WebRTC: DataChannel.OnClose [locally]")
  231. c.lock.Unlock()
  232. return
  233. }
  234. // Closed remotely, need to reset everything.
  235. // Disable the DataChannel as a write destination.
  236. log.Println("WebRTC: DataChannel.OnClose [remotely]")
  237. c.transport = nil
  238. dc.Close()
  239. // Unlock before Close'ing, since it calls cleanup and asks for the
  240. // lock to check if the transport needs to be be deleted.
  241. c.lock.Unlock()
  242. c.Close()
  243. })
  244. dc.OnMessage(func(msg webrtc.DataChannelMessage) {
  245. if len(msg.Data) <= 0 {
  246. log.Println("0 length message---")
  247. }
  248. c.BytesLogger.AddInbound(len(msg.Data))
  249. n, err := c.writePipe.Write(msg.Data)
  250. if err != nil {
  251. // TODO: Maybe shouldn't actually close.
  252. log.Println("Error writing to SOCKS pipe")
  253. if inerr := c.writePipe.CloseWithError(err); inerr != nil {
  254. log.Printf("c.writePipe.CloseWithError returned error: %v", inerr)
  255. }
  256. }
  257. if n != len(msg.Data) {
  258. log.Println("Error: short write")
  259. panic("short write")
  260. }
  261. c.lastReceive = time.Now()
  262. })
  263. log.Println("WebRTC: DataChannel created.")
  264. return nil
  265. }
  266. func (c *WebRTCPeer) sendOfferToBroker() {
  267. if nil == c.broker {
  268. return
  269. }
  270. offer := c.pc.LocalDescription()
  271. answer, err := c.broker.Negotiate(offer)
  272. if nil != err || nil == answer {
  273. log.Printf("BrokerChannel Error: %s", err)
  274. answer = nil
  275. }
  276. c.answerChannel <- answer
  277. }
  278. // Block until an SDP offer is available, send it to either
  279. // the Broker or signal pipe, then await for the SDP answer.
  280. func (c *WebRTCPeer) exchangeSDP() error {
  281. select {
  282. case <-c.offerChannel:
  283. case err := <-c.errorChannel:
  284. log.Println("Failed to prepare offer", err)
  285. c.Close()
  286. return err
  287. }
  288. // Keep trying the same offer until a valid answer arrives.
  289. var ok bool
  290. var answer *webrtc.SessionDescription
  291. for nil == answer {
  292. go c.sendOfferToBroker()
  293. answer, ok = <-c.answerChannel // Blocks...
  294. if !ok || nil == answer {
  295. log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout)
  296. <-time.After(time.Second * ReconnectTimeout)
  297. answer = nil
  298. }
  299. }
  300. log.Printf("Received Answer.\n")
  301. err := c.pc.SetRemoteDescription(*answer)
  302. if nil != err {
  303. log.Println("WebRTC: Unable to SetRemoteDescription:", err)
  304. return err
  305. }
  306. return nil
  307. }
  308. // Close all channels and transports
  309. func (c *WebRTCPeer) cleanup() {
  310. if nil != c.offerChannel {
  311. close(c.offerChannel)
  312. }
  313. if nil != c.answerChannel {
  314. close(c.answerChannel)
  315. }
  316. if nil != c.errorChannel {
  317. close(c.errorChannel)
  318. }
  319. // Close this side of the SOCKS pipe.
  320. if nil != c.writePipe {
  321. c.writePipe.Close()
  322. c.writePipe = nil
  323. }
  324. c.lock.Lock()
  325. if nil != c.transport {
  326. log.Printf("WebRTC: closing DataChannel")
  327. dataChannel := c.transport
  328. // Setting transport to nil *before* dc Close indicates to OnClose that
  329. // this was locally triggered.
  330. c.transport = nil
  331. // Release the lock before calling DeleteDataChannel (which in turn
  332. // calls Close on the dataChannel), but after nil'ing out the transport,
  333. // since otherwise we'll end up in the onClose handler in a deadlock.
  334. c.lock.Unlock()
  335. if c.pc == nil {
  336. panic("DataChannel w/o PeerConnection, not good.")
  337. }
  338. dataChannel.(*webrtc.DataChannel).Close()
  339. } else {
  340. c.lock.Unlock()
  341. }
  342. if nil != c.pc {
  343. log.Printf("WebRTC: closing PeerConnection")
  344. err := c.pc.Close()
  345. if nil != err {
  346. log.Printf("Error closing peerconnection...")
  347. }
  348. c.pc = nil
  349. }
  350. }