123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- package lib
- import (
- "bytes"
- "errors"
- "io"
- "log"
- "sync"
- "time"
- "github.com/dchest/uniuri"
- "github.com/pion/webrtc"
- )
- // Remote WebRTC peer.
- // Implements the |Snowflake| interface, which includes
- // |io.ReadWriter|, |Resetter|, and |Connector|.
- //
- // Handles preparation of go-webrtc PeerConnection. Only ever has
- // one DataChannel.
- type WebRTCPeer struct {
- id string
- config *webrtc.Configuration
- pc *webrtc.PeerConnection
- transport SnowflakeDataChannel // Holds the WebRTC DataChannel.
- broker *BrokerChannel
- offerChannel chan *webrtc.SessionDescription
- answerChannel chan *webrtc.SessionDescription
- errorChannel chan error
- recvPipe *io.PipeReader
- writePipe *io.PipeWriter
- lastReceive time.Time
- buffer bytes.Buffer
- reset chan struct{}
- closed bool
- lock sync.Mutex // Synchronization for DataChannel destruction
- once sync.Once // Synchronization for PeerConnection destruction
- BytesLogger
- }
- // Construct a WebRTC PeerConnection.
- func NewWebRTCPeer(config *webrtc.Configuration,
- broker *BrokerChannel) *WebRTCPeer {
- connection := new(WebRTCPeer)
- connection.id = "snowflake-" + uniuri.New()
- connection.config = config
- connection.broker = broker
- connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
- connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
- // Error channel is mostly for reporting during the initial SDP offer
- // creation & local description setting, which happens asynchronously.
- connection.errorChannel = make(chan error, 1)
- connection.reset = make(chan struct{}, 1)
- // Override with something that's not NullLogger to have real logging.
- connection.BytesLogger = &BytesNullLogger{}
- // Pipes remain the same even when DataChannel gets switched.
- connection.recvPipe, connection.writePipe = io.Pipe()
- return connection
- }
- // Read bytes from local SOCKS.
- // As part of |io.ReadWriter|
- func (c *WebRTCPeer) Read(b []byte) (int, error) {
- return c.recvPipe.Read(b)
- }
- // Writes bytes out to remote WebRTC.
- // As part of |io.ReadWriter|
- func (c *WebRTCPeer) Write(b []byte) (int, error) {
- c.lock.Lock()
- defer c.lock.Unlock()
- c.BytesLogger.AddOutbound(len(b))
- // TODO: Buffering could be improved / separated out of WebRTCPeer.
- if nil == c.transport {
- log.Printf("Buffered %d bytes --> WebRTC", len(b))
- c.buffer.Write(b)
- } else {
- c.transport.Send(b)
- }
- return len(b), nil
- }
- // As part of |Snowflake|
- func (c *WebRTCPeer) Close() error {
- c.once.Do(func() {
- c.closed = true
- c.cleanup()
- c.Reset()
- log.Printf("WebRTC: Closing")
- })
- return nil
- }
- // As part of |Resetter|
- func (c *WebRTCPeer) Reset() {
- if nil == c.reset {
- return
- }
- c.reset <- struct{}{}
- }
- // As part of |Resetter|
- func (c *WebRTCPeer) WaitForReset() { <-c.reset }
- // Prevent long-lived broken remotes.
- // Should also update the DataChannel in underlying go-webrtc's to make Closes
- // more immediate / responsive.
- func (c *WebRTCPeer) checkForStaleness() {
- c.lastReceive = time.Now()
- for {
- if c.closed {
- return
- }
- if time.Since(c.lastReceive).Seconds() > SnowflakeTimeout {
- log.Println("WebRTC: No messages received for", SnowflakeTimeout,
- "seconds -- closing stale connection.")
- c.Close()
- return
- }
- <-time.After(time.Second)
- }
- }
- // As part of |Connector| interface.
- func (c *WebRTCPeer) Connect() error {
- log.Println(c.id, " connecting...")
- // TODO: When go-webrtc is more stable, it's possible that a new
- // PeerConnection won't need to be re-prepared each time.
- err := c.preparePeerConnection()
- if err != nil {
- return err
- }
- err = c.establishDataChannel()
- if err != nil {
- // nolint: golint
- return errors.New("WebRTC: Could not establish DataChannel")
- }
- err = c.exchangeSDP()
- if err != nil {
- return err
- }
- go c.checkForStaleness()
- return nil
- }
- // Create and prepare callbacks on a new WebRTC PeerConnection.
- func (c *WebRTCPeer) preparePeerConnection() error {
- if nil != c.pc {
- if err := c.pc.Close(); err != nil {
- log.Printf("c.pc.Close returned error: %v", err)
- }
- c.pc = nil
- }
- s := webrtc.SettingEngine{}
- s.SetTrickle(true)
- api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
- pc, err := api.NewPeerConnection(*c.config)
- if err != nil {
- log.Printf("NewPeerConnection ERROR: %s", err)
- return err
- }
- // Prepare PeerConnection callbacks.
- // Allow candidates to accumulate until ICEGatheringStateComplete.
- pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
- if candidate == nil {
- log.Printf("WebRTC: Done gathering candidates")
- } else {
- log.Printf("WebRTC: Got ICE candidate: %s", candidate.String())
- }
- })
- pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
- if state == webrtc.ICEGathererStateComplete {
- log.Println("WebRTC: ICEGatheringStateComplete")
- c.offerChannel <- pc.LocalDescription()
- }
- })
- // This callback is not expected, as the Client initiates the creation
- // of the data channel, not the remote peer.
- pc.OnDataChannel(func(channel *webrtc.DataChannel) {
- log.Println("OnDataChannel")
- panic("Unexpected OnDataChannel!")
- })
- c.pc = pc
- go func() {
- offer, err := pc.CreateOffer(nil)
- // TODO: Potentially timeout and retry if ICE isn't working.
- if err != nil {
- c.errorChannel <- err
- return
- }
- log.Println("WebRTC: Created offer")
- err = pc.SetLocalDescription(offer)
- if err != nil {
- c.errorChannel <- err
- return
- }
- log.Println("WebRTC: Set local description")
- }()
- log.Println("WebRTC: PeerConnection created.")
- return nil
- }
- // Create a WebRTC DataChannel locally.
- func (c *WebRTCPeer) establishDataChannel() error {
- c.lock.Lock()
- defer c.lock.Unlock()
- if c.transport != nil {
- panic("Unexpected datachannel already exists!")
- }
- ordered := true
- dataChannelOptions := &webrtc.DataChannelInit{
- Ordered: &ordered,
- }
- dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
- // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
- // an SDP offer while other goroutines operating on this struct handle the
- // signaling. Eventually fires "OnOpen".
- if err != nil {
- log.Printf("CreateDataChannel ERROR: %s", err)
- return err
- }
- dc.OnOpen(func() {
- c.lock.Lock()
- defer c.lock.Unlock()
- log.Println("WebRTC: DataChannel.OnOpen")
- if nil != c.transport {
- panic("WebRTC: transport already exists.")
- }
- // Flush buffered outgoing SOCKS data if necessary.
- if c.buffer.Len() > 0 {
- dc.Send(c.buffer.Bytes())
- log.Println("Flushed", c.buffer.Len(), "bytes.")
- c.buffer.Reset()
- }
- // Then enable the datachannel.
- c.transport = dc
- })
- dc.OnClose(func() {
- c.lock.Lock()
- // Future writes will go to the buffer until a new DataChannel is available.
- if nil == c.transport {
- // Closed locally, as part of a reset.
- log.Println("WebRTC: DataChannel.OnClose [locally]")
- c.lock.Unlock()
- return
- }
- // Closed remotely, need to reset everything.
- // Disable the DataChannel as a write destination.
- log.Println("WebRTC: DataChannel.OnClose [remotely]")
- c.transport = nil
- dc.Close()
- // Unlock before Close'ing, since it calls cleanup and asks for the
- // lock to check if the transport needs to be be deleted.
- c.lock.Unlock()
- c.Close()
- })
- dc.OnMessage(func(msg webrtc.DataChannelMessage) {
- if len(msg.Data) <= 0 {
- log.Println("0 length message---")
- }
- c.BytesLogger.AddInbound(len(msg.Data))
- n, err := c.writePipe.Write(msg.Data)
- if err != nil {
- // TODO: Maybe shouldn't actually close.
- log.Println("Error writing to SOCKS pipe")
- if inerr := c.writePipe.CloseWithError(err); inerr != nil {
- log.Printf("c.writePipe.CloseWithError returned error: %v", inerr)
- }
- }
- if n != len(msg.Data) {
- log.Println("Error: short write")
- panic("short write")
- }
- c.lastReceive = time.Now()
- })
- log.Println("WebRTC: DataChannel created.")
- return nil
- }
- func (c *WebRTCPeer) sendOfferToBroker() {
- if nil == c.broker {
- return
- }
- offer := c.pc.LocalDescription()
- answer, err := c.broker.Negotiate(offer)
- if nil != err || nil == answer {
- log.Printf("BrokerChannel Error: %s", err)
- answer = nil
- }
- c.answerChannel <- answer
- }
- // Block until an SDP offer is available, send it to either
- // the Broker or signal pipe, then await for the SDP answer.
- func (c *WebRTCPeer) exchangeSDP() error {
- select {
- case <-c.offerChannel:
- case err := <-c.errorChannel:
- log.Println("Failed to prepare offer", err)
- c.Close()
- return err
- }
- // Keep trying the same offer until a valid answer arrives.
- var ok bool
- var answer *webrtc.SessionDescription
- for nil == answer {
- go c.sendOfferToBroker()
- answer, ok = <-c.answerChannel // Blocks...
- if !ok || nil == answer {
- log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout)
- <-time.After(time.Second * ReconnectTimeout)
- answer = nil
- }
- }
- log.Printf("Received Answer.\n")
- err := c.pc.SetRemoteDescription(*answer)
- if nil != err {
- log.Println("WebRTC: Unable to SetRemoteDescription:", err)
- return err
- }
- return nil
- }
- // Close all channels and transports
- func (c *WebRTCPeer) cleanup() {
- if nil != c.offerChannel {
- close(c.offerChannel)
- }
- if nil != c.answerChannel {
- close(c.answerChannel)
- }
- if nil != c.errorChannel {
- close(c.errorChannel)
- }
- // Close this side of the SOCKS pipe.
- if nil != c.writePipe {
- c.writePipe.Close()
- c.writePipe = nil
- }
- c.lock.Lock()
- if nil != c.transport {
- log.Printf("WebRTC: closing DataChannel")
- dataChannel := c.transport
- // Setting transport to nil *before* dc Close indicates to OnClose that
- // this was locally triggered.
- c.transport = nil
- // Release the lock before calling DeleteDataChannel (which in turn
- // calls Close on the dataChannel), but after nil'ing out the transport,
- // since otherwise we'll end up in the onClose handler in a deadlock.
- c.lock.Unlock()
- if c.pc == nil {
- panic("DataChannel w/o PeerConnection, not good.")
- }
- dataChannel.(*webrtc.DataChannel).Close()
- } else {
- c.lock.Unlock()
- }
- if nil != c.pc {
- log.Printf("WebRTC: closing PeerConnection")
- err := c.pc.Close()
- if nil != err {
- log.Printf("Error closing peerconnection...")
- }
- c.pc = nil
- }
- }
|