connection.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package connection
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "io"
  7. "math"
  8. "net"
  9. "net/http"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/google/uuid"
  14. "github.com/pkg/errors"
  15. "github.com/cloudflare/cloudflared/tracing"
  16. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  17. "github.com/cloudflare/cloudflared/websocket"
  18. )
  19. const (
  20. lbProbeUserAgentPrefix = "Mozilla/5.0 (compatible; Cloudflare-Traffic-Manager/1.0; +https://www.cloudflare.com/traffic-manager/;"
  21. LogFieldConnIndex = "connIndex"
  22. MaxGracePeriod = time.Minute * 3
  23. MaxConcurrentStreams = math.MaxUint32
  24. contentTypeHeader = "content-type"
  25. sseContentType = "text/event-stream"
  26. grpcContentType = "application/grpc"
  27. )
  28. var (
  29. switchingProtocolText = fmt.Sprintf("%d %s", http.StatusSwitchingProtocols, http.StatusText(http.StatusSwitchingProtocols))
  30. flushableContentTypes = []string{sseContentType, grpcContentType}
  31. )
  32. // TunnelConnection represents the connection to the edge.
  33. // The Serve method is provided to allow clients to handle any errors from the connection encountered during
  34. // processing of the connection. Cancelling of the context provided to Serve will close the connection.
  35. type TunnelConnection interface {
  36. Serve(ctx context.Context) error
  37. }
  38. type Orchestrator interface {
  39. UpdateConfig(version int32, config []byte) *pogs.UpdateConfigurationResponse
  40. GetConfigJSON() ([]byte, error)
  41. GetOriginProxy() (OriginProxy, error)
  42. }
  43. type TunnelProperties struct {
  44. Credentials Credentials
  45. Client pogs.ClientInfo
  46. QuickTunnelUrl string
  47. }
  48. // Credentials are stored in the credentials file and contain all info needed to run a tunnel.
  49. type Credentials struct {
  50. AccountTag string
  51. TunnelSecret []byte
  52. TunnelID uuid.UUID
  53. }
  54. func (c *Credentials) Auth() pogs.TunnelAuth {
  55. return pogs.TunnelAuth{
  56. AccountTag: c.AccountTag,
  57. TunnelSecret: c.TunnelSecret,
  58. }
  59. }
  60. // TunnelToken are Credentials but encoded with custom fields namings.
  61. type TunnelToken struct {
  62. AccountTag string `json:"a"`
  63. TunnelSecret []byte `json:"s"`
  64. TunnelID uuid.UUID `json:"t"`
  65. }
  66. func (t TunnelToken) Credentials() Credentials {
  67. return Credentials{
  68. AccountTag: t.AccountTag,
  69. TunnelSecret: t.TunnelSecret,
  70. TunnelID: t.TunnelID,
  71. }
  72. }
  73. func (t TunnelToken) Encode() (string, error) {
  74. val, err := json.Marshal(t)
  75. if err != nil {
  76. return "", errors.Wrap(err, "could not JSON encode token")
  77. }
  78. return base64.StdEncoding.EncodeToString(val), nil
  79. }
  80. type ClassicTunnelProperties struct {
  81. Hostname string
  82. OriginCert []byte
  83. // feature-flag to use new edge reconnect tokens
  84. UseReconnectToken bool
  85. }
  86. // Type indicates the connection type of the connection.
  87. type Type int
  88. const (
  89. TypeWebsocket Type = iota
  90. TypeTCP
  91. TypeControlStream
  92. TypeHTTP
  93. TypeConfiguration
  94. )
  95. // ShouldFlush returns whether this kind of connection should actively flush data
  96. func (t Type) shouldFlush() bool {
  97. switch t {
  98. case TypeWebsocket, TypeTCP, TypeControlStream:
  99. return true
  100. default:
  101. return false
  102. }
  103. }
  104. func (t Type) String() string {
  105. switch t {
  106. case TypeWebsocket:
  107. return "websocket"
  108. case TypeTCP:
  109. return "tcp"
  110. case TypeControlStream:
  111. return "control stream"
  112. case TypeHTTP:
  113. return "http"
  114. default:
  115. return fmt.Sprintf("Unknown Type %d", t)
  116. }
  117. }
  118. // OriginProxy is how data flows from cloudflared to the origin services running behind it.
  119. type OriginProxy interface {
  120. ProxyHTTP(w ResponseWriter, tr *tracing.TracedHTTPRequest, isWebsocket bool) error
  121. ProxyTCP(ctx context.Context, rwa ReadWriteAcker, req *TCPRequest) error
  122. }
  123. // TCPRequest defines the input format needed to perform a TCP proxy.
  124. type TCPRequest struct {
  125. Dest string
  126. CFRay string
  127. LBProbe bool
  128. FlowID string
  129. CfTraceID string
  130. ConnIndex uint8
  131. }
  132. // ReadWriteAcker is a readwriter with the ability to Acknowledge to the downstream (edge) that the origin has
  133. // accepted the connection.
  134. type ReadWriteAcker interface {
  135. io.ReadWriter
  136. AckConnection(tracePropagation string) error
  137. }
  138. // HTTPResponseReadWriteAcker is an HTTP implementation of ReadWriteAcker.
  139. type HTTPResponseReadWriteAcker struct {
  140. r io.Reader
  141. w ResponseWriter
  142. f http.Flusher
  143. req *http.Request
  144. }
  145. // NewHTTPResponseReadWriterAcker returns a new instance of HTTPResponseReadWriteAcker.
  146. func NewHTTPResponseReadWriterAcker(w ResponseWriter, flusher http.Flusher, req *http.Request) *HTTPResponseReadWriteAcker {
  147. return &HTTPResponseReadWriteAcker{
  148. r: req.Body,
  149. w: w,
  150. f: flusher,
  151. req: req,
  152. }
  153. }
  154. func (h *HTTPResponseReadWriteAcker) Read(p []byte) (int, error) {
  155. return h.r.Read(p)
  156. }
  157. func (h *HTTPResponseReadWriteAcker) Write(p []byte) (int, error) {
  158. n, err := h.w.Write(p)
  159. if n > 0 {
  160. h.f.Flush()
  161. }
  162. return n, err
  163. }
  164. // AckConnection acks an HTTP connection by sending a switch protocols status code that enables the caller to
  165. // upgrade to streams.
  166. func (h *HTTPResponseReadWriteAcker) AckConnection(tracePropagation string) error {
  167. resp := &http.Response{
  168. Status: switchingProtocolText,
  169. StatusCode: http.StatusSwitchingProtocols,
  170. ContentLength: -1,
  171. Header: http.Header{},
  172. }
  173. if secWebsocketKey := h.req.Header.Get("Sec-WebSocket-Key"); secWebsocketKey != "" {
  174. resp.Header = websocket.NewResponseHeader(h.req)
  175. }
  176. if tracePropagation != "" {
  177. resp.Header.Add(tracing.CanonicalCloudflaredTracingHeader, tracePropagation)
  178. }
  179. return h.w.WriteRespHeaders(resp.StatusCode, resp.Header)
  180. }
  181. // localProxyConnection emulates an incoming connection to cloudflared as a net.Conn.
  182. // Used when handling a "hijacked" connection from connection.ResponseWriter
  183. type localProxyConnection struct {
  184. io.ReadWriteCloser
  185. }
  186. func (c *localProxyConnection) Read(b []byte) (int, error) {
  187. return c.ReadWriteCloser.Read(b)
  188. }
  189. func (c *localProxyConnection) Write(b []byte) (int, error) {
  190. return c.ReadWriteCloser.Write(b)
  191. }
  192. func (c *localProxyConnection) Close() error {
  193. return c.ReadWriteCloser.Close()
  194. }
  195. func (c *localProxyConnection) LocalAddr() net.Addr {
  196. // Unused LocalAddr
  197. return &net.TCPAddr{IP: net.IPv6loopback, Port: 0, Zone: ""}
  198. }
  199. func (c *localProxyConnection) RemoteAddr() net.Addr {
  200. // Unused RemoteAddr
  201. return &net.TCPAddr{IP: net.IPv6loopback, Port: 0, Zone: ""}
  202. }
  203. func (c *localProxyConnection) SetDeadline(t time.Time) error {
  204. // ignored since we can't set the read/write Deadlines for the tunnel back to origintunneld
  205. return nil
  206. }
  207. func (c *localProxyConnection) SetReadDeadline(t time.Time) error {
  208. // ignored since we can't set the read/write Deadlines for the tunnel back to origintunneld
  209. return nil
  210. }
  211. func (c *localProxyConnection) SetWriteDeadline(t time.Time) error {
  212. // ignored since we can't set the read/write Deadlines for the tunnel back to origintunneld
  213. return nil
  214. }
  215. // ResponseWriter is the response path for a request back through cloudflared's tunnel.
  216. type ResponseWriter interface {
  217. WriteRespHeaders(status int, header http.Header) error
  218. AddTrailer(trailerName, trailerValue string)
  219. http.ResponseWriter
  220. http.Hijacker
  221. io.Writer
  222. }
  223. type ConnectedFuse interface {
  224. Connected()
  225. IsConnected() bool
  226. }
  227. // Helper method to let the caller know what content-types should require a flush on every
  228. // write to a ResponseWriter.
  229. func shouldFlush(headers http.Header) bool {
  230. if contentType := headers.Get(contentTypeHeader); contentType != "" {
  231. contentType = strings.ToLower(contentType)
  232. for _, c := range flushableContentTypes {
  233. if strings.HasPrefix(contentType, c) {
  234. return true
  235. }
  236. }
  237. }
  238. return false
  239. }
  240. func uint8ToString(input uint8) string {
  241. return strconv.FormatUint(uint64(input), 10)
  242. }
  243. func FindCfRayHeader(req *http.Request) string {
  244. return req.Header.Get("Cf-Ray")
  245. }
  246. func IsLBProbeRequest(req *http.Request) bool {
  247. return strings.HasPrefix(req.UserAgent(), lbProbeUserAgentPrefix)
  248. }