proxy.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. package proxy
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "strconv"
  8. "time"
  9. "github.com/pkg/errors"
  10. "github.com/rs/zerolog"
  11. "go.opentelemetry.io/otel/attribute"
  12. "go.opentelemetry.io/otel/trace"
  13. "github.com/cloudflare/cloudflared/carrier"
  14. "github.com/cloudflare/cloudflared/cfio"
  15. "github.com/cloudflare/cloudflared/connection"
  16. "github.com/cloudflare/cloudflared/ingress"
  17. "github.com/cloudflare/cloudflared/stream"
  18. "github.com/cloudflare/cloudflared/tracing"
  19. "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  20. )
  21. const (
  22. // TagHeaderNamePrefix indicates a Cloudflared Warp Tag prefix that gets appended for warp traffic stream headers.
  23. TagHeaderNamePrefix = "Cf-Warp-Tag-"
  24. trailerHeaderName = "Trailer"
  25. )
  26. // Proxy represents a means to Proxy between cloudflared and the origin services.
  27. type Proxy struct {
  28. ingressRules ingress.Ingress
  29. warpRouting *ingress.WarpRoutingService
  30. management *ingress.ManagementService
  31. tags []pogs.Tag
  32. log *zerolog.Logger
  33. }
  34. // NewOriginProxy returns a new instance of the Proxy struct.
  35. func NewOriginProxy(
  36. ingressRules ingress.Ingress,
  37. warpRouting ingress.WarpRoutingConfig,
  38. tags []pogs.Tag,
  39. writeTimeout time.Duration,
  40. log *zerolog.Logger,
  41. ) *Proxy {
  42. proxy := &Proxy{
  43. ingressRules: ingressRules,
  44. tags: tags,
  45. log: log,
  46. }
  47. proxy.warpRouting = ingress.NewWarpRoutingService(warpRouting, writeTimeout)
  48. return proxy
  49. }
  50. func (p *Proxy) applyIngressMiddleware(rule *ingress.Rule, r *http.Request, w connection.ResponseWriter) (error, bool) {
  51. for _, handler := range rule.Handlers {
  52. result, err := handler.Handle(r.Context(), r)
  53. if err != nil {
  54. return errors.Wrap(err, fmt.Sprintf("error while processing middleware handler %s", handler.Name())), false
  55. }
  56. if result.ShouldFilterRequest {
  57. w.WriteRespHeaders(result.StatusCode, nil)
  58. return fmt.Errorf("request filtered by middleware handler (%s) due to: %s", handler.Name(), result.Reason), true
  59. }
  60. }
  61. return nil, true
  62. }
  63. // ProxyHTTP further depends on ingress rules to establish a connection with the origin service. This may be
  64. // a simple roundtrip or a tcp/websocket dial depending on ingres rule setup.
  65. func (p *Proxy) ProxyHTTP(
  66. w connection.ResponseWriter,
  67. tr *tracing.TracedHTTPRequest,
  68. isWebsocket bool,
  69. ) error {
  70. incrementRequests()
  71. defer decrementConcurrentRequests()
  72. req := tr.Request
  73. p.appendTagHeaders(req)
  74. _, ruleSpan := tr.Tracer().Start(req.Context(), "ingress_match",
  75. trace.WithAttributes(attribute.String("req-host", req.Host)))
  76. rule, ruleNum := p.ingressRules.FindMatchingRule(req.Host, req.URL.Path)
  77. ruleSpan.SetAttributes(attribute.Int("rule-num", ruleNum))
  78. ruleSpan.End()
  79. logger := newHTTPLogger(p.log, tr.ConnIndex, req, ruleNum, rule.Service.String())
  80. logHTTPRequest(&logger, req)
  81. if err, applied := p.applyIngressMiddleware(rule, req, w); err != nil {
  82. if applied {
  83. logRequestError(&logger, err)
  84. return nil
  85. }
  86. return err
  87. }
  88. switch originProxy := rule.Service.(type) {
  89. case ingress.HTTPOriginProxy:
  90. if err := p.proxyHTTPRequest(
  91. w,
  92. tr,
  93. originProxy,
  94. isWebsocket,
  95. rule.Config.DisableChunkedEncoding,
  96. &logger,
  97. ); err != nil {
  98. logRequestError(&logger, err)
  99. return err
  100. }
  101. return nil
  102. case ingress.StreamBasedOriginProxy:
  103. dest, err := getDestFromRule(rule, req)
  104. if err != nil {
  105. return err
  106. }
  107. flusher, ok := w.(http.Flusher)
  108. if !ok {
  109. return fmt.Errorf("response writer is not a flusher")
  110. }
  111. rws := connection.NewHTTPResponseReadWriterAcker(w, flusher, req)
  112. logger := logger.With().Str(logFieldDestAddr, dest).Logger()
  113. if err := p.proxyStream(tr.ToTracedContext(), rws, dest, originProxy, &logger); err != nil {
  114. logRequestError(&logger, err)
  115. return err
  116. }
  117. return nil
  118. case ingress.HTTPLocalProxy:
  119. p.proxyLocalRequest(originProxy, w, req, isWebsocket)
  120. return nil
  121. default:
  122. return fmt.Errorf("Unrecognized service: %s, %t", rule.Service, originProxy)
  123. }
  124. }
  125. // ProxyTCP proxies to a TCP connection between the origin service and cloudflared.
  126. func (p *Proxy) ProxyTCP(
  127. ctx context.Context,
  128. rwa connection.ReadWriteAcker,
  129. req *connection.TCPRequest,
  130. ) error {
  131. incrementTCPRequests()
  132. defer decrementTCPConcurrentRequests()
  133. if p.warpRouting == nil {
  134. err := errors.New(`cloudflared received a request from WARP client, but your configuration has disabled ingress from WARP clients. To enable this, set "warp-routing:\n\t enabled: true" in your config.yaml`)
  135. p.log.Error().Msg(err.Error())
  136. return err
  137. }
  138. serveCtx, cancel := context.WithCancel(ctx)
  139. defer cancel()
  140. logger := newTCPLogger(p.log, req)
  141. tracedCtx := tracing.NewTracedContext(serveCtx, req.CfTraceID, &logger)
  142. logger.Debug().Msg("tcp proxy stream started")
  143. if err := p.proxyStream(tracedCtx, rwa, req.Dest, p.warpRouting.Proxy, &logger); err != nil {
  144. logRequestError(&logger, err)
  145. return err
  146. }
  147. logger.Debug().Msg("tcp proxy stream finished successfully")
  148. return nil
  149. }
  150. // ProxyHTTPRequest proxies requests of underlying type http and websocket to the origin service.
  151. func (p *Proxy) proxyHTTPRequest(
  152. w connection.ResponseWriter,
  153. tr *tracing.TracedHTTPRequest,
  154. httpService ingress.HTTPOriginProxy,
  155. isWebsocket bool,
  156. disableChunkedEncoding bool,
  157. logger *zerolog.Logger,
  158. ) error {
  159. roundTripReq := tr.Request
  160. if isWebsocket {
  161. roundTripReq = tr.Clone(tr.Request.Context())
  162. roundTripReq.Header.Set("Connection", "Upgrade")
  163. roundTripReq.Header.Set("Upgrade", "websocket")
  164. roundTripReq.Header.Set("Sec-Websocket-Version", "13")
  165. roundTripReq.ContentLength = 0
  166. roundTripReq.Body = nil
  167. } else {
  168. // Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
  169. if disableChunkedEncoding {
  170. roundTripReq.TransferEncoding = []string{"gzip", "deflate"}
  171. cLength, err := strconv.Atoi(tr.Request.Header.Get("Content-Length"))
  172. if err == nil {
  173. roundTripReq.ContentLength = int64(cLength)
  174. }
  175. }
  176. // Request origin to keep connection alive to improve performance
  177. roundTripReq.Header.Set("Connection", "keep-alive")
  178. }
  179. // Set the User-Agent as an empty string if not provided to avoid inserting golang default UA
  180. if roundTripReq.Header.Get("User-Agent") == "" {
  181. roundTripReq.Header.Set("User-Agent", "")
  182. }
  183. _, ttfbSpan := tr.Tracer().Start(tr.Context(), "ttfb_origin")
  184. resp, err := httpService.RoundTrip(roundTripReq)
  185. if err != nil {
  186. tracing.EndWithErrorStatus(ttfbSpan, err)
  187. if err := roundTripReq.Context().Err(); err != nil {
  188. return errors.Wrap(err, "Incoming request ended abruptly")
  189. }
  190. return errors.Wrap(err, "Unable to reach the origin service. The service may be down or it may not be responding to traffic from cloudflared")
  191. }
  192. tracing.EndWithStatusCode(ttfbSpan, resp.StatusCode)
  193. defer resp.Body.Close()
  194. headers := make(http.Header, len(resp.Header))
  195. // copy headers
  196. for k, v := range resp.Header {
  197. headers[k] = v
  198. }
  199. // Add spans to response header (if available)
  200. tr.AddSpans(headers)
  201. err = w.WriteRespHeaders(resp.StatusCode, headers)
  202. if err != nil {
  203. return errors.Wrap(err, "Error writing response header")
  204. }
  205. if resp.StatusCode == http.StatusSwitchingProtocols {
  206. rwc, ok := resp.Body.(io.ReadWriteCloser)
  207. if !ok {
  208. return errors.New("internal error: unsupported connection type")
  209. }
  210. defer rwc.Close()
  211. eyeballStream := &bidirectionalStream{
  212. writer: w,
  213. reader: tr.Request.Body,
  214. }
  215. stream.Pipe(eyeballStream, rwc, logger)
  216. return nil
  217. }
  218. if _, err = cfio.Copy(w, resp.Body); err != nil {
  219. return err
  220. }
  221. // copy trailers
  222. copyTrailers(w, resp)
  223. logOriginHTTPResponse(logger, resp)
  224. return nil
  225. }
  226. // proxyStream proxies type TCP and other underlying types if the connection is defined as a stream oriented
  227. // ingress rule.
  228. // connectedLogger is used to log when the connection is acknowledged
  229. func (p *Proxy) proxyStream(
  230. tr *tracing.TracedContext,
  231. rwa connection.ReadWriteAcker,
  232. dest string,
  233. connectionProxy ingress.StreamBasedOriginProxy,
  234. logger *zerolog.Logger,
  235. ) error {
  236. ctx := tr.Context
  237. _, connectSpan := tr.Tracer().Start(ctx, "stream-connect")
  238. start := time.Now()
  239. originConn, err := connectionProxy.EstablishConnection(ctx, dest, logger)
  240. if err != nil {
  241. connectStreamErrors.Inc()
  242. tracing.EndWithErrorStatus(connectSpan, err)
  243. return err
  244. }
  245. connectSpan.End()
  246. defer originConn.Close()
  247. logger.Debug().Msg("origin connection established")
  248. encodedSpans := tr.GetSpans()
  249. if err := rwa.AckConnection(encodedSpans); err != nil {
  250. connectStreamErrors.Inc()
  251. return err
  252. }
  253. connectLatency.Observe(float64(time.Since(start).Milliseconds()))
  254. logger.Debug().Msg("proxy stream acknowledged")
  255. originConn.Stream(ctx, rwa, logger)
  256. return nil
  257. }
  258. func (p *Proxy) proxyLocalRequest(proxy ingress.HTTPLocalProxy, w connection.ResponseWriter, req *http.Request, isWebsocket bool) {
  259. if isWebsocket {
  260. // These headers are added since they are stripped off during an eyeball request to origintunneld, but they
  261. // are required during the Handshake process of a WebSocket request.
  262. req.Header.Set("Connection", "Upgrade")
  263. req.Header.Set("Upgrade", "websocket")
  264. req.Header.Set("Sec-Websocket-Version", "13")
  265. }
  266. proxy.ServeHTTP(w, req)
  267. }
  268. type bidirectionalStream struct {
  269. reader io.Reader
  270. writer io.Writer
  271. }
  272. func (wr *bidirectionalStream) Read(p []byte) (n int, err error) {
  273. return wr.reader.Read(p)
  274. }
  275. func (wr *bidirectionalStream) Write(p []byte) (n int, err error) {
  276. return wr.writer.Write(p)
  277. }
  278. func (p *Proxy) appendTagHeaders(r *http.Request) {
  279. for _, tag := range p.tags {
  280. r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
  281. }
  282. }
  283. func copyTrailers(w connection.ResponseWriter, response *http.Response) {
  284. for trailerHeader, trailerValues := range response.Trailer {
  285. for _, trailerValue := range trailerValues {
  286. w.AddTrailer(trailerHeader, trailerValue)
  287. }
  288. }
  289. }
  290. func getDestFromRule(rule *ingress.Rule, req *http.Request) (string, error) {
  291. switch rule.Service.String() {
  292. case ingress.ServiceBastion:
  293. return carrier.ResolveBastionDest(req)
  294. default:
  295. return rule.Service.String(), nil
  296. }
  297. }