quic_connection.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. package connection
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "net/netip"
  10. "strconv"
  11. "strings"
  12. "sync/atomic"
  13. "time"
  14. "github.com/pkg/errors"
  15. "github.com/quic-go/quic-go"
  16. "github.com/rs/zerolog"
  17. "golang.org/x/sync/errgroup"
  18. "github.com/cloudflare/cloudflared/packet"
  19. cfdquic "github.com/cloudflare/cloudflared/quic"
  20. "github.com/cloudflare/cloudflared/tracing"
  21. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  22. tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  23. rpcquic "github.com/cloudflare/cloudflared/tunnelrpc/quic"
  24. )
  25. const (
  26. // HTTPHeaderKey is used to get or set http headers in QUIC ALPN if the underlying proxy connection type is HTTP.
  27. HTTPHeaderKey = "HttpHeader"
  28. // HTTPMethodKey is used to get or set http method in QUIC ALPN if the underlying proxy connection type is HTTP.
  29. HTTPMethodKey = "HttpMethod"
  30. // HTTPHostKey is used to get or set http host in QUIC ALPN if the underlying proxy connection type is HTTP.
  31. HTTPHostKey = "HttpHost"
  32. QUICMetadataFlowID = "FlowID"
  33. )
  34. // quicConnection represents the type that facilitates Proxying via QUIC streams.
  35. type quicConnection struct {
  36. conn quic.Connection
  37. logger *zerolog.Logger
  38. orchestrator Orchestrator
  39. datagramHandler DatagramSessionHandler
  40. controlStreamHandler ControlStreamHandler
  41. connOptions *tunnelpogs.ConnectionOptions
  42. connIndex uint8
  43. rpcTimeout time.Duration
  44. streamWriteTimeout time.Duration
  45. gracePeriod time.Duration
  46. }
  47. // NewTunnelConnection takes a [quic.Connection] to wrap it for use with cloudflared application logic.
  48. func NewTunnelConnection(
  49. ctx context.Context,
  50. conn quic.Connection,
  51. connIndex uint8,
  52. orchestrator Orchestrator,
  53. datagramSessionHandler DatagramSessionHandler,
  54. controlStreamHandler ControlStreamHandler,
  55. connOptions *pogs.ConnectionOptions,
  56. rpcTimeout time.Duration,
  57. streamWriteTimeout time.Duration,
  58. gracePeriod time.Duration,
  59. logger *zerolog.Logger,
  60. ) (TunnelConnection, error) {
  61. return &quicConnection{
  62. conn: conn,
  63. logger: logger,
  64. orchestrator: orchestrator,
  65. datagramHandler: datagramSessionHandler,
  66. controlStreamHandler: controlStreamHandler,
  67. connOptions: connOptions,
  68. connIndex: connIndex,
  69. rpcTimeout: rpcTimeout,
  70. streamWriteTimeout: streamWriteTimeout,
  71. gracePeriod: gracePeriod,
  72. }, nil
  73. }
  74. // Serve starts a QUIC connection that begins accepting streams.
  75. func (q *quicConnection) Serve(ctx context.Context) error {
  76. // The edge assumes the first stream is used for the control plane
  77. controlStream, err := q.conn.OpenStream()
  78. if err != nil {
  79. return fmt.Errorf("failed to open a registration control stream: %w", err)
  80. }
  81. // If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
  82. // as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
  83. // connection).
  84. // If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
  85. // other goroutine as fast as possible.
  86. ctx, cancel := context.WithCancel(ctx)
  87. errGroup, ctx := errgroup.WithContext(ctx)
  88. // In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control
  89. // stream is already fully registered before the other goroutines can proceed.
  90. errGroup.Go(func() error {
  91. // err is equal to nil if we exit due to unregistration. If that happens we want to wait the full
  92. // amount of the grace period, allowing requests to finish before we cancel the context, which will
  93. // make cloudflared exit.
  94. if err := q.serveControlStream(ctx, controlStream); err == nil {
  95. select {
  96. case <-ctx.Done():
  97. case <-time.Tick(q.gracePeriod):
  98. }
  99. }
  100. cancel()
  101. return err
  102. })
  103. errGroup.Go(func() error {
  104. defer cancel()
  105. return q.acceptStream(ctx)
  106. })
  107. errGroup.Go(func() error {
  108. defer cancel()
  109. return q.datagramHandler.Serve(ctx)
  110. })
  111. return errGroup.Wait()
  112. }
  113. // serveControlStream will serve the RPC; blocking until the control plane is done.
  114. func (q *quicConnection) serveControlStream(ctx context.Context, controlStream quic.Stream) error {
  115. return q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions, q.orchestrator)
  116. }
  117. // Close the connection with no errors specified.
  118. func (q *quicConnection) Close() {
  119. q.conn.CloseWithError(0, "")
  120. }
  121. func (q *quicConnection) acceptStream(ctx context.Context) error {
  122. defer q.Close()
  123. for {
  124. quicStream, err := q.conn.AcceptStream(ctx)
  125. if err != nil {
  126. // context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional.
  127. if errors.Is(err, context.Canceled) || q.controlStreamHandler.IsStopped() {
  128. return nil
  129. }
  130. return fmt.Errorf("failed to accept QUIC stream: %w", err)
  131. }
  132. go q.runStream(quicStream)
  133. }
  134. }
  135. func (q *quicConnection) runStream(quicStream quic.Stream) {
  136. ctx := quicStream.Context()
  137. stream := cfdquic.NewSafeStreamCloser(quicStream, q.streamWriteTimeout, q.logger)
  138. defer stream.Close()
  139. // we are going to fuse readers/writers from stream <- cloudflared -> origin, and we want to guarantee that
  140. // code executed in the code path of handleStream don't trigger an earlier close to the downstream write stream.
  141. // So, we wrap the stream with a no-op write closer and only this method can actually close write side of the stream.
  142. // A call to close will simulate a close to the read-side, which will fail subsequent reads.
  143. noCloseStream := &nopCloserReadWriter{ReadWriteCloser: stream}
  144. ss := rpcquic.NewCloudflaredServer(q.handleDataStream, q.datagramHandler, q, q.rpcTimeout)
  145. if err := ss.Serve(ctx, noCloseStream); err != nil {
  146. q.logger.Debug().Err(err).Msg("Failed to handle QUIC stream")
  147. // if we received an error at this level, then close write side of stream with an error, which will result in
  148. // RST_STREAM frame.
  149. quicStream.CancelWrite(0)
  150. }
  151. }
  152. func (q *quicConnection) handleDataStream(ctx context.Context, stream *rpcquic.RequestServerStream) error {
  153. request, err := stream.ReadConnectRequestData()
  154. if err != nil {
  155. return err
  156. }
  157. if err, connectResponseSent := q.dispatchRequest(ctx, stream, request); err != nil {
  158. q.logger.Err(err).Str("type", request.Type.String()).Str("dest", request.Dest).Msg("Request failed")
  159. // if the connectResponse was already sent and we had an error, we need to propagate it up, so that the stream is
  160. // closed with an RST_STREAM frame
  161. if connectResponseSent {
  162. return err
  163. }
  164. if writeRespErr := stream.WriteConnectResponseData(err); writeRespErr != nil {
  165. return writeRespErr
  166. }
  167. }
  168. return nil
  169. }
  170. // dispatchRequest will dispatch the request to the origin depending on the type and returns an error if it occurs.
  171. // Also returns if the connect response was sent to the downstream during processing of the origin request.
  172. func (q *quicConnection) dispatchRequest(ctx context.Context, stream *rpcquic.RequestServerStream, request *pogs.ConnectRequest) (err error, connectResponseSent bool) {
  173. originProxy, err := q.orchestrator.GetOriginProxy()
  174. if err != nil {
  175. return err, false
  176. }
  177. switch request.Type {
  178. case pogs.ConnectionTypeHTTP, pogs.ConnectionTypeWebsocket:
  179. tracedReq, err := buildHTTPRequest(ctx, request, stream, q.connIndex, q.logger)
  180. if err != nil {
  181. return err, false
  182. }
  183. w := newHTTPResponseAdapter(stream)
  184. return originProxy.ProxyHTTP(&w, tracedReq, request.Type == pogs.ConnectionTypeWebsocket), w.connectResponseSent
  185. case pogs.ConnectionTypeTCP:
  186. rwa := &streamReadWriteAcker{RequestServerStream: stream}
  187. metadata := request.MetadataMap()
  188. return originProxy.ProxyTCP(ctx, rwa, &TCPRequest{
  189. Dest: request.Dest,
  190. FlowID: metadata[QUICMetadataFlowID],
  191. CfTraceID: metadata[tracing.TracerContextName],
  192. ConnIndex: q.connIndex,
  193. }), rwa.connectResponseSent
  194. default:
  195. return errors.Errorf("unsupported error type: %s", request.Type), false
  196. }
  197. }
  198. // UpdateConfiguration is the RPC method invoked by edge when there is a new configuration
  199. func (q *quicConnection) UpdateConfiguration(ctx context.Context, version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
  200. return q.orchestrator.UpdateConfig(version, config)
  201. }
  202. // streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
  203. // the client.
  204. type streamReadWriteAcker struct {
  205. *rpcquic.RequestServerStream
  206. connectResponseSent bool
  207. }
  208. // AckConnection acks response back to the proxy.
  209. func (s *streamReadWriteAcker) AckConnection(tracePropagation string) error {
  210. metadata := []pogs.Metadata{}
  211. // Only add tracing if provided by the edge request
  212. if tracePropagation != "" {
  213. metadata = append(metadata, pogs.Metadata{
  214. Key: tracing.CanonicalCloudflaredTracingHeader,
  215. Val: tracePropagation,
  216. })
  217. }
  218. s.connectResponseSent = true
  219. return s.WriteConnectResponseData(nil, metadata...)
  220. }
  221. // httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
  222. type httpResponseAdapter struct {
  223. *rpcquic.RequestServerStream
  224. headers http.Header
  225. connectResponseSent bool
  226. }
  227. func newHTTPResponseAdapter(s *rpcquic.RequestServerStream) httpResponseAdapter {
  228. return httpResponseAdapter{RequestServerStream: s, headers: make(http.Header)}
  229. }
  230. func (hrw *httpResponseAdapter) AddTrailer(trailerName, trailerValue string) {
  231. // we do not support trailers over QUIC
  232. }
  233. func (hrw *httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error {
  234. metadata := make([]pogs.Metadata, 0)
  235. metadata = append(metadata, pogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(status)})
  236. for k, vv := range header {
  237. for _, v := range vv {
  238. httpHeaderKey := fmt.Sprintf("%s:%s", HTTPHeaderKey, k)
  239. metadata = append(metadata, pogs.Metadata{Key: httpHeaderKey, Val: v})
  240. }
  241. }
  242. return hrw.WriteConnectResponseData(nil, metadata...)
  243. }
  244. func (hrw *httpResponseAdapter) Write(p []byte) (int, error) {
  245. // Make sure to send WriteHeader response if not called yet
  246. if !hrw.connectResponseSent {
  247. hrw.WriteRespHeaders(http.StatusOK, hrw.headers)
  248. }
  249. return hrw.RequestServerStream.Write(p)
  250. }
  251. func (hrw *httpResponseAdapter) Header() http.Header {
  252. return hrw.headers
  253. }
  254. // This is a no-op Flush because this adapter is over a quic.Stream and we don't need Flush here.
  255. func (hrw *httpResponseAdapter) Flush() {}
  256. func (hrw *httpResponseAdapter) WriteHeader(status int) {
  257. hrw.WriteRespHeaders(status, hrw.headers)
  258. }
  259. func (hrw *httpResponseAdapter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
  260. conn := &localProxyConnection{hrw.ReadWriteCloser}
  261. readWriter := bufio.NewReadWriter(
  262. bufio.NewReader(hrw.ReadWriteCloser),
  263. bufio.NewWriter(hrw.ReadWriteCloser),
  264. )
  265. return conn, readWriter, nil
  266. }
  267. func (hrw *httpResponseAdapter) WriteErrorResponse(err error) {
  268. hrw.WriteConnectResponseData(err, pogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
  269. }
  270. func (hrw *httpResponseAdapter) WriteConnectResponseData(respErr error, metadata ...pogs.Metadata) error {
  271. hrw.connectResponseSent = true
  272. return hrw.RequestServerStream.WriteConnectResponseData(respErr, metadata...)
  273. }
  274. func buildHTTPRequest(
  275. ctx context.Context,
  276. connectRequest *pogs.ConnectRequest,
  277. body io.ReadCloser,
  278. connIndex uint8,
  279. log *zerolog.Logger,
  280. ) (*tracing.TracedHTTPRequest, error) {
  281. metadata := connectRequest.MetadataMap()
  282. dest := connectRequest.Dest
  283. method := metadata[HTTPMethodKey]
  284. host := metadata[HTTPHostKey]
  285. isWebsocket := connectRequest.Type == pogs.ConnectionTypeWebsocket
  286. req, err := http.NewRequestWithContext(ctx, method, dest, body)
  287. if err != nil {
  288. return nil, err
  289. }
  290. req.Host = host
  291. for _, metadata := range connectRequest.Metadata {
  292. if strings.Contains(metadata.Key, HTTPHeaderKey) {
  293. // metadata.Key is off the format httpHeaderKey:<HTTPHeader>
  294. httpHeaderKey := strings.Split(metadata.Key, ":")
  295. if len(httpHeaderKey) != 2 {
  296. return nil, fmt.Errorf("header Key: %s malformed", metadata.Key)
  297. }
  298. req.Header.Add(httpHeaderKey[1], metadata.Val)
  299. }
  300. }
  301. // Go's http.Client automatically sends chunked request body if this value is not set on the
  302. // *http.Request struct regardless of header:
  303. // https://go.googlesource.com/go/+/go1.8rc2/src/net/http/transfer.go#154.
  304. if err := setContentLength(req); err != nil {
  305. return nil, fmt.Errorf("Error setting content-length: %w", err)
  306. }
  307. // Go's client defaults to chunked encoding after a 200ms delay if the following cases are true:
  308. // * the request body blocks
  309. // * the content length is not set (or set to -1)
  310. // * the method doesn't usually have a body (GET, HEAD, DELETE, ...)
  311. // * there is no transfer-encoding=chunked already set.
  312. // So, if transfer cannot be chunked and content length is 0, we dont set a request body.
  313. if !isWebsocket && !isTransferEncodingChunked(req) && req.ContentLength == 0 {
  314. req.Body = http.NoBody
  315. }
  316. stripWebsocketUpgradeHeader(req)
  317. // Check for tracing on request
  318. tracedReq := tracing.NewTracedHTTPRequest(req, connIndex, log)
  319. return tracedReq, err
  320. }
  321. func setContentLength(req *http.Request) error {
  322. var err error
  323. if contentLengthStr := req.Header.Get("Content-Length"); contentLengthStr != "" {
  324. req.ContentLength, err = strconv.ParseInt(contentLengthStr, 10, 64)
  325. }
  326. return err
  327. }
  328. func isTransferEncodingChunked(req *http.Request) bool {
  329. transferEncodingVal := req.Header.Get("Transfer-Encoding")
  330. // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding suggests that this can be a comma
  331. // separated value as well.
  332. return strings.Contains(strings.ToLower(transferEncodingVal), "chunked")
  333. }
  334. // A helper struct that guarantees a call to close only affects read side, but not write side.
  335. type nopCloserReadWriter struct {
  336. io.ReadWriteCloser
  337. // for use by Read only
  338. // we don't need a memory barrier here because there is an implicit assumption that
  339. // Read calls can't happen concurrently by different go-routines.
  340. sawEOF bool
  341. // should be updated and read using atomic primitives.
  342. // value is read in Read method and written in Close method, which could be done by different
  343. // go-routines.
  344. closed uint32
  345. }
  346. func (np *nopCloserReadWriter) Read(p []byte) (n int, err error) {
  347. if np.sawEOF {
  348. return 0, io.EOF
  349. }
  350. if atomic.LoadUint32(&np.closed) > 0 {
  351. return 0, fmt.Errorf("closed by handler")
  352. }
  353. n, err = np.ReadWriteCloser.Read(p)
  354. if err == io.EOF {
  355. np.sawEOF = true
  356. }
  357. return
  358. }
  359. func (np *nopCloserReadWriter) Close() error {
  360. atomic.StoreUint32(&np.closed, 1)
  361. return nil
  362. }
  363. // muxerWrapper wraps DatagramMuxerV2 to satisfy the packet.FunnelUniPipe interface
  364. type muxerWrapper struct {
  365. muxer *cfdquic.DatagramMuxerV2
  366. }
  367. func (rp *muxerWrapper) SendPacket(dst netip.Addr, pk packet.RawPacket) error {
  368. return rp.muxer.SendPacket(cfdquic.RawPacket(pk))
  369. }
  370. func (rp *muxerWrapper) ReceivePacket(ctx context.Context) (packet.RawPacket, error) {
  371. pk, err := rp.muxer.ReceivePacket(ctx)
  372. if err != nil {
  373. return packet.RawPacket{}, err
  374. }
  375. rawPacket, ok := pk.(cfdquic.RawPacket)
  376. if ok {
  377. return packet.RawPacket(rawPacket), nil
  378. }
  379. return packet.RawPacket{}, fmt.Errorf("unexpected packet type %+v", pk)
  380. }
  381. func (rp *muxerWrapper) Close() error {
  382. return nil
  383. }