proxy.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. package origin
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "github.com/pkg/errors"
  11. "github.com/rs/zerolog"
  12. "github.com/cloudflare/cloudflared/connection"
  13. "github.com/cloudflare/cloudflared/ingress"
  14. tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  15. )
  16. const (
  17. TagHeaderNamePrefix = "Cf-Warp-Tag-"
  18. )
  19. type proxy struct {
  20. ingressRules ingress.Ingress
  21. warpRouting *ingress.WarpRoutingService
  22. tags []tunnelpogs.Tag
  23. log *zerolog.Logger
  24. bufferPool *bufferPool
  25. }
  26. func NewOriginProxy(
  27. ingressRules ingress.Ingress,
  28. warpRouting *ingress.WarpRoutingService,
  29. tags []tunnelpogs.Tag,
  30. log *zerolog.Logger) connection.OriginProxy {
  31. return &proxy{
  32. ingressRules: ingressRules,
  33. warpRouting: warpRouting,
  34. tags: tags,
  35. log: log,
  36. bufferPool: newBufferPool(512 * 1024),
  37. }
  38. }
  39. // Caller is responsible for writing any error to ResponseWriter
  40. func (p *proxy) Proxy(w connection.ResponseWriter, req *http.Request, sourceConnectionType connection.Type) error {
  41. incrementRequests()
  42. defer decrementConcurrentRequests()
  43. cfRay := findCfRayHeader(req)
  44. lbProbe := isLBProbeRequest(req)
  45. serveCtx, cancel := context.WithCancel(req.Context())
  46. defer cancel()
  47. p.appendTagHeaders(req)
  48. if sourceConnectionType == connection.TypeTCP {
  49. if p.warpRouting == nil {
  50. err := errors.New(`cloudflared received a request from WARP client, but your configuration has disabled ingress from WARP clients. To enable this, set "warp-routing:\n\t enabled: true" in your config.yaml`)
  51. p.log.Error().Msg(err.Error())
  52. return err
  53. }
  54. logFields := logFields{
  55. cfRay: cfRay,
  56. lbProbe: lbProbe,
  57. rule: ingress.ServiceWarpRouting,
  58. }
  59. if err := p.proxyStreamRequest(serveCtx, w, req, sourceConnectionType, p.warpRouting.Proxy, logFields); err != nil {
  60. p.logRequestError(err, cfRay, ingress.ServiceWarpRouting)
  61. return err
  62. }
  63. return nil
  64. }
  65. rule, ruleNum := p.ingressRules.FindMatchingRule(req.Host, req.URL.Path)
  66. logFields := logFields{
  67. cfRay: cfRay,
  68. lbProbe: lbProbe,
  69. rule: ruleNum,
  70. }
  71. p.logRequest(req, logFields)
  72. if sourceConnectionType == connection.TypeHTTP {
  73. if err := p.proxyHTTPRequest(w, req, rule, logFields); err != nil {
  74. p.logRequestError(err, cfRay, ruleNum)
  75. return err
  76. }
  77. return nil
  78. }
  79. connectionProxy, ok := rule.Service.(ingress.StreamBasedOriginProxy)
  80. if !ok {
  81. p.log.Error().Msgf("%s is not a connection-oriented service", rule.Service)
  82. return fmt.Errorf("Not a connection-oriented service")
  83. }
  84. if err := p.proxyStreamRequest(serveCtx, w, req, sourceConnectionType, connectionProxy, logFields); err != nil {
  85. p.logRequestError(err, cfRay, ruleNum)
  86. return err
  87. }
  88. return nil
  89. }
  90. func (p *proxy) proxyHTTPRequest(w connection.ResponseWriter, req *http.Request, rule *ingress.Rule, fields logFields) error {
  91. // Support for WSGI Servers by switching transfer encoding from chunked to gzip/deflate
  92. if rule.Config.DisableChunkedEncoding {
  93. req.TransferEncoding = []string{"gzip", "deflate"}
  94. cLength, err := strconv.Atoi(req.Header.Get("Content-Length"))
  95. if err == nil {
  96. req.ContentLength = int64(cLength)
  97. }
  98. }
  99. // Request origin to keep connection alive to improve performance
  100. req.Header.Set("Connection", "keep-alive")
  101. httpService, ok := rule.Service.(ingress.HTTPOriginProxy)
  102. if !ok {
  103. p.log.Error().Msgf("%s is not a http service", rule.Service)
  104. return fmt.Errorf("Not a http service")
  105. }
  106. resp, err := httpService.RoundTrip(req)
  107. if err != nil {
  108. return errors.Wrap(err, "Error proxying request to origin")
  109. }
  110. defer resp.Body.Close()
  111. err = w.WriteRespHeaders(resp.StatusCode, resp.Header)
  112. if err != nil {
  113. return errors.Wrap(err, "Error writing response header")
  114. }
  115. if connection.IsServerSentEvent(resp.Header) {
  116. p.log.Debug().Msg("Detected Server-Side Events from Origin")
  117. p.writeEventStream(w, resp.Body)
  118. } else {
  119. // Use CopyBuffer, because Copy only allocates a 32KiB buffer, and cross-stream
  120. // compression generates dictionary on first write
  121. buf := p.bufferPool.Get()
  122. defer p.bufferPool.Put(buf)
  123. _, _ = io.CopyBuffer(w, resp.Body, buf)
  124. }
  125. p.logOriginResponse(resp, fields)
  126. return nil
  127. }
  128. // proxyStreamRequest first establish a connection with origin, then it writes the status code and headers, and finally it streams data between
  129. // eyeball and origin.
  130. func (p *proxy) proxyStreamRequest(
  131. serveCtx context.Context,
  132. w connection.ResponseWriter,
  133. req *http.Request,
  134. sourceConnectionType connection.Type,
  135. connectionProxy ingress.StreamBasedOriginProxy,
  136. fields logFields,
  137. ) error {
  138. originConn, resp, err := connectionProxy.EstablishConnection(req)
  139. if err != nil {
  140. return err
  141. }
  142. if resp.Body != nil {
  143. defer resp.Body.Close()
  144. }
  145. if err = w.WriteRespHeaders(resp.StatusCode, resp.Header); err != nil {
  146. return err
  147. }
  148. streamCtx, cancel := context.WithCancel(serveCtx)
  149. defer cancel()
  150. go func() {
  151. // streamCtx is done if req is cancelled or if Stream returns
  152. <-streamCtx.Done()
  153. originConn.Close()
  154. }()
  155. eyeballStream := &bidirectionalStream{
  156. writer: w,
  157. reader: req.Body,
  158. }
  159. originConn.Stream(serveCtx, eyeballStream, p.log)
  160. p.logOriginResponse(resp, fields)
  161. return nil
  162. }
  163. type bidirectionalStream struct {
  164. reader io.Reader
  165. writer io.Writer
  166. }
  167. func (wr *bidirectionalStream) Read(p []byte) (n int, err error) {
  168. return wr.reader.Read(p)
  169. }
  170. func (wr *bidirectionalStream) Write(p []byte) (n int, err error) {
  171. return wr.writer.Write(p)
  172. }
  173. func (p *proxy) writeEventStream(w connection.ResponseWriter, respBody io.ReadCloser) {
  174. reader := bufio.NewReader(respBody)
  175. for {
  176. line, err := reader.ReadBytes('\n')
  177. if err != nil {
  178. break
  179. }
  180. _, _ = w.Write(line)
  181. }
  182. }
  183. func (p *proxy) appendTagHeaders(r *http.Request) {
  184. for _, tag := range p.tags {
  185. r.Header.Add(TagHeaderNamePrefix+tag.Name, tag.Value)
  186. }
  187. }
  188. type logFields struct {
  189. cfRay string
  190. lbProbe bool
  191. rule interface{}
  192. }
  193. func (p *proxy) logRequest(r *http.Request, fields logFields) {
  194. if fields.cfRay != "" {
  195. p.log.Debug().Msgf("CF-RAY: %s %s %s %s", fields.cfRay, r.Method, r.URL, r.Proto)
  196. } else if fields.lbProbe {
  197. p.log.Debug().Msgf("CF-RAY: %s Load Balancer health check %s %s %s", fields.cfRay, r.Method, r.URL, r.Proto)
  198. } else {
  199. p.log.Debug().Msgf("All requests should have a CF-RAY header. Please open a support ticket with Cloudflare. %s %s %s ", r.Method, r.URL, r.Proto)
  200. }
  201. p.log.Debug().Msgf("CF-RAY: %s Request Headers %+v", fields.cfRay, r.Header)
  202. p.log.Debug().Msgf("CF-RAY: %s Serving with ingress rule %v", fields.cfRay, fields.rule)
  203. if contentLen := r.ContentLength; contentLen == -1 {
  204. p.log.Debug().Msgf("CF-RAY: %s Request Content length unknown", fields.cfRay)
  205. } else {
  206. p.log.Debug().Msgf("CF-RAY: %s Request content length %d", fields.cfRay, contentLen)
  207. }
  208. }
  209. func (p *proxy) logOriginResponse(resp *http.Response, fields logFields) {
  210. responseByCode.WithLabelValues(strconv.Itoa(resp.StatusCode)).Inc()
  211. if fields.cfRay != "" {
  212. p.log.Debug().Msgf("CF-RAY: %s Status: %s served by ingress %d", fields.cfRay, resp.Status, fields.rule)
  213. } else if fields.lbProbe {
  214. p.log.Debug().Msgf("Response to Load Balancer health check %s", resp.Status)
  215. } else {
  216. p.log.Debug().Msgf("Status: %s served by ingress %v", resp.Status, fields.rule)
  217. }
  218. p.log.Debug().Msgf("CF-RAY: %s Response Headers %+v", fields.cfRay, resp.Header)
  219. if contentLen := resp.ContentLength; contentLen == -1 {
  220. p.log.Debug().Msgf("CF-RAY: %s Response content length unknown", fields.cfRay)
  221. } else {
  222. p.log.Debug().Msgf("CF-RAY: %s Response content length %d", fields.cfRay, contentLen)
  223. }
  224. }
  225. func (p *proxy) logRequestError(err error, cfRay string, rule interface{}) {
  226. requestErrors.Inc()
  227. if cfRay != "" {
  228. p.log.Error().Msgf("CF-RAY: %s Proxying to ingress %v error: %v", cfRay, rule, err)
  229. } else {
  230. p.log.Error().Msgf("Proxying to ingress %v error: %v", rule, err)
  231. }
  232. }
  233. func findCfRayHeader(req *http.Request) string {
  234. return req.Header.Get("Cf-Ray")
  235. }
  236. func isLBProbeRequest(req *http.Request) bool {
  237. return strings.HasPrefix(req.UserAgent(), lbProbeUserAgentPrefix)
  238. }