123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- package main
- import (
- "flag"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "net"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "time"
- "git.torproject.org/pluggable-transports/goptlib.git"
- "github.com/keroserene/go-webrtc"
- )
- var ptMethodName = "snowflake"
- var ptInfo pt.ServerInfo
- var logFile *os.File
- // When a datachannel handler starts, +1 is written to this channel;
- // when it ends, -1 is written.
- var handlerChan = make(chan int)
- func copyLoop(a, b net.Conn) {
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- io.Copy(b, a)
- wg.Done()
- }()
- go func() {
- io.Copy(a, b)
- wg.Done()
- }()
- wg.Wait()
- }
- type webRTCConn struct {
- dc *webrtc.DataChannel
- pc *webrtc.PeerConnection
- pr *io.PipeReader
- lock sync.Mutex // Synchronization for DataChannel destruction
- once sync.Once // Synchronization for PeerConnection destruction
- }
- func (c *webRTCConn) Read(b []byte) (int, error) {
- return c.pr.Read(b)
- }
- func (c *webRTCConn) Write(b []byte) (int, error) {
- c.lock.Lock()
- defer c.lock.Unlock()
- // log.Printf("webrtc Write %d %+q", len(b), string(b))
- log.Printf("Write %d bytes --> WebRTC", len(b))
- if c.dc != nil {
- c.dc.Send(b)
- }
- return len(b), nil
- }
- func (c *webRTCConn) Close() (err error) {
- c.once.Do(func() {
- err = c.pc.Destroy()
- })
- return
- }
- func (c *webRTCConn) LocalAddr() net.Addr {
- return nil
- }
- func (c *webRTCConn) RemoteAddr() net.Addr {
- return nil
- }
- func (c *webRTCConn) SetDeadline(t time.Time) error {
- return fmt.Errorf("SetDeadline not implemented")
- }
- func (c *webRTCConn) SetReadDeadline(t time.Time) error {
- return fmt.Errorf("SetReadDeadline not implemented")
- }
- func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
- return fmt.Errorf("SetWriteDeadline not implemented")
- }
- func datachannelHandler(conn *webRTCConn) {
- defer conn.Close()
- handlerChan <- 1
- defer func() {
- handlerChan <- -1
- }()
- or, err := pt.DialOr(&ptInfo, "", ptMethodName) // TODO: Extended OR
- if err != nil {
- log.Printf("Failed to connect to ORPort: " + err.Error())
- return
- }
- defer or.Close()
- copyLoop(conn, or)
- }
- // Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE
- // candidates is complete and the answer is available in LocalDescription.
- // Installs an OnDataChannel callback that creates a webRTCConn and passes it to
- // datachannelHandler.
- func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
- pc, err := webrtc.NewPeerConnection(config)
- if err != nil {
- return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
- }
- pc.OnNegotiationNeeded = func() {
- panic("OnNegotiationNeeded")
- }
- pc.OnDataChannel = func(dc *webrtc.DataChannel) {
- log.Println("OnDataChannel")
- pr, pw := io.Pipe()
- conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
- dc.OnOpen = func() {
- log.Println("OnOpen channel")
- }
- dc.OnClose = func() {
- conn.lock.Lock()
- defer conn.lock.Unlock()
- log.Println("OnClose channel")
- conn.dc = nil
- pc.DeleteDataChannel(dc)
- pw.Close()
- }
- dc.OnMessage = func(msg []byte) {
- log.Printf("OnMessage <--- %d bytes", len(msg))
- n, err := pw.Write(msg)
- if err != nil {
- pw.CloseWithError(err)
- }
- if n != len(msg) {
- panic("short write")
- }
- }
- go datachannelHandler(conn)
- }
- err = pc.SetRemoteDescription(sdp)
- if err != nil {
- pc.Destroy()
- return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
- }
- log.Println("sdp offer successfully received.")
- log.Println("Generating answer...")
- answer, err := pc.CreateAnswer()
- if err != nil {
- pc.Destroy()
- return nil, err
- }
- if answer == nil {
- pc.Destroy()
- return nil, fmt.Errorf("Failed gathering ICE candidates.")
- }
- err = pc.SetLocalDescription(answer)
- if err != nil {
- pc.Destroy()
- return nil, err
- }
- return pc, nil
- }
- func main() {
- var err error
- var httpAddr string
- var logFilename string
- flag.StringVar(&httpAddr, "http", "", "listen for HTTP signaling")
- flag.StringVar(&logFilename, "log", "", "log file to write to")
- flag.Parse()
- log.SetFlags(log.LstdFlags | log.LUTC)
- if logFilename != "" {
- f, err := os.OpenFile(logFilename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
- if err != nil {
- log.Fatalf("can't open log file: %s", err)
- }
- defer logFile.Close()
- log.SetOutput(f)
- }
- log.Println("starting")
- webrtc.SetLoggingVerbosity(1)
- ptInfo, err = pt.ServerSetup(nil)
- if err != nil {
- log.Fatal(err)
- }
- webRTCConfig := webrtc.NewConfiguration(webrtc.OptionIceServer("stun:stun.l.google.com:19302"))
- // Start HTTP-based signaling receiver.
- go func() {
- err := receiveSignalsHTTP(httpAddr, webRTCConfig)
- if err != nil {
- log.Printf("receiveSignalsHTTP: %s", err)
- }
- }()
- for _, bindaddr := range ptInfo.Bindaddrs {
- switch bindaddr.MethodName {
- case ptMethodName:
- bindaddr.Addr.Port = 12345 // lies!!!
- pt.Smethod(bindaddr.MethodName, bindaddr.Addr)
- default:
- pt.SmethodError(bindaddr.MethodName, "no such method")
- }
- }
- pt.SmethodsDone()
- var numHandlers int = 0
- var sig os.Signal
- sigChan := make(chan os.Signal, 1)
- signal.Notify(sigChan, syscall.SIGTERM)
- if os.Getenv("TOR_PT_EXIT_ON_STDIN_CLOSE") == "1" {
- // This environment variable means we should treat EOF on stdin
- // just like SIGTERM: https://bugs.torproject.org/15435.
- go func() {
- io.Copy(ioutil.Discard, os.Stdin)
- log.Printf("synthesizing SIGTERM because of stdin close")
- sigChan <- syscall.SIGTERM
- }()
- }
- // keep track of handlers and wait for a signal
- sig = nil
- for sig == nil {
- select {
- case n := <-handlerChan:
- numHandlers += n
- case sig = <-sigChan:
- }
- }
- for numHandlers > 0 {
- numHandlers += <-handlerChan
- }
- }
|