123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- package connection
- import (
- "context"
- "encoding/base64"
- "fmt"
- "io"
- "math"
- "net"
- "net/http"
- "strconv"
- "strings"
- "time"
- "github.com/google/uuid"
- "github.com/pkg/errors"
- "github.com/cloudflare/cloudflared/tracing"
- "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
- "github.com/cloudflare/cloudflared/websocket"
- )
// Unexported string constants used by the helpers in this file.
const (
	// lbProbeUserAgentPrefix is the User-Agent prefix that IsLBProbeRequest matches against.
	lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"

	// contentTypeHeader is the header that shouldFlush inspects.
	contentTypeHeader = "content-type"
	// sseContentType and grpcContentType are the content-type prefixes listed in flushableContentTypes.
	sseContentType  = "text/event-stream"
	grpcContentType = "application/grpc"
)

// Exported constants.
const (
	// LogFieldConnIndex is presumably the structured-log field name for a connection index
	// (NOTE(review): confirm against callers; it is not used in this file).
	LogFieldConnIndex = "connIndex"
	// MaxGracePeriod is three minutes.
	MaxGracePeriod = 3 * time.Minute
	// MaxConcurrentStreams equals math.MaxUint32.
	MaxConcurrentStreams = math.MaxUint32
)

// switchingProtocolText is the status line text for HTTP 101, i.e. "101 Switching Protocols".
var switchingProtocolText = fmt.Sprintf(
	"%d %s",
	http.StatusSwitchingProtocols,
	http.StatusText(http.StatusSwitchingProtocols),
)

// flushableContentTypes lists the content-type prefixes for which shouldFlush returns true.
var flushableContentTypes = []string{
	sseContentType,
	grpcContentType,
}
// TunnelConnection represents the connection to the edge.
//
// Serve exists so that callers can handle any error encountered while the
// connection is being processed. Cancelling the context passed to Serve
// closes the connection.
type TunnelConnection interface {
	Serve(ctx context.Context) error
}
- type Orchestrator interface {
- UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse
- GetConfigJSON() ([]byte, error)
- GetOriginProxy() (OriginProxy, error)
- }
- type TunnelProperties struct {
- Credentials Credentials
- Client pogs.ClientInfo
- QuickTunnelUrl string
- }
- // Credentials are stored in the credentials file and contain all info needed to run a tunnel.
- type Credentials struct {
- AccountTag string
- TunnelSecret []byte
- TunnelID uuid.UUID
- }
- func (c *Credentials) Auth() pogs.TunnelAuth {
- return pogs.TunnelAuth{
- AccountTag: c.AccountTag,
- TunnelSecret: c.TunnelSecret,
- }
- }
- // TunnelToken are Credentials but encoded with custom fields namings.
- type TunnelToken struct {
- AccountTag string `json:"a"`
- TunnelSecret []byte `json:"s"`
- TunnelID uuid.UUID `json:"t"`
- }
- func (t TunnelToken) Credentials() Credentials {
- return Credentials{
- AccountTag: t.AccountTag,
- TunnelSecret: t.TunnelSecret,
- TunnelID: t.TunnelID,
- }
- }
- func (t TunnelToken) Encode() (string, error) {
- val, err := json.Marshal(t)
- if err != nil {
- return "", errors.Wrap(err, "could not JSON encode token")
- }
- return base64.StdEncoding.EncodeToString(val), nil
- }
// ClassicTunnelProperties holds a hostname, an origin certificate and the
// reconnect-token feature flag.
type ClassicTunnelProperties struct {
	Hostname   string
	OriginCert []byte

	// UseReconnectToken is the feature flag to use the new edge reconnect tokens.
	UseReconnectToken bool
}
// Type indicates the connection type of the connection.
type Type int

// The known connection types, numbered consecutively from zero.
const (
	TypeWebsocket Type = iota
	TypeTCP
	TypeControlStream
	TypeHTTP
	TypeConfiguration
)

// shouldFlush reports whether this kind of connection should actively flush
// data. It is true for websocket, TCP and control-stream connections and
// false for every other value.
func (t Type) shouldFlush() bool {
	switch t {
	case TypeWebsocket, TypeTCP, TypeControlStream:
		return true
	default:
		return false
	}
}

