proxy.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. package proxy
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strconv"
  8. "time"
  9. "github.com/pkg/errors"
  10. pkgerrors "github.com/pkg/errors"
  11. "github.com/rs/zerolog"
  12. "go.opentelemetry.io/otel/attribute"
  13. "go.opentelemetry.io/otel/trace"
  14. cfdflow "github.com/cloudflare/cloudflared/flow"
  15. "github.com/cloudflare/cloudflared/management"
  16. "github.com/cloudflare/cloudflared/carrier"
  17. "github.com/cloudflare/cloudflared/cfio"
  18. "github.com/cloudflare/cloudflared/connection"
  19. "github.com/cloudflare/cloudflared/ingress"
  20. "github.com/cloudflare/cloudflared/stream"
  21. "github.com/cloudflare/cloudflared/tracing"
  22. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  23. )
const (
	// TagHeaderNamePrefix is the Cloudflared Warp Tag prefix. It is prepended to a tag's name to form the
	// header name under which the tag's value is added to proxied requests (see appendTagHeaders).
	TagHeaderNamePrefix = "Cf-Warp-Tag-"
	// trailerHeaderName is the name of the HTTP "Trailer" header.
	// NOTE(review): not referenced by the code visible in this file; confirm it is used elsewhere in the package.
	trailerHeaderName = "Trailer"
)
// Proxy represents a means to Proxy between cloudflared and the origin services.
type Proxy struct {
	// ingressRules is consulted by ProxyHTTP to find the rule matching a request's host and path.
	ingressRules ingress.Ingress
	// warpRouting is created in NewOriginProxy. ProxyTCP rejects requests when it is nil and otherwise
	// uses its Proxy field to establish the origin stream.
	warpRouting *ingress.WarpRoutingService
	// tags are added to each request handled by ProxyHTTP as TagHeaderNamePrefix-prefixed headers.
	tags []pogs.Tag
	// flowLimiter is acquired before, and released after, every TCP flow handled by ProxyTCP.
	flowLimiter cfdflow.Limiter
	// log is the base logger handed to newHTTPLogger and newTCPLogger to build per-request loggers.
	log *zerolog.Logger
}
  37. // NewOriginProxy returns a new instance of the Proxy struct.
  38. func NewOriginProxy(
  39. ingressRules ingress.Ingress,
  40. warpRouting ingress.WarpRoutingConfig,
  41. tags []pogs.Tag,
  42. flowLimiter cfdflow.Limiter,
  43. writeTimeout time.Duration,
  44. log *zerolog.Logger,
  45. ) *Proxy {
  46. proxy := &Proxy{
  47. ingressRules: ingressRules,
  48. tags: tags,
  49. flowLimiter: flowLimiter,
  50. log: log,
  51. }
  52. proxy.warpRouting = ingress.NewWarpRoutingService(warpRouting, writeTimeout)
  53. return proxy
  54. }
  55. func (p *Proxy) applyIngressMiddleware(rule *ingress.Rule, r *http.Request, w connection.ResponseWriter) (error, bool) {
  56. for _, handler := range rule.Handlers {
  57. result, err := handler.Handle(r.Context(), r)
  58. if err != nil {
  59. return errors.Wrap(err, fmt.Sprintf("error while processing middleware handler %s", handler.Name())), false
  60. }
  61. if result.ShouldFilterRequest {
  62. _ = w.WriteRespHeaders(result.StatusCode, nil)
  63. return fmt.Errorf("request filtered by middleware handler (%s) due to: %s", handler.Name(), result.Reason), true
  64. }
  65. }
  66. return nil, true
  67. }
  68. // ProxyHTTP further depends on ingress rules to establish a connection with the origin service. This may be
  69. // a simple roundtrip or a tcp/websocket dial depending on ingres rule setup.
  70. func (p *Proxy) ProxyHTTP(
  71. w connection.ResponseWriter,
  72. tr *tracing.TracedHTTPRequest,
  73. isWebsocket bool,
  74. ) error {
  75. incrementRequests()
  76. defer decrementConcurrentRequests()
  77. req := tr.Request
  78. p.appendTagHeaders(req)
  79. _, ruleSpan := tr.Tracer().Start(req.Context(), "ingress_match",
  80. trace.WithAttributes(attribute.String("req-host", req.Host)))
  81. rule, ruleNum := p.ingressRules.FindMatchingRule(req.Host, req.URL.Path)
  82. ruleSpan.SetAttributes(attribute.Int("rule-num", ruleNum))
  83. ruleSpan.End()
  84. logger := newHTTPLogger(p.log, tr.ConnIndex, req, ruleNum, rule.Service.String())
  85. logHTTPRequest(&logger, req)
  86. if err, applied := p.applyIngressMiddleware(rule, req, w); err != nil {
  87. if applied {
  88. logRequestError(&logger, err)
  89. return nil
  90. }
  91. return err
  92. }
  93. switch originProxy := rule.Service.(type) {
  94. case ingress.HTTPOriginProxy:
  95. if err := p.proxyHTTPRequest(
  96. w,
  97. tr,
  98. originProxy,
  99. isWebsocket,
  100. rule.Config.DisableChunkedEncoding,
  101. &logger,
  102. ); err != nil {
  103. logRequestError(&logger, err)
  104. return err
  105. }
  106. return nil
  107. case ingress.StreamBasedOriginProxy:
  108. dest, err := getDestFromRule(rule, req)
  109. if err != nil {
  110. return err
  111. }
  112. flusher, ok := w.(http.Flusher)
  113. if !ok {
  114. return fmt.Errorf("response writer is not a flusher")
  115. }
  116. rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req)
  117. logger := logger.With().Str(logFieldDestAddr, dest).Logger()
  118. if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, &logger); err != nil {
  119. logRequestError(&logger, err)
  120. return err
  121. }
  122. return nil
  123. case ingress.HTTPLocalProxy:
  124. p.proxyLocalRequest(originProxy, w, req, isWebsocket)
  125. return nil
  126. default:
  127. return fmt.Errorf("Unrecognized service: %s, %t", rule.Service, originProxy)
  128. }
  129. }
  130. // ProxyTCP proxies to a TCP connection between the origin service and cloudflared.
  131. func (p *Proxy) ProxyTCP(
  132. ctx context.Context,
  133. rwa connection.ReadWriteAcker,
  134. req *connection.TCPRequest,
  135. ) error {
  136. incrementTCPRequests()
  137. defer decrementTCPConcurrentRequests()
  138. if p.warpRouting == nil {
  139. err := errors.New(`cloudflared received a request from WARP client, but your configuration has disabled ingress from WARP clients. To enable this, set "warp-routing:\n\t enabled: true" in your config.yaml`)
  140. p.log.Error().Msg(err.Error())
  141. return err
  142. }
  143. logger := newTCPLogger(p.log, req)
  144. // Try to start a new flow
  145. if err := p.flowLimiter.Acquire(management.TCP.String()); err != nil {
  146. logger.Warn().Msg("Too many concurrent flows being handled, rejecting tcp proxy")
  147. return pkgerrors.Wrap(err, "failed to start tcp flow due to rate limiting")
  148. }
  149. defer p.flowLimiter.Release()
  150. serveCtx, cancel := context.WithCancel(ctx)
  151. defer cancel()
  152. tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, &logger)
  153. logger.Debug().Msg("tcp proxy stream started")
  154. if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, &logger); err != nil {
  155. logRequestError(&logger, err)
  156. return err
  157. }
  158. logger.Debug().Msg("tcp proxy stream finished successfully")
  159. return nil
  160. }
  161. // ProxyHTTPRequest proxies requests of underlying type http and websocket to the origin service.
  162. func (p *Proxy) proxyHTTPRequest(
  163. w connection.ResponseWriter,
  164. tr *tracing.TracedHTTPRequest,
  165. httpService ingress.HTTPOriginProxy,
  166. isWebsocket bool,
  167. disableChunkedEncoding bool,
  168. logger *zerolog.Logger,
  169. ) error {
  170. roundTripReq := tr.Request
  171. if isWebsocket {
  172. roundTripReq = tr.Clone(tr.Request.Context())
  173. roundTripReq.Header.Set("Connection", "Upgrade")
  174. roundTripReq.Header.Set("Upgrade", "websocket")
  175. roundTripReq.Header.Set("Sec-Websocket-Version", "13")
  176. roundTripReq.ContentLength = 0
  177. roundTripReq.Body = nil
  178. } else {
  179. // Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
  180. if disableChunkedEncoding {
  181. roundTripReq.TransferEncoding = []string{"gzip", "deflate"}
  182. cLength, err := strconv.Atoi(tr.Request.Header.Get("Content-Length"))
  183. if err == nil {
  184. roundTripReq.ContentLength = int64(cLength)
  185. }
  186. }
  187. // Request origin to keep connection alive to improve performance
  188. roundTripReq.Header.Set("Connection", "keep-alive")
  189. }
  190. // Set the User-Agent as an empty string if not provided to avoid inserting golang default UA
  191. if roundTripReq.Header.Get("User-Agent") == "" {
  192. roundTripReq.Header.Set("User-Agent", "")
  193. }
  194. _, ttfbSpan := tr.Tracer().Start(tr.Context(), "ttfb_origin")
  195. resp, err := httpService.RoundTrip(roundTripReq)
  196. if err != nil {
  197. tracing.EndWithErrorStatus(ttfbSpan, err)
  198. if err := roundTripReq.Context().Err(); err != nil {
  199. return errors.Wrap(err, "Incoming request ended abruptly")
  200. }
  201. return errors.Wrap(err, "Unable to reach the origin service. The service may be down or it may not be responding to traffic from cloudflared")
  202. }
  203. tracing.EndWithStatusCode(ttfbSpan, resp.StatusCode)
  204. defer resp.Body.Close()
  205. headers := make(http.Header, len(resp.Header))
  206. // copy headers
  207. for k, v := range resp.Header {
  208. headers[k] = v
  209. }
  210. // Add spans to response header (if available)
  211. tr.AddSpans(headers)
  212. err = w.WriteRespHeaders(resp.StatusCode, headers)
  213. if err != nil {
  214. return errors.Wrap(err, "Error writing response header")
  215. }
  216. if resp.StatusCode == http.StatusSwitchingProtocols {
  217. rwc, ok := resp.Body.(io.ReadWriteCloser)
  218. if !ok {
  219. return errors.New("internal error: unsupported connection type")
  220. }
  221. defer rwc.Close()
  222. eyeballStream := &bidirectionalStream{
  223. writer: w,
  224. reader: tr.Request.Body,
  225. }
  226. stream.Pipe(eyeballStream, rwc, logger)
  227. return nil
  228. }
  229. if _, err = cfio.Copy(w, resp.Body); err != nil {
  230. return err
  231. }
  232. // copy trailers
  233. copyTrailers(w, resp)
  234. logOriginHTTPResponse(logger, resp)
  235. return nil
  236. }
  237. // proxyStream proxies type TCP and other underlying types if the connection is defined as a stream oriented
  238. // ingress rule.
  239. // connectedLogger is used to log when the connection is acknowledged
  240. func (p *Proxy) proxyStream(
  241. tr *tracing.TracedContext,
  242. rwa connection.ReadWriteAcker,
  243. dest string,
  244. connectionProxy ingress.StreamBasedOriginProxy,
  245. logger *zerolog.Logger,
  246. ) error {
  247. ctx := tr.Context
  248. _, connectSpan := tr.Tracer().Start(ctx, "stream-connect")
  249. start := time.Now()
  250. originConn, err := connectionProxy.EstablishConnection(ctx, dest, logger)
  251. if err != nil {
  252. connectStreamErrors.Inc()
  253. tracing.EndWithErrorStatus(connectSpan, err)
  254. return err
  255. }
  256. connectSpan.End()
  257. defer originConn.Close()
  258. logger.Debug().Msg("origin connection established")
  259. encodedSpans := tr.GetSpans()
  260. if err := rwa.AckConnection(encodedSpans); err != nil {
  261. connectStreamErrors.Inc()
  262. return err
  263. }
  264. connectLatency.Observe(float64(time.Since(start).Milliseconds()))
  265. logger.Debug().Msg("proxy stream acknowledged")
  266. originConn.Stream(ctx, rwa, logger)
  267. return nil
  268. }
  269. func (p *Proxy) proxyLocalRequest(proxy ingress.HTTPLocalProxy, w connection.ResponseWriter, req *http.Request, isWebsocket bool) {
  270. if isWebsocket {
  271. // These headers are added since they are stripped off during an eyeball request to origintunneld, but they
  272. // are required during the Handshake process of a WebSocket request.
  273. req.Header.Set("Connection", "Upgrade")
  274. req.Header.Set("Upgrade", "websocket")
  275. req.Header.Set("Sec-Websocket-Version", "13")
  276. }
  277. proxy.ServeHTTP(w, req)
  278. }
  279. type bidirectionalStream struct {
  280. reader io.Reader
  281. writer io.Writer
  282. }
  283. func (wr *bidirectionalStream) Read(p []byte) (n int, err error) {
  284. return wr.reader.Read(p)
  285. }
  286. func (wr *bidirectionalStream) Write(p []byte) (n int, err error) {
  287. return wr.writer.Write(p)
  288. }
  289. func (p *Proxy) appendTagHeaders(r *http.Request) {
  290. for _, tag := range p.tags {
  291. r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
  292. }
  293. }
  294. func copyTrailers(w connection.ResponseWriter, response *http.Response) {
  295. for trailerHeader, trailerValues := range response.Trailer {
  296. for _, trailerValue := range trailerValues {
  297. w.AddTrailer(trailerHeader, trailerValue)
  298. }
  299. }
  300. }
  301. func getDestFromRule(rule *ingress.Rule, req *http.Request) (string, error) {
  302. switch rule.Service.String() {
  303. case ingress.ServiceBastion:
  304. return carrier.ResolveBastionDest(req)
  305. default:
  306. return rule.Service.String(), nil
  307. }
  308. }