http2.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. package connection
  2. import (
  3. "bufio"
  4. "context"
  5. gojson "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/http"
  10. "runtime/debug"
  11. "strings"
  12. "sync"
  13. "github.com/pkg/errors"
  14. "github.com/rs/zerolog"
  15. "golang.org/x/net/http2"
  16. cfdflow "github.com/cloudflare/cloudflared/flow"
  17. "github.com/cloudflare/cloudflared/tracing"
  18. tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  19. )
  20. // note: these constants are exported so we can reuse them in the edge-side code
  21. const (
  22. InternalUpgradeHeader = "Cf-Cloudflared-Proxy-Connection-Upgrade"
  23. InternalTCPProxySrcHeader = "Cf-Cloudflared-Proxy-Src"
  24. WebsocketUpgrade = "websocket"
  25. ControlStreamUpgrade = "control-stream"
  26. ConfigurationUpdate = "update-configuration"
  27. )
  28. var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed")
  29. // HTTP2Connection represents a net.Conn that uses HTTP2 frames to proxy traffic from the edge to cloudflared on the
  30. // origin.
  31. type HTTP2Connection struct {
  32. conn net.Conn
  33. server *http2.Server
  34. orchestrator Orchestrator
  35. connOptions *tunnelpogs.ConnectionOptions
  36. observer *Observer
  37. connIndex uint8
  38. log *zerolog.Logger
  39. activeRequestsWG sync.WaitGroup
  40. controlStreamHandler ControlStreamHandler
  41. stoppedGracefully bool
  42. controlStreamErr error // result of running control stream handler
  43. }
  44. // NewHTTP2Connection returns a new instance of HTTP2Connection.
  45. func NewHTTP2Connection(
  46. conn net.Conn,
  47. orchestrator Orchestrator,
  48. connOptions *tunnelpogs.ConnectionOptions,
  49. observer *Observer,
  50. connIndex uint8,
  51. controlStreamHandler ControlStreamHandler,
  52. log *zerolog.Logger,
  53. ) *HTTP2Connection {
  54. return &HTTP2Connection{
  55. conn: conn,
  56. server: &http2.Server{
  57. MaxConcurrentStreams: MaxConcurrentStreams,
  58. },
  59. orchestrator: orchestrator,
  60. connOptions: connOptions,
  61. observer: observer,
  62. connIndex: connIndex,
  63. controlStreamHandler: controlStreamHandler,
  64. log: log,
  65. }
  66. }
  67. // Serve serves an HTTP2 server that the edge can talk to.
  68. func (c *HTTP2Connection) Serve(ctx context.Context) error {
  69. go func() {
  70. <-ctx.Done()
  71. c.close()
  72. }()
  73. c.server.ServeConn(c.conn, &http2.ServeConnOpts{
  74. Context: ctx,
  75. Handler: c,
  76. })
  77. switch {
  78. case c.controlStreamHandler.IsStopped():
  79. return nil
  80. case c.controlStreamErr != nil:
  81. return c.controlStreamErr
  82. default:
  83. c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Lost connection with the edge")
  84. return errEdgeConnectionClosed
  85. }
  86. }
  87. func (c *HTTP2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  88. c.activeRequestsWG.Add(1)
  89. defer c.activeRequestsWG.Done()
  90. connType := determineHTTP2Type(r)
  91. handleMissingRequestParts(connType, r)
  92. respWriter, err := NewHTTP2RespWriter(r, w, connType, c.log)
  93. if err != nil {
  94. c.observer.log.Error().Msg(err.Error())
  95. return
  96. }
  97. originProxy, err := c.orchestrator.GetOriginProxy()
  98. if err != nil {
  99. c.observer.log.Error().Msg(err.Error())
  100. return
  101. }
  102. var requestErr error
  103. switch connType {
  104. case TypeControlStream:
  105. requestErr = c.controlStreamHandler.ServeControlStream(r.Context(), respWriter, c.connOptions, c.orchestrator)
  106. if requestErr != nil {
  107. c.controlStreamErr = requestErr
  108. }
  109. case TypeConfiguration:
  110. requestErr = c.handleConfigurationUpdate(respWriter, r)
  111. case TypeWebsocket, TypeHTTP:
  112. stripWebsocketUpgradeHeader(r)
  113. // Check for tracing on request
  114. tr := tracing.NewTracedHTTPRequest(r, c.connIndex, c.log)
  115. if err := originProxy.ProxyHTTP(respWriter, tr, connType == TypeWebsocket); err != nil {
  116. requestErr = fmt.Errorf("Failed to proxy HTTP: %w", err)
  117. }
  118. case TypeTCP:
  119. host, err := getRequestHost(r)
  120. if err != nil {
  121. requestErr = fmt.Errorf(`cloudflared received a warp-routing request with an empty host value: %w`, err)
  122. break
  123. }
  124. rws := NewHTTPResponseReadWriterAcker(respWriter, respWriter, r)
  125. requestErr = originProxy.ProxyTCP(r.Context(), rws, &TCPRequest{
  126. Dest: host,
  127. CFRay: FindCfRayHeader(r),
  128. LBProbe: IsLBProbeRequest(r),
  129. CfTraceID: r.Header.Get(tracing.TracerContextName),
  130. ConnIndex: c.connIndex,
  131. })
  132. default:
  133. requestErr = fmt.Errorf("Received unknown connection type: %s", connType)
  134. }
  135. if requestErr != nil {
  136. c.log.Error().Err(requestErr).Msg("failed to serve incoming request")
  137. // WriteErrorResponse will return false if status was already written. we need to abort handler.
  138. if !respWriter.WriteErrorResponse(requestErr) {
  139. c.log.Debug().Msg("Handler aborted due to failure to write error response after status already sent")
  140. panic(http.ErrAbortHandler)
  141. }
  142. }
  143. }
  144. // ConfigurationUpdateBody is the representation followed by the edge to send updates to cloudflared.
  145. type ConfigurationUpdateBody struct {
  146. Version int32 `json:"version"`
  147. Config gojson.RawMessage `json:"config"`
  148. }
  149. func (c *HTTP2Connection) handleConfigurationUpdate(respWriter *http2RespWriter, r *http.Request) error {
  150. var configBody ConfigurationUpdateBody
  151. if err := json.NewDecoder(r.Body).Decode(&configBody); err != nil {
  152. return err
  153. }
  154. resp := c.orchestrator.UpdateConfig(configBody.Version, configBody.Config)
  155. bdy, err := json.Marshal(resp)
  156. if err != nil {
  157. return err
  158. }
  159. _, err = respWriter.Write(bdy)
  160. return err
  161. }
  162. func (c *HTTP2Connection) close() {
  163. // Wait for all serve HTTP handlers to return
  164. c.activeRequestsWG.Wait()
  165. c.conn.Close()
  166. }
  167. type http2RespWriter struct {
  168. r io.Reader
  169. w http.ResponseWriter
  170. flusher http.Flusher
  171. shouldFlush bool
  172. statusWritten bool
  173. respHeaders http.Header
  174. hijackedMutex sync.Mutex
  175. hijackedv bool
  176. log *zerolog.Logger
  177. }
  178. func NewHTTP2RespWriter(r *http.Request, w http.ResponseWriter, connType Type, log *zerolog.Logger) (*http2RespWriter, error) {
  179. flusher, isFlusher := w.(http.Flusher)
  180. if !isFlusher {
  181. respWriter := &http2RespWriter{
  182. r: r.Body,
  183. w: w,
  184. log: log,
  185. }
  186. err := fmt.Errorf("%T doesn't implement http.Flusher", w)
  187. respWriter.WriteErrorResponse(err)
  188. return nil, err
  189. }
  190. return &http2RespWriter{
  191. r: r.Body,
  192. w: w,
  193. flusher: flusher,
  194. shouldFlush: connType.shouldFlush(),
  195. respHeaders: make(http.Header),
  196. log: log,
  197. }, nil
  198. }
  199. func (rp *http2RespWriter) AddTrailer(trailerName, trailerValue string) {
  200. if !rp.statusWritten {
  201. rp.log.Warn().Msg("Tried to add Trailer to response before status written. Ignoring...")
  202. return
  203. }
  204. rp.w.Header().Add(http2.TrailerPrefix+trailerName, trailerValue)
  205. }
  206. func (rp *http2RespWriter) WriteRespHeaders(status int, header http.Header) error {
  207. if rp.hijacked() {
  208. rp.log.Warn().Msg("WriteRespHeaders after hijack")
  209. return nil
  210. }
  211. dest := rp.w.Header()
  212. userHeaders := make(http.Header, len(header))
  213. for name, values := range header {
  214. // lowercase headers for simplicity check
  215. h2name := strings.ToLower(name)
  216. if h2name == "content-length" {
  217. // This header has meaning in HTTP/2 and will be used by the edge,
  218. // so it should be sent *also* as an HTTP/2 response header.
  219. dest[name] = values
  220. }
  221. if h2name == tracing.IntCloudflaredTracingHeader {
  222. // Add cf-int-cloudflared-tracing header outside of serialized userHeaders
  223. dest[tracing.CanonicalCloudflaredTracingHeader] = values
  224. continue
  225. }
  226. if !IsControlResponseHeader(h2name) || IsWebsocketClientHeader(h2name) {
  227. // User headers, on the other hand, must all be serialized so that
  228. // HTTP/2 header validation won't be applied to HTTP/1 header values
  229. userHeaders[name] = values
  230. }
  231. }
  232. // Perform user header serialization and set them in the single header
  233. dest.Set(CanonicalResponseUserHeaders, SerializeHeaders(userHeaders))
  234. rp.setResponseMetaHeader(responseMetaHeaderOrigin)
  235. // HTTP2 removes support for 101 Switching Protocols https://tools.ietf.org/html/rfc7540#section-8.1.1
  236. if status == http.StatusSwitchingProtocols {
  237. status = http.StatusOK
  238. }
  239. rp.w.WriteHeader(status)
  240. if shouldFlush(header) {
  241. rp.shouldFlush = true
  242. }
  243. if rp.shouldFlush {
  244. rp.flusher.Flush()
  245. }
  246. rp.statusWritten = true
  247. return nil
  248. }
  249. func (rp *http2RespWriter) Header() http.Header {
  250. return rp.respHeaders
  251. }
  252. func (rp *http2RespWriter) Flush() {
  253. rp.flusher.Flush()
  254. }
  255. func (rp *http2RespWriter) WriteHeader(status int) {
  256. if rp.hijacked() {
  257. rp.log.Warn().Msg("WriteHeader after hijack")
  258. return
  259. }
  260. _ = rp.WriteRespHeaders(status, rp.respHeaders)
  261. }
  262. func (rp *http2RespWriter) hijacked() bool {
  263. rp.hijackedMutex.Lock()
  264. defer rp.hijackedMutex.Unlock()
  265. return rp.hijackedv
  266. }
  267. func (rp *http2RespWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
  268. if !rp.statusWritten {
  269. return nil, nil, fmt.Errorf("status not yet written before attempting to hijack connection")
  270. }
  271. // Make sure to flush anything left in the buffer before hijacking
  272. if rp.shouldFlush {
  273. rp.flusher.Flush()
  274. }
  275. rp.hijackedMutex.Lock()
  276. defer rp.hijackedMutex.Unlock()
  277. if rp.hijackedv {
  278. return nil, nil, http.ErrHijacked
  279. }
  280. rp.hijackedv = true
  281. conn := &localProxyConnection{rp}
  282. // We return the http2RespWriter here because we want to make sure that we flush after every write
  283. // otherwise the HTTP2 write buffer waits a few seconds before sending.
  284. readWriter := bufio.NewReadWriter(
  285. bufio.NewReader(rp),
  286. bufio.NewWriter(rp),
  287. )
  288. return conn, readWriter, nil
  289. }
  290. func (rp *http2RespWriter) WriteErrorResponse(err error) bool {
  291. if rp.statusWritten {
  292. return false
  293. }
  294. if errors.Is(err, cfdflow.ErrTooManyActiveFlows) {
  295. rp.setResponseMetaHeader(responseMetaHeaderCfdFlowRateLimited)
  296. } else {
  297. rp.setResponseMetaHeader(responseMetaHeaderCfd)
  298. }
  299. rp.w.WriteHeader(http.StatusBadGateway)
  300. rp.statusWritten = true
  301. return true
  302. }
  303. func (rp *http2RespWriter) setResponseMetaHeader(value string) {
  304. rp.w.Header().Set(CanonicalResponseMetaHeader, value)
  305. }
  306. func (rp *http2RespWriter) Read(p []byte) (n int, err error) {
  307. return rp.r.Read(p)
  308. }
  309. func (rp *http2RespWriter) Write(p []byte) (n int, err error) {
  310. defer func() {
  311. // Implementer of OriginClient should make sure it doesn't write to the connection after Proxy returns
  312. // Register a recover routine just in case.
  313. if r := recover(); r != nil {
  314. rp.log.Debug().Msgf("Recover from http2 response writer panic, error %s", debug.Stack())
  315. }
  316. }()
  317. n, err = rp.w.Write(p)
  318. if err == nil && rp.shouldFlush {
  319. rp.flusher.Flush()
  320. }
  321. return n, err
  322. }
  323. func (rp *http2RespWriter) Close() error {
  324. return nil
  325. }
  326. func determineHTTP2Type(r *http.Request) Type {
  327. switch {
  328. case isConfigurationUpdate(r):
  329. return TypeConfiguration
  330. case isWebsocketUpgrade(r):
  331. return TypeWebsocket
  332. case IsTCPStream(r):
  333. return TypeTCP
  334. case isControlStreamUpgrade(r):
  335. return TypeControlStream
  336. default:
  337. return TypeHTTP
  338. }
  339. }
  340. func handleMissingRequestParts(connType Type, r *http.Request) {
  341. if connType == TypeHTTP {
  342. // http library has no guarantees that we receive a filled URL. If not, then we fill it, as we reuse the request
  343. // for proxying. For proxying they should not matter since we control the dialer on every egress proxied.
  344. if len(r.URL.Scheme) == 0 {
  345. r.URL.Scheme = "http"
  346. }
  347. if len(r.URL.Host) == 0 {
  348. r.URL.Host = "localhost:8080"
  349. }
  350. }
  351. }
  352. func isControlStreamUpgrade(r *http.Request) bool {
  353. return r.Header.Get(InternalUpgradeHeader) == ControlStreamUpgrade
  354. }
  355. func isWebsocketUpgrade(r *http.Request) bool {
  356. return r.Header.Get(InternalUpgradeHeader) == WebsocketUpgrade
  357. }
  358. func isConfigurationUpdate(r *http.Request) bool {
  359. return r.Header.Get(InternalUpgradeHeader) == ConfigurationUpdate
  360. }
  361. // IsTCPStream discerns if the connection request needs a tcp stream proxy.
  362. func IsTCPStream(r *http.Request) bool {
  363. return r.Header.Get(InternalTCPProxySrcHeader) != ""
  364. }
  365. func stripWebsocketUpgradeHeader(r *http.Request) {
  366. r.Header.Del(InternalUpgradeHeader)
  367. }
  368. // getRequestHost returns the host of the http.Request.
  369. func getRequestHost(r *http.Request) (string, error) {
  370. if r.Host != "" {
  371. return r.Host, nil
  372. }
  373. if r.URL != nil {
  374. return r.URL.Host, nil
  375. }
  376. return "", errors.New("host not set in incoming request")
  377. }