meek.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. /*
  2. * Copyright (c) 2015, Yawning Angel <yawning at schwanenlied dot me>
  3. * All rights reserved.
  4. *
  5. * Redistribution and use in source and binary forms, with or without
  6. * modification, are permitted provided that the following conditions are met:
  7. *
  8. * * Redistributions of source code must retain the above copyright notice,
  9. * this list of conditions and the following disclaimer.
  10. *
  11. * * Redistributions in binary form must reproduce the above copyright notice,
  12. * this list of conditions and the following disclaimer in the documentation
  13. * and/or other materials provided with the distribution.
  14. *
  15. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  16. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  17. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  18. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
  19. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  20. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  21. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  22. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  23. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  24. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  25. * POSSIBILITY OF SUCH DAMAGE.
  26. */
  27. package meeklite
  28. import (
  29. "bytes"
  30. "crypto/rand"
  31. "crypto/sha256"
  32. "encoding/hex"
  33. "errors"
  34. "fmt"
  35. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/lyrebird/common/utlsutil"
  36. "io"
  37. "io/ioutil"
  38. "net"
  39. "net/http"
  40. gourl "net/url"
  41. "os"
  42. "runtime"
  43. "sync"
  44. "time"
  45. utls "github.com/refraction-networking/utls"
  46. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/goptlib"
  47. "gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/lyrebird/transports/base"
  48. )
  49. const (
  50. urlArg = "url"
  51. frontArg = "front"
  52. utlsArg = "utls"
  53. maxChanBacklog = 16
  54. // Constants shamelessly stolen from meek-client.go...
  55. maxPayloadLength = 0x10000
  56. initPollInterval = 100 * time.Millisecond
  57. maxPollInterval = 5 * time.Second
  58. pollIntervalMultiplier = 1.5
  59. maxRetries = 10
  60. retryDelay = 30 * time.Second
  61. )
  62. var (
  63. // ErrNotSupported is the error returned for a unsupported operation.
  64. ErrNotSupported = errors.New("meek_lite: operation not supported")
  65. loopbackAddr = net.IPv4(127, 0, 0, 1)
  66. )
  67. type meekClientArgs struct {
  68. url *gourl.URL
  69. front string
  70. utls *utls.ClientHelloID
  71. }
  72. func (ca *meekClientArgs) Network() string {
  73. return transportName
  74. }
  75. func (ca *meekClientArgs) String() string {
  76. return transportName + ":" + ca.front + ":" + ca.url.String()
  77. }
  78. func newClientArgs(args *pt.Args) (ca *meekClientArgs, err error) {
  79. ca = &meekClientArgs{}
  80. // Parse the URL argument.
  81. str, ok := args.Get(urlArg)
  82. if !ok {
  83. return nil, fmt.Errorf("missing argument '%s'", urlArg)
  84. }
  85. ca.url, err = gourl.Parse(str)
  86. if err != nil {
  87. return nil, fmt.Errorf("malformed url: '%s'", str)
  88. }
  89. switch ca.url.Scheme {
  90. case "http", "https":
  91. default:
  92. return nil, fmt.Errorf("invalid scheme: '%s'", ca.url.Scheme)
  93. }
  94. // Parse the (optional) front argument.
  95. ca.front, _ = args.Get(frontArg)
  96. // Parse the (optional) utls argument.
  97. utlsOpt, _ := args.Get(utlsArg)
  98. if ca.utls, err = utlsutil.ParseClientHelloID(utlsOpt); err != nil {
  99. return nil, err
  100. }
  101. return ca, nil
  102. }
  103. type meekConn struct {
  104. args *meekClientArgs
  105. sessionID string
  106. roundTripper http.RoundTripper
  107. closeOnce sync.Once
  108. workerWrChan chan []byte
  109. workerRdChan chan []byte
  110. workerCloseChan chan struct{}
  111. rdBuf *bytes.Buffer
  112. }
  113. func (c *meekConn) Read(p []byte) (n int, err error) {
  114. // If there is data left over from the previous read,
  115. // service the request using the buffered data.
  116. if c.rdBuf != nil {
  117. if c.rdBuf.Len() == 0 {
  118. panic("empty read buffer")
  119. }
  120. n, err = c.rdBuf.Read(p)
  121. if c.rdBuf.Len() == 0 {
  122. c.rdBuf = nil
  123. }
  124. return
  125. }
  126. // Wait for the worker to enqueue more incoming data.
  127. b, ok := <-c.workerRdChan
  128. if !ok {
  129. // Close() was called and the worker's shutting down.
  130. return 0, io.ErrClosedPipe
  131. }
  132. // Ew, an extra copy, but who am I kidding, it's meek.
  133. buf := bytes.NewBuffer(b)
  134. n, err = buf.Read(p)
  135. if buf.Len() > 0 {
  136. // If there's data pending, stash the buffer so the next
  137. // Read() call will use it to fulfuill the Read().
  138. c.rdBuf = buf
  139. }
  140. return
  141. }
  142. func (c *meekConn) Write(b []byte) (n int, err error) {
  143. // Check to see if the connection is actually open.
  144. select {
  145. case <-c.workerCloseChan:
  146. return 0, io.ErrClosedPipe
  147. default:
  148. }
  149. if len(b) == 0 {
  150. return 0, nil
  151. }
  152. // Copy the data to be written to a new slice, since
  153. // we return immediately after queuing and the peer can
  154. // happily reuse `b` before data has been sent.
  155. b2 := append([]byte{}, b...)
  156. if ok := c.enqueueWrite(b2); !ok {
  157. // Technically we did enqueue data, but the worker's
  158. // got closed out from under us.
  159. return 0, io.ErrClosedPipe
  160. }
  161. runtime.Gosched()
  162. return len(b), nil
  163. }
  164. func (c *meekConn) Close() error {
  165. err := os.ErrClosed
  166. c.closeOnce.Do(func() {
  167. // Tear down the worker, if it is still running.
  168. close(c.workerCloseChan)
  169. err = nil
  170. })
  171. return err
  172. }
  173. func (c *meekConn) LocalAddr() net.Addr {
  174. return &net.IPAddr{IP: loopbackAddr}
  175. }
  176. func (c *meekConn) RemoteAddr() net.Addr {
  177. return c.args
  178. }
  179. func (c *meekConn) SetDeadline(t time.Time) error {
  180. return ErrNotSupported
  181. }
  182. func (c *meekConn) SetReadDeadline(t time.Time) error {
  183. return ErrNotSupported
  184. }
  185. func (c *meekConn) SetWriteDeadline(t time.Time) error {
  186. return ErrNotSupported
  187. }
  188. func (c *meekConn) enqueueWrite(b []byte) (ok bool) {
  189. defer func() {
  190. if err := recover(); err != nil {
  191. ok = false
  192. }
  193. }()
  194. c.workerWrChan <- b
  195. return true
  196. }
  197. func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
  198. var req *http.Request
  199. var resp *http.Response
  200. for retries := 0; retries < maxRetries; retries++ {
  201. url := *c.args.url
  202. host := url.Host
  203. if c.args.front != "" {
  204. url.Host = c.args.front
  205. }
  206. var body io.Reader
  207. if len(sndBuf) > 0 {
  208. body = bytes.NewReader(sndBuf)
  209. }
  210. req, err = http.NewRequest("POST", url.String(), body)
  211. if err != nil {
  212. return nil, err
  213. }
  214. if c.args.front != "" {
  215. req.Host = host
  216. }
  217. req.Header.Set("X-Session-Id", c.sessionID)
  218. req.Header.Set("User-Agent", "")
  219. resp, err = c.roundTripper.RoundTrip(req)
  220. if err != nil {
  221. return nil, err
  222. }
  223. if resp.StatusCode == http.StatusOK {
  224. recvBuf, err = ioutil.ReadAll(io.LimitReader(resp.Body, maxPayloadLength))
  225. resp.Body.Close()
  226. return
  227. }
  228. resp.Body.Close()
  229. err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
  230. time.Sleep(retryDelay)
  231. }
  232. return
  233. }
  234. func (c *meekConn) ioWorker() {
  235. interval := initPollInterval
  236. var sndBuf, leftBuf []byte
  237. loop:
  238. for {
  239. sndBuf = nil
  240. select {
  241. case <-time.After(interval):
  242. // If the poll interval has elapsed, issue a request.
  243. case sndBuf = <-c.workerWrChan:
  244. // If there is data pending a send, issue a request.
  245. case <-c.workerCloseChan:
  246. break loop
  247. }
  248. // Combine short writes as long as data is available to be
  249. // sent immediately and it will not put us over the max
  250. // payload limit. Any excess data is stored and dispatched
  251. // as the next request).
  252. sndBuf = append(leftBuf, sndBuf...)
  253. wrSz := len(sndBuf)
  254. for len(c.workerWrChan) > 0 && wrSz < maxPayloadLength {
  255. b := <-c.workerWrChan
  256. sndBuf = append(sndBuf, b...)
  257. wrSz = len(sndBuf)
  258. }
  259. if wrSz > maxPayloadLength {
  260. wrSz = maxPayloadLength
  261. }
  262. // Issue a request.
  263. rdBuf, err := c.roundTrip(sndBuf[:wrSz])
  264. if err != nil {
  265. // Welp, something went horrifically wrong.
  266. break loop
  267. }
  268. // Stash the remaining payload if any.
  269. leftBuf = sndBuf[wrSz:] // Store the remaining data
  270. if len(leftBuf) == 0 {
  271. leftBuf = nil
  272. }
  273. // Determine the next poll interval.
  274. if len(rdBuf) > 0 {
  275. // Received data, enqueue the read.
  276. c.workerRdChan <- rdBuf
  277. // And poll immediately.
  278. interval = 0
  279. } else if wrSz > 0 {
  280. // Sent data, poll immediately.
  281. interval = 0
  282. } else if interval == 0 {
  283. // Neither sent nor received data after a poll, re-initialize the delay.
  284. interval = initPollInterval
  285. } else {
  286. // Apply a multiplicative backoff.
  287. interval = time.Duration(float64(interval) * pollIntervalMultiplier)
  288. if interval > maxPollInterval {
  289. interval = maxPollInterval
  290. }
  291. }
  292. runtime.Gosched()
  293. }
  294. // Unblock callers waiting in Read() for data that will never arrive,
  295. // and callers waiting in Write() for data that will never get sent.
  296. close(c.workerRdChan)
  297. close(c.workerWrChan)
  298. // Close the connection (extra calls to Close() are harmless).
  299. _ = c.Close()
  300. }
  301. func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs) (net.Conn, error) {
  302. id, err := newSessionID()
  303. if err != nil {
  304. return nil, err
  305. }
  306. var rt http.RoundTripper
  307. switch ca.utls {
  308. case nil:
  309. rt = &http.Transport{Dial: dialFn}
  310. default:
  311. rt = newRoundTripper(dialFn, ca.utls)
  312. }
  313. conn := &meekConn{
  314. args: ca,
  315. sessionID: id,
  316. roundTripper: rt,
  317. workerWrChan: make(chan []byte, maxChanBacklog),
  318. workerRdChan: make(chan []byte, maxChanBacklog),
  319. workerCloseChan: make(chan struct{}),
  320. }
  321. // Start the I/O worker.
  322. go conn.ioWorker()
  323. return conn, nil
  324. }
  325. func newSessionID() (string, error) {
  326. var b [64]byte
  327. if _, err := rand.Read(b[:]); err != nil {
  328. return "", err
  329. }
  330. h := sha256.Sum256(b[:])
  331. return hex.EncodeToString(h[:16]), nil
  332. }
  333. var (
  334. _ net.Conn = (*meekConn)(nil)
  335. _ net.Addr = (*meekClientArgs)(nil)
  336. )