123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- package connection
- import (
- "bufio"
- "context"
- "fmt"
- "io"
- "net"
- "net/http"
- "strconv"
- "strings"
- "sync/atomic"
- "time"
- "github.com/pkg/errors"
- "github.com/quic-go/quic-go"
- "github.com/rs/zerolog"
- "golang.org/x/sync/errgroup"
- cfdflow "github.com/cloudflare/cloudflared/flow"
- cfdquic "github.com/cloudflare/cloudflared/quic"
- "github.com/cloudflare/cloudflared/tracing"
- "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
- tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
- rpcquic "github.com/cloudflare/cloudflared/tunnelrpc/quic"
- )
- const (
- // HTTPHeaderKey is used to get or set http headers in QUIC ALPN if the underlying proxy connection type is HTTP.
- HTTPHeaderKey = "HttpHeader"
- // HTTPMethodKey is used to get or set http method in QUIC ALPN if the underlying proxy connection type is HTTP.
- HTTPMethodKey = "HttpMethod"
- // HTTPHostKey is used to get or set http host in QUIC ALPN if the underlying proxy connection type is HTTP.
- HTTPHostKey = "HttpHost"
- QUICMetadataFlowID = "FlowID"
- )
- // quicConnection represents the type that facilitates Proxying via QUIC streams.
- type quicConnection struct {
- conn quic.Connection
- logger *zerolog.Logger
- orchestrator Orchestrator
- datagramHandler DatagramSessionHandler
- controlStreamHandler ControlStreamHandler
- connOptions *tunnelpogs.ConnectionOptions
- connIndex uint8
- rpcTimeout time.Duration
- streamWriteTimeout time.Duration
- gracePeriod time.Duration
- }
- // NewTunnelConnection takes a [quic.Connection] to wrap it for use with cloudflared application logic.
- func NewTunnelConnection(
- ctx context.Context,
- conn quic.Connection,
- connIndex uint8,
- orchestrator Orchestrator,
- datagramSessionHandler DatagramSessionHandler,
- controlStreamHandler ControlStreamHandler,
- connOptions *pogs.ConnectionOptions,
- rpcTimeout time.Duration,
- streamWriteTimeout time.Duration,
- gracePeriod time.Duration,
- logger *zerolog.Logger,
- ) (TunnelConnection, error) {
- return &quicConnection{
- conn: conn,
- logger: logger,
- orchestrator: orchestrator,
- datagramHandler: datagramSessionHandler,
- controlStreamHandler: controlStreamHandler,
- connOptions: connOptions,
- connIndex: connIndex,
- rpcTimeout: rpcTimeout,
- streamWriteTimeout: streamWriteTimeout,
- gracePeriod: gracePeriod,
- }, nil
- }
- // Serve starts a QUIC connection that begins accepting streams.
- func (q *quicConnection) Serve(ctx context.Context) error {
- // The edge assumes the first stream is used for the control plane
- controlStream, err := q.conn.OpenStream()
- if err != nil {
- return fmt.Errorf("failed to open a registration control stream: %w", err)
- }
- // If either goroutine returns nil error, we rely on this cancellation to make sure the other goroutine exits
- // as fast as possible as well. Nil error means we want to exit for good (caller code won't retry serving this
- // connection).
- // If either goroutine returns a non nil error, then the error group cancels the context, thus also canceling the
- // other goroutine as fast as possible.
- ctx, cancel := context.WithCancel(ctx)
- errGroup, ctx := errgroup.WithContext(ctx)
- // In the future, if cloudflared can autonomously push traffic to the edge, we have to make sure the control
- // stream is already fully registered before the other goroutines can proceed.
- errGroup.Go(func() error {
- // err is equal to nil if we exit due to unregistration. If that happens we want to wait the full
- // amount of the grace period, allowing requests to finish before we cancel the context, which will
- // make cloudflared exit.
- if err := q.serveControlStream(ctx, controlStream); err == nil {
- if q.gracePeriod > 0 {
- // In Go1.23 this can be removed and replaced with time.Ticker
- // see https://pkg.go.dev/time#Tick
- ticker := time.NewTicker(q.gracePeriod)
- defer ticker.Stop()
- select {
- case <-ctx.Done():
- case <-ticker.C:
- }
- }
- }
- cancel()
- return err
- })
- errGroup.Go(func() error {
- defer cancel()
- return q.acceptStream(ctx)
- })
- errGroup.Go(func() error {
- defer cancel()
- return q.datagramHandler.Serve(ctx)
- })
- return errGroup.Wait()
- }
- // serveControlStream will serve the RPC; blocking until the control plane is done.
- func (q *quicConnection) serveControlStream(ctx context.Context, controlStream quic.Stream) error {
- return q.controlStreamHandler.ServeControlStream(ctx, controlStream, q.connOptions, q.orchestrator)
- }
- // Close the connection with no errors specified.
- func (q *quicConnection) Close() {
- _ = q.conn.CloseWithError(0, "")
- }
- func (q *quicConnection) acceptStream(ctx context.Context) error {
- defer q.Close()
- for {
- quicStream, err := q.conn.AcceptStream(ctx)
- if err != nil {
- // context.Canceled is usually a user ctrl+c. We don't want to log an error here as it's intentional.
- if errors.Is(err, context.Canceled) || q.controlStreamHandler.IsStopped() {
- return nil
- }
- return fmt.Errorf("failed to accept QUIC stream: %w", err)
- }
- go q.runStream(quicStream)
- }
- }
- func (q *quicConnection) runStream(quicStream quic.Stream) {
- ctx := quicStream.Context()
- stream := cfdquic.NewSafeStreamCloser(quicStream, q.streamWriteTimeout, q.logger)
- defer stream.Close()
- // we are going to fuse readers/writers from stream <- cloudflared -> origin, and we want to guarantee that
- // code executed in the code path of handleStream don't trigger an earlier close to the downstream write stream.
- // So, we wrap the stream with a no-op write closer and only this method can actually close write side of the stream.
- // A call to close will simulate a close to the read-side, which will fail subsequent reads.
- noCloseStream := &nopCloserReadWriter{ReadWriteCloser: stream}
- ss := rpcquic.NewCloudflaredServer(q.handleDataStream, q.datagramHandler, q, q.rpcTimeout)
- if err := ss.Serve(ctx, noCloseStream); err != nil {
- q.logger.Debug().Err(err).Msg("Failed to handle QUIC stream")
- // if we received an error at this level, then close write side of stream with an error, which will result in
- // RST_STREAM frame.
- quicStream.CancelWrite(0)
- }
- }
- func (q *quicConnection) handleDataStream(ctx context.Context, stream *rpcquic.RequestServerStream) error {
- request, err := stream.ReadConnectRequestData()
- if err != nil {
- return err
- }
- if err, connectResponseSent := q.dispatchRequest(ctx, stream, request); err != nil {
- q.logger.Err(err).Str("type", request.Type.String()).Str("dest", request.Dest).Msg("Request failed")
- // if the connectResponse was already sent and we had an error, we need to propagate it up, so that the stream is
- // closed with an RST_STREAM frame
- if connectResponseSent {
- return err
- }
- var metadata []pogs.Metadata
- // Check the type of error that was throw and add metadata that will help identify it on OTD.
- if errors.Is(err, cfdflow.ErrTooManyActiveFlows) {
- metadata = append(metadata, pogs.ErrorFlowConnectRateLimitedMetadata)
- }
- if writeRespErr := stream.WriteConnectResponseData(err, metadata...); writeRespErr != nil {
- return writeRespErr
- }
- }
- return nil
- }
- // dispatchRequest will dispatch the request to the origin depending on the type and returns an error if it occurs.
- // Also returns if the connect response was sent to the downstream during processing of the origin request.
- func (q *quicConnection) dispatchRequest(ctx context.Context, stream *rpcquic.RequestServerStream, request *pogs.ConnectRequest) (err error, connectResponseSent bool) {
- originProxy, err := q.orchestrator.GetOriginProxy()
- if err != nil {
- return err, false
- }
- switch request.Type {
- case pogs.ConnectionTypeHTTP, pogs.ConnectionTypeWebsocket:
- tracedReq, err := buildHTTPRequest(ctx, request, stream, q.connIndex, q.logger)
- if err != nil {
- return err, false
- }
- w := newHTTPResponseAdapter(stream)
- return originProxy.ProxyHTTP(&w, tracedReq, request.Type == pogs.ConnectionTypeWebsocket), w.connectResponseSent
- case pogs.ConnectionTypeTCP:
- rwa := &streamReadWriteAcker{RequestServerStream: stream}
- metadata := request.MetadataMap()
- return originProxy.ProxyTCP(ctx, rwa, &TCPRequest{
- Dest: request.Dest,
- FlowID: metadata[QUICMetadataFlowID],
- CfTraceID: metadata[tracing.TracerContextName],
- ConnIndex: q.connIndex,
- }), rwa.connectResponseSent
- default:
- return errors.Errorf("unsupported error type: %s", request.Type), false
- }
- }
- // UpdateConfiguration is the RPC method invoked by edge when there is a new configuration
- func (q *quicConnection) UpdateConfiguration(ctx context.Context, version int32, config []byte) *tunnelpogs.UpdateConfigurationResponse {
- return q.orchestrator.UpdateConfig(version, config)
- }
- // streamReadWriteAcker is a light wrapper over QUIC streams with a callback to send response back to
- // the client.
- type streamReadWriteAcker struct {
- *rpcquic.RequestServerStream
- connectResponseSent bool
- }
- // AckConnection acks response back to the proxy.
- func (s *streamReadWriteAcker) AckConnection(tracePropagation string) error {
- metadata := []pogs.Metadata{}
- // Only add tracing if provided by the edge request
- if tracePropagation != "" {
- metadata = append(metadata, pogs.Metadata{
- Key: tracing.CanonicalCloudflaredTracingHeader,
- Val: tracePropagation,
- })
- }
- s.connectResponseSent = true
- return s.WriteConnectResponseData(nil, metadata...)
- }
- // httpResponseAdapter translates responses written by the HTTP Proxy into ones that can be used in QUIC.
- type httpResponseAdapter struct {
- *rpcquic.RequestServerStream
- headers http.Header
- connectResponseSent bool
- }
- func newHTTPResponseAdapter(s *rpcquic.RequestServerStream) httpResponseAdapter {
- return httpResponseAdapter{RequestServerStream: s, headers: make(http.Header)}
- }
- func (hrw *httpResponseAdapter) AddTrailer(trailerName, trailerValue string) {
- // we do not support trailers over QUIC
- }
- func (hrw *httpResponseAdapter) WriteRespHeaders(status int, header http.Header) error {
- metadata := make([]pogs.Metadata, 0)
- metadata = append(metadata, pogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(status)})
- for k, vv := range header {
- for _, v := range vv {
- httpHeaderKey := fmt.Sprintf("%s:%s", HTTPHeaderKey, k)
- metadata = append(metadata, pogs.Metadata{Key: httpHeaderKey, Val: v})
- }
- }
- return hrw.WriteConnectResponseData(nil, metadata...)
- }
- func (hrw *httpResponseAdapter) Write(p []byte) (int, error) {
- // Make sure to send WriteHeader response if not called yet
- if !hrw.connectResponseSent {
- _ = hrw.WriteRespHeaders(http.StatusOK, hrw.headers)
- }
- return hrw.RequestServerStream.Write(p)
- }
- func (hrw *httpResponseAdapter) Header() http.Header {
- return hrw.headers
- }
- // This is a no-op Flush because this adapter is over a quic.Stream and we don't need Flush here.
- func (hrw *httpResponseAdapter) Flush() {}
- func (hrw *httpResponseAdapter) WriteHeader(status int) {
- _ = hrw.WriteRespHeaders(status, hrw.headers)
- }
- func (hrw *httpResponseAdapter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
- conn := &localProxyConnection{hrw.ReadWriteCloser}
- readWriter := bufio.NewReadWriter(
- bufio.NewReader(hrw.ReadWriteCloser),
- bufio.NewWriter(hrw.ReadWriteCloser),
- )
- return conn, readWriter, nil
- }
- func (hrw *httpResponseAdapter) WriteErrorResponse(err error) {
- _ = hrw.WriteConnectResponseData(err, pogs.Metadata{Key: "HttpStatus", Val: strconv.Itoa(http.StatusBadGateway)})
- }
- func (hrw *httpResponseAdapter) WriteConnectResponseData(respErr error, metadata ...pogs.Metadata) error {
- hrw.connectResponseSent = true
- return hrw.RequestServerStream.WriteConnectResponseData(respErr, metadata...)
- }
- func buildHTTPRequest(
- ctx context.Context,
- connectRequest *pogs.ConnectRequest,
- body io.ReadCloser,
- connIndex uint8,
- log *zerolog.Logger,
- ) (*tracing.TracedHTTPRequest, error) {
- metadata := connectRequest.MetadataMap()
- dest := connectRequest.Dest
- method := metadata[HTTPMethodKey]
- host := metadata[HTTPHostKey]
- isWebsocket := connectRequest.Type == pogs.ConnectionTypeWebsocket
- req, err := http.NewRequestWithContext(ctx, method, dest, body)
- if err != nil {
- return nil, err
- }
- req.Host = host
- for _, metadata := range connectRequest.Metadata {
- if strings.Contains(metadata.Key, HTTPHeaderKey) {
- // metadata.Key is off the format httpHeaderKey:<HTTPHeader>
- httpHeaderKey := strings.Split(metadata.Key, ":")
- if len(httpHeaderKey) != 2 {
- return nil, fmt.Errorf("header Key: %s malformed", metadata.Key)
- }
- req.Header.Add(httpHeaderKey[1], metadata.Val)
- }
- }
- // Go's http.Client automatically sends chunked request body if this value is not set on the
- // *http.Request struct regardless of header:
- // https://go.googlesource.com/go/+/go1.8rc2/src/net/http/transfer.go#154.
- if err := setContentLength(req); err != nil {
- return nil, fmt.Errorf("Error setting content-length: %w", err)
- }
- // Go's client defaults to chunked encoding after a 200ms delay if the following cases are true:
- // * the request body blocks
- // * the content length is not set (or set to -1)
- // * the method doesn't usually have a body (GET, HEAD, DELETE, ...)
- // * there is no transfer-encoding=chunked already set.
- // So, if transfer cannot be chunked and content length is 0, we dont set a request body.
- if !isWebsocket && !isTransferEncodingChunked(req) && req.ContentLength == 0 {
- req.Body = http.NoBody
- }
- stripWebsocketUpgradeHeader(req)
- // Check for tracing on request
- tracedReq := tracing.NewTracedHTTPRequest(req, connIndex, log)
- return tracedReq, err
- }
- func setContentLength(req *http.Request) error {
- var err error
- if contentLengthStr := req.Header.Get("Content-Length"); contentLengthStr != "" {
- req.ContentLength, err = strconv.ParseInt(contentLengthStr, 10, 64)
- }
- return err
- }
- func isTransferEncodingChunked(req *http.Request) bool {
- transferEncodingVal := req.Header.Get("Transfer-Encoding")
- // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding suggests that this can be a comma
- // separated value as well.
- return strings.Contains(strings.ToLower(transferEncodingVal), "chunked")
- }
- // A helper struct that guarantees a call to close only affects read side, but not write side.
- type nopCloserReadWriter struct {
- io.ReadWriteCloser
- // for use by Read only
- // we don't need a memory barrier here because there is an implicit assumption that
- // Read calls can't happen concurrently by different go-routines.
- sawEOF bool
- // should be updated and read using atomic primitives.
- // value is read in Read method and written in Close method, which could be done by different
- // go-routines.
- closed uint32
- }
- func (np *nopCloserReadWriter) Read(p []byte) (n int, err error) {
- if np.sawEOF {
- return 0, io.EOF
- }
- if atomic.LoadUint32(&np.closed) > 0 {
- return 0, fmt.Errorf("closed by handler")
- }
- n, err = np.ReadWriteCloser.Read(p)
- if err == io.EOF {
- np.sawEOF = true
- }
- return
- }
- func (np *nopCloserReadWriter) Close() error {
- atomic.StoreUint32(&np.closed, 1)
- return nil
- }