http2.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. package connection
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "github.com/cloudflare/cloudflared/h2mux"
  12. tunnelpogs "github.com/cloudflare/cloudflared/tunnelrpc/pogs"
  13. "github.com/rs/zerolog"
  14. "golang.org/x/net/http2"
  15. )
  // Header names and values used to classify requests arriving from the edge.
  const (
  	// internalUpgradeHeader carries the kind of upgrade requested for a stream;
  	// it is compared against websocketUpgrade and controlStreamUpgrade.
  	internalUpgradeHeader = "Cf-Cloudflared-Proxy-Connection-Upgrade"
  	// tcpStreamHeader, when present with a non-empty value, marks a TCP stream request.
  	tcpStreamHeader = "Cf-Cloudflared-Proxy-Src"

  	// websocketUpgrade is the internalUpgradeHeader value for websocket requests.
  	websocketUpgrade = "websocket"
  	// controlStreamUpgrade is the internalUpgradeHeader value for the control stream.
  	controlStreamUpgrade = "control-stream"
  )

  // errEdgeConnectionClosed is returned by Serve when the connection ended
  // without a graceful shutdown and without a control stream error.
  var errEdgeConnectionClosed = fmt.Errorf("connection with edge closed")
  23. type http2Connection struct {
  24. conn net.Conn
  25. server *http2.Server
  26. config *Config
  27. namedTunnel *NamedTunnelConfig
  28. connOptions *tunnelpogs.ConnectionOptions
  29. observer *Observer
  30. connIndexStr string
  31. connIndex uint8
  32. // newRPCClientFunc allows us to mock RPCs during testing
  33. newRPCClientFunc func(context.Context, io.ReadWriteCloser, *zerolog.Logger) NamedTunnelRPCClient
  34. activeRequestsWG sync.WaitGroup
  35. connectedFuse ConnectedFuse
  36. gracefulShutdownC <-chan struct{}
  37. stoppedGracefully bool
  38. controlStreamErr error // result of running control stream handler
  39. }
  40. func NewHTTP2Connection(
  41. conn net.Conn,
  42. config *Config,
  43. namedTunnelConfig *NamedTunnelConfig,
  44. connOptions *tunnelpogs.ConnectionOptions,
  45. observer *Observer,
  46. connIndex uint8,
  47. connectedFuse ConnectedFuse,
  48. gracefulShutdownC <-chan struct{},
  49. ) *http2Connection {
  50. return &http2Connection{
  51. conn: conn,
  52. server: &http2.Server{
  53. MaxConcurrentStreams: math.MaxUint32,
  54. },
  55. config: config,
  56. namedTunnel: namedTunnelConfig,
  57. connOptions: connOptions,
  58. observer: observer,
  59. connIndexStr: uint8ToString(connIndex),
  60. connIndex: connIndex,
  61. newRPCClientFunc: newRegistrationRPCClient,
  62. connectedFuse: connectedFuse,
  63. gracefulShutdownC: gracefulShutdownC,
  64. }
  65. }
  66. func (c *http2Connection) Serve(ctx context.Context) error {
  67. go func() {
  68. <-ctx.Done()
  69. c.close()
  70. }()
  71. c.server.ServeConn(c.conn, &http2.ServeConnOpts{
  72. Context: ctx,
  73. Handler: c,
  74. })
  75. switch {
  76. case c.stoppedGracefully:
  77. return nil
  78. case c.controlStreamErr != nil:
  79. return c.controlStreamErr
  80. default:
  81. c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Lost connection with the edge")
  82. return errEdgeConnectionClosed
  83. }
  84. }
  85. func (c *http2Connection) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  86. c.activeRequestsWG.Add(1)
  87. defer c.activeRequestsWG.Done()
  88. connType := determineHTTP2Type(r)
  89. respWriter, err := newHTTP2RespWriter(r, w, connType)
  90. if err != nil {
  91. c.observer.log.Error().Msg(err.Error())
  92. return
  93. }
  94. var proxyErr error
  95. switch connType {
  96. case TypeControlStream:
  97. proxyErr = c.serveControlStream(r.Context(), respWriter)
  98. c.controlStreamErr = proxyErr
  99. case TypeWebsocket:
  100. stripWebsocketUpgradeHeader(r)
  101. proxyErr = c.config.OriginProxy.Proxy(respWriter, r, TypeWebsocket)
  102. default:
  103. proxyErr = c.config.OriginProxy.Proxy(respWriter, r, connType)
  104. }
  105. if proxyErr != nil {
  106. respWriter.WriteErrorResponse()
  107. }
  108. }
  109. func (c *http2Connection) serveControlStream(ctx context.Context, respWriter *http2RespWriter) error {
  110. rpcClient := c.newRPCClientFunc(ctx, respWriter, c.observer.log)
  111. defer rpcClient.Close()
  112. if err := rpcClient.RegisterConnection(ctx, c.namedTunnel, c.connOptions, c.connIndex, c.observer); err != nil {
  113. return err
  114. }
  115. c.connectedFuse.Connected()
  116. // wait for connection termination or start of graceful shutdown
  117. select {
  118. case <-ctx.Done():
  119. break
  120. case <-c.gracefulShutdownC:
  121. c.stoppedGracefully = true
  122. }
  123. c.observer.sendUnregisteringEvent(c.connIndex)
  124. rpcClient.GracefulShutdown(ctx, c.config.GracePeriod)
  125. c.observer.log.Info().Uint8(LogFieldConnIndex, c.connIndex).Msg("Unregistered tunnel connection")
  126. return nil
  127. }
  128. func (c *http2Connection) close() {
  129. // Wait for all serve HTTP handlers to return
  130. c.activeRequestsWG.Wait()
  131. c.conn.Close()
  132. }
  // http2RespWriter pairs a request body with its response writer so a single
  // value can be read from, written to and closed.
  type http2RespWriter struct {
  	// r is the source for Read; it is set to the request body.
  	r io.Reader
  	// w receives response headers and body bytes.
  	w http.ResponseWriter
  	// flusher is w viewed as an http.Flusher.
  	flusher http.Flusher
  	// shouldFlush makes the writer flush after headers and after each successful Write.
  	shouldFlush bool
  }
  139. func newHTTP2RespWriter(r *http.Request, w http.ResponseWriter, connType Type) (*http2RespWriter, error) {
  140. flusher, isFlusher := w.(http.Flusher)
  141. if !isFlusher {
  142. respWriter := &http2RespWriter{
  143. r: r.Body,
  144. w: w,
  145. }
  146. respWriter.WriteErrorResponse()
  147. return nil, fmt.Errorf("%T doesn't implement http.Flusher", w)
  148. }
  149. return &http2RespWriter{
  150. r: r.Body,
  151. w: w,
  152. flusher: flusher,
  153. shouldFlush: connType.shouldFlush(),
  154. }, nil
  155. }
  156. func (rp *http2RespWriter) WriteRespHeaders(status int, header http.Header) error {
  157. dest := rp.w.Header()
  158. userHeaders := make(http.Header, len(header))
  159. for header, values := range header {
  160. // Since these are http2 headers, they're required to be lowercase
  161. h2name := strings.ToLower(header)
  162. for _, v := range values {
  163. if h2name == "content-length" {
  164. // This header has meaning in HTTP/2 and will be used by the edge,
  165. // so it should be sent as an HTTP/2 response header.
  166. dest.Add(h2name, v)
  167. // Since these are http2 headers, they're required to be lowercase
  168. } else if !h2mux.IsControlHeader(h2name) || h2mux.IsWebsocketClientHeader(h2name) {
  169. // User headers, on the other hand, must all be serialized so that
  170. // HTTP/2 header validation won't be applied to HTTP/1 header values
  171. userHeaders.Add(h2name, v)
  172. }
  173. }
  174. }
  175. // Perform user header serialization and set them in the single header
  176. dest.Set(canonicalResponseUserHeadersField, h2mux.SerializeHeaders(userHeaders))
  177. rp.setResponseMetaHeader(responseMetaHeaderOrigin)
  178. // HTTP2 removes support for 101 Switching Protocols https://tools.ietf.org/html/rfc7540#section-8.1.1
  179. if status == http.StatusSwitchingProtocols {
  180. status = http.StatusOK
  181. }
  182. rp.w.WriteHeader(status)
  183. if IsServerSentEvent(header) {
  184. rp.shouldFlush = true
  185. }
  186. if rp.shouldFlush {
  187. rp.flusher.Flush()
  188. }
  189. return nil
  190. }
  191. func (rp *http2RespWriter) WriteErrorResponse() {
  192. rp.setResponseMetaHeader(responseMetaHeaderCfd)
  193. rp.w.WriteHeader(http.StatusBadGateway)
  194. }
  195. func (rp *http2RespWriter) setResponseMetaHeader(value string) {
  196. rp.w.Header().Set(canonicalResponseMetaHeaderField, value)
  197. }
  198. func (rp *http2RespWriter) Read(p []byte) (n int, err error) {
  199. return rp.r.Read(p)
  200. }
  201. func (rp *http2RespWriter) Write(p []byte) (n int, err error) {
  202. defer func() {
  203. // Implementer of OriginClient should make sure it doesn't write to the connection after Proxy returns
  204. // Register a recover routine just in case.
  205. if r := recover(); r != nil {
  206. println("Recover from http2 response writer panic, error", r)
  207. }
  208. }()
  209. n, err = rp.w.Write(p)
  210. if err == nil && rp.shouldFlush {
  211. rp.flusher.Flush()
  212. }
  213. return n, err
  214. }
  215. func (rp *http2RespWriter) Close() error {
  216. return nil
  217. }
  218. func determineHTTP2Type(r *http.Request) Type {
  219. switch {
  220. case isWebsocketUpgrade(r):
  221. return TypeWebsocket
  222. case IsTCPStream(r):
  223. return TypeTCP
  224. case isControlStreamUpgrade(r):
  225. return TypeControlStream
  226. default:
  227. return TypeHTTP
  228. }
  229. }
  230. func isControlStreamUpgrade(r *http.Request) bool {
  231. return r.Header.Get(internalUpgradeHeader) == controlStreamUpgrade
  232. }
  233. func isWebsocketUpgrade(r *http.Request) bool {
  234. return r.Header.Get(internalUpgradeHeader) == websocketUpgrade
  235. }
  236. // IsTCPStream discerns if the connection request needs a tcp stream proxy.
  237. func IsTCPStream(r *http.Request) bool {
  238. return r.Header.Get(tcpStreamHeader) != ""
  239. }
  240. func stripWebsocketUpgradeHeader(r *http.Request) {
  241. r.Header.Del(internalUpgradeHeader)
  242. }