// String returns a human-readable name for the connection type. Every
// declared constant has its own name; any other value is rendered as
// "Unknown Type <n>".
func (t Type) String() string {
	switch t {
	case TypeWebsocket:
		return "websocket"
	case TypeTCP:
		return "tcp"
	case TypeControlStream:
		return "control stream"
	case TypeHTTP:
		return "http"
	case TypeConfiguration:
		// Without this case the declared constant would be reported as unknown.
		return "configuration"
	default:
		return fmt.Sprintf("Unknown Type %d", t)
	}
}
- // OriginProxy is how data flows from cloudflared to the origin services running behind it.
- type OriginProxy interface {
- ProxyHTTP(w ResponseWriter, tr *tracing.TracedHTTPRequest, isWebsocket bool) error
- ProxyTCP(ctx context.Context, rwa ReadWriteAcker, req *TCPRequest) error
- }
// TCPRequest defines the input format needed to perform a TCP proxy.
// It is the request argument accepted by OriginProxy.ProxyTCP.
type TCPRequest struct {
	Dest      string
	CFRay     string
	LBProbe   bool
	FlowID    string
	CfTraceID string
	ConnIndex uint8
}
// ReadWriteAcker is an io.ReadWriter that can additionally acknowledge to the
// downstream (edge) that the origin has accepted the connection.
type ReadWriteAcker interface {
	io.ReadWriter

	// AckConnection sends the acknowledgement; tracePropagation is passed along with it.
	AckConnection(tracePropagation string) error
}
- // HTTPResponseReadWriteAcker is an HTTP implementation of ReadWriteAcker.
- type HTTPResponseReadWriteAcker struct {
- r io.Reader
- w ResponseWriter
- f http.Flusher
- req *http.Request
- }
- // NewHTTPResponseReadWriterAcker returns a new instance of HTTPResponseReadWriteAcker.
- func NewHTTPResponseReadWriterAcker(w ResponseWriter, flusher http.Flusher, req *http.Request) *HTTPResponseReadWriteAcker {
- return &HTTPResponseReadWriteAcker{
- r: req.Body,
- w: w,
- f: flusher,
- req: req,
- }
- }
- func (h *HTTPResponseReadWriteAcker) Read(p []byte) (int, error) {
- return h.r.Read(p)
- }
- func (h *HTTPResponseReadWriteAcker) Write(p []byte) (int, error) {
- n, err := h.w.Write(p)
- if n > 0 {
- h.f.Flush()
- }
- return n, err
- }
- // AckConnection acks an HTTP connection by sending a switch protocols status code that enables the caller to
- // upgrade to streams.
- func (h *HTTPResponseReadWriteAcker) AckConnection(tracePropagation string) error {
- resp := &http.Response{
- Status: switchingProtocolText,
- StatusCode: http.StatusSwitchingProtocols,
- ContentLength: -1,
- Header: http.Header{},
- }
- if secWebsocketKey := h.req.Header.Get("Sec-WebSocket-Key"); secWebsocketKey != "" {
- resp.Header = websocket.NewResponseHeader(h.req)
- }
- if tracePropagation != "" {
- resp.Header.Add(tracing.CanonicalCloudflaredTracingHeader, tracePropagation)
- }
- return h.w.WriteRespHeaders(resp.StatusCode, resp.Header)
- }
// localProxyConnection emulates an incoming connection to cloudflared as a net.Conn.
// Used when handling a "hijacked" connection from connection.ResponseWriter.
type localProxyConnection struct {
	io.ReadWriteCloser
}

// Read delegates to the wrapped ReadWriteCloser.
func (c *localProxyConnection) Read(b []byte) (int, error) {
	n, err := c.ReadWriteCloser.Read(b)
	return n, err
}

// Write delegates to the wrapped ReadWriteCloser.
func (c *localProxyConnection) Write(b []byte) (int, error) {
	n, err := c.ReadWriteCloser.Write(b)
	return n, err
}

// Close delegates to the wrapped ReadWriteCloser.
func (c *localProxyConnection) Close() error {
	err := c.ReadWriteCloser.Close()
	return err
}

// LocalAddr is unused; it always reports the IPv6 loopback address with port 0.
func (c *localProxyConnection) LocalAddr() net.Addr {
	return &net.TCPAddr{IP: net.IPv6loopback}
}

// RemoteAddr is unused; it always reports the IPv6 loopback address with port 0.
func (c *localProxyConnection) RemoteAddr() net.Addr {
	return &net.TCPAddr{IP: net.IPv6loopback}
}

// SetDeadline is a no-op: read/write deadlines cannot be set for the tunnel
// back to origintunneld, so the request is ignored and nil is returned.
func (c *localProxyConnection) SetDeadline(_ time.Time) error {
	return nil
}

// SetReadDeadline is a no-op for the same reason as SetDeadline.
func (c *localProxyConnection) SetReadDeadline(_ time.Time) error {
	return nil
}

// SetWriteDeadline is a no-op for the same reason as SetDeadline.
func (c *localProxyConnection) SetWriteDeadline(_ time.Time) error {
	return nil
}
// ResponseWriter is the response path for a request back through cloudflared's tunnel.
// On top of the standard http.ResponseWriter, http.Hijacker and io.Writer
// behaviour, it can write a status together with headers and register trailers.
type ResponseWriter interface {
	// WriteRespHeaders writes the given status and headers.
	WriteRespHeaders(status int, header http.Header) error
	// AddTrailer registers a trailer name/value pair.
	AddTrailer(trailerName, trailerValue string)

	http.ResponseWriter
	http.Hijacker
	io.Writer
}
// ConnectedFuse exposes a connected signal: Connected records it and
// IsConnected queries it (NOTE(review): semantics inferred from the method
// names; no implementation is visible in this file).
type ConnectedFuse interface {
	Connected()
	IsConnected() bool
}
- // Helper method to let the caller know what content-types should require a flush on every
- // write to a ResponseWriter.
- func shouldFlush(headers http.Header) bool {
- if contentType := headers.Get(contentTypeHeader); contentType != "" {
- contentType = strings.ToLower(contentType)
- for _, c := range flushableContentTypes {
- if strings.HasPrefix(contentType, c) {
- return true
- }
- }
- }
- return false
- }
// uint8ToString renders input as its base-10 string representation.
func uint8ToString(input uint8) string {
	return strconv.Itoa(int(input))
}
// FindCfRayHeader returns the value of the request's "Cf-Ray" header, or an
// empty string when the header is not set.
func FindCfRayHeader(req *http.Request) string {
	cfRay := req.Header.Get("Cf-Ray")
	return cfRay
}
- func IsLBProbeRequest(req *http.Request) bool {
- return strings.HasPrefix(req.UserAgent(), lbProbeUserAgentPrefix)
- }
|