muxreader.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. package h2mux
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "net/url"
  8. "time"
  9. "github.com/rs/zerolog"
  10. "golang.org/x/net/http2"
  11. )
  12. const (
  13. CloudflaredProxyTunnelHostnameHeader = "cf-cloudflared-proxy-tunnel-hostname"
  14. )
  15. type MuxReader struct {
  16. // f is used to read HTTP2 frames.
  17. f *http2.Framer
  18. // handler provides a callback to receive new streams. if nil, new streams cannot be accepted.
  19. handler MuxedStreamHandler
  20. // streams tracks currently-open streams.
  21. streams *activeStreamMap
  22. // readyList is used to signal writable streams.
  23. readyList *ReadyList
  24. // streamErrors lets us report stream errors to the MuxWriter.
  25. streamErrors *StreamErrorMap
  26. // goAwayChan is used to tell the writer to send a GOAWAY message.
  27. goAwayChan chan<- http2.ErrCode
  28. // abortChan is used when shutting down ungracefully. When this becomes readable, all activity should stop.
  29. abortChan <-chan struct{}
  30. // pingTimestamp is an atomic value containing the latest received ping timestamp.
  31. pingTimestamp *PingTimestamp
  32. // connActive is used to signal to the writer that something happened on the connection.
  33. // This is used to clear idle timeout disconnection deadlines.
  34. connActive Signal
  35. // The initial value for the send and receive window of a new stream.
  36. initialStreamWindow uint32
  37. // The max value for the send window of a stream.
  38. streamWindowMax uint32
  39. // The max size for the write buffer of a stream
  40. streamWriteBufferMaxLen int
  41. // r is a reference to the underlying connection used when shutting down.
  42. r io.Closer
  43. // metricsUpdater is used to report metrics
  44. metricsUpdater muxMetricsUpdater
  45. // bytesRead is the amount of bytes read from data frames since the last time we called metricsUpdater.updateInBoundBytes()
  46. bytesRead *AtomicCounter
  47. // dictionaries holds the h2 cross-stream compression dictionaries
  48. dictionaries h2Dictionaries
  49. }
  50. // Shutdown blocks new streams from being created.
  51. // It returns a channel that is closed once the last stream has closed.
  52. func (r *MuxReader) Shutdown() <-chan struct{} {
  53. done, alreadyInProgress := r.streams.Shutdown()
  54. if alreadyInProgress {
  55. return done
  56. }
  57. r.sendGoAway(http2.ErrCodeNo)
  58. go func() {
  59. // close reader side when last stream ends; this will cause the writer to abort
  60. <-done
  61. r.r.Close()
  62. }()
  63. return done
  64. }
  65. func (r *MuxReader) run(log *zerolog.Logger) error {
  66. defer log.Debug().Msg("mux - read: event loop finished")
  67. // routine to periodically update bytesRead
  68. go func() {
  69. tickC := time.Tick(updateFreq)
  70. for {
  71. select {
  72. case <-r.abortChan:
  73. return
  74. case <-tickC:
  75. r.metricsUpdater.updateInBoundBytes(r.bytesRead.Count())
  76. }
  77. }
  78. }()
  79. for {
  80. frame, err := r.f.ReadFrame()
  81. if err != nil {
  82. errorString := fmt.Sprintf("mux - read: %s", err)
  83. if errorDetail := r.f.ErrorDetail(); errorDetail != nil {
  84. errorString = fmt.Sprintf("%s: errorDetail: %s", errorString, errorDetail)
  85. }
  86. switch e := err.(type) {
  87. case http2.StreamError:
  88. log.Info().Msgf("%s: stream error", errorString)
  89. // Ideally we wouldn't return here, since that aborts the muxer.
  90. // We should communicate the error to the relevant MuxedStream
  91. // data structure, so that callers of MuxedStream.Read() and
  92. // MuxedStream.Write() would see it. Then we could `continue`
  93. // and keep the muxer going.
  94. return r.streamError(e.StreamID, e.Code)
  95. case http2.ConnectionError:
  96. log.Info().Msgf("%s: stream error", errorString)
  97. return r.connectionError(err)
  98. default:
  99. if isConnectionClosedError(err) {
  100. if r.streams.Len() == 0 {
  101. // don't log the error here -- that would just be extra noise
  102. log.Debug().Msg("mux - read: shutting down")
  103. return nil
  104. }
  105. log.Info().Msgf("%s: connection closed unexpectedly", errorString)
  106. return err
  107. } else {
  108. log.Info().Msgf("%s: frame read error", errorString)
  109. return r.connectionError(err)
  110. }
  111. }
  112. }
  113. r.connActive.Signal()
  114. log.Debug().Msgf("mux - read: read frame: data %v", frame)
  115. switch f := frame.(type) {
  116. case *http2.DataFrame:
  117. err = r.receiveFrameData(f, log)
  118. case *http2.MetaHeadersFrame:
  119. err = r.receiveHeaderData(f)
  120. case *http2.RSTStreamFrame:
  121. streamID := f.Header().StreamID
  122. if streamID == 0 {
  123. return ErrInvalidStream
  124. }
  125. if stream, ok := r.streams.Get(streamID); ok {
  126. stream.Close()
  127. }
  128. r.streams.Delete(streamID)
  129. case *http2.PingFrame:
  130. r.receivePingData(f)
  131. case *http2.GoAwayFrame:
  132. err = r.receiveGoAway(f)
  133. // The receiver of a flow-controlled frame sends a WINDOW_UPDATE frame as it
  134. // consumes data and frees up space in flow-control windows
  135. case *http2.WindowUpdateFrame:
  136. err = r.updateStreamWindow(f)
  137. case *http2.UnknownFrame:
  138. switch f.Header().Type {
  139. case FrameUseDictionary:
  140. err = r.receiveUseDictionary(f)
  141. case FrameSetDictionary:
  142. err = r.receiveSetDictionary(f)
  143. default:
  144. err = ErrUnexpectedFrameType
  145. }
  146. default:
  147. err = ErrUnexpectedFrameType
  148. }
  149. if err != nil {
  150. log.Debug().Msgf("mux - read: read error: data %v", frame)
  151. return r.connectionError(err)
  152. }
  153. }
  154. }
  155. func (r *MuxReader) newMuxedStream(streamID uint32) *MuxedStream {
  156. return &MuxedStream{
  157. streamID: streamID,
  158. readBuffer: NewSharedBuffer(),
  159. writeBuffer: &bytes.Buffer{},
  160. writeBufferMaxLen: r.streamWriteBufferMaxLen,
  161. writeBufferHasSpace: make(chan struct{}, 1),
  162. receiveWindow: r.initialStreamWindow,
  163. receiveWindowCurrentMax: r.initialStreamWindow,
  164. receiveWindowMax: r.streamWindowMax,
  165. sendWindow: r.initialStreamWindow,
  166. readyList: r.readyList,
  167. dictionaries: r.dictionaries,
  168. }
  169. }
  170. // getStreamForFrame returns a stream if valid, or an error describing why the stream could not be returned.
  171. func (r *MuxReader) getStreamForFrame(frame http2.Frame) (*MuxedStream, error) {
  172. sid := frame.Header().StreamID
  173. if sid == 0 {
  174. return nil, ErrUnexpectedFrameType
  175. }
  176. if stream, ok := r.streams.Get(sid); ok {
  177. return stream, nil
  178. }
  179. if r.streams.IsLocalStreamID(sid) {
  180. // no stream available, but no error
  181. return nil, ErrClosedStream
  182. }
  183. if sid < r.streams.LastPeerStreamID() {
  184. // no stream available, stream closed error
  185. return nil, ErrClosedStream
  186. }
  187. return nil, ErrUnknownStream
  188. }
  189. func (r *MuxReader) defaultStreamErrorHandler(err error, header http2.FrameHeader) error {
  190. if header.Flags.Has(http2.FlagHeadersEndStream) {
  191. return nil
  192. } else if err == ErrUnknownStream || err == ErrClosedStream {
  193. return r.streamError(header.StreamID, http2.ErrCodeStreamClosed)
  194. } else {
  195. return err
  196. }
  197. }
  198. // Receives header frames from a stream. A non-nil error is a connection error.
  199. func (r *MuxReader) receiveHeaderData(frame *http2.MetaHeadersFrame) error {
  200. var stream *MuxedStream
  201. sid := frame.Header().StreamID
  202. if sid == 0 {
  203. return ErrUnexpectedFrameType
  204. }
  205. newStream := r.streams.IsPeerStreamID(sid)
  206. if newStream {
  207. // header request
  208. // TODO support trailers (if stream exists)
  209. ok, err := r.streams.AcquirePeerID(sid)
  210. if !ok {
  211. // ignore new streams while shutting down
  212. return r.streamError(sid, err)
  213. }
  214. stream = r.newMuxedStream(sid)
  215. // Set stream. Returns false if a stream already existed with that ID or we are shutting down, return false.
  216. if !r.streams.Set(stream) {
  217. // got HEADERS frame for an existing stream
  218. // TODO support trailers
  219. return r.streamError(sid, http2.ErrCodeInternal)
  220. }
  221. } else {
  222. // header response
  223. var err error
  224. if stream, err = r.getStreamForFrame(frame); err != nil {
  225. return r.defaultStreamErrorHandler(err, frame.Header())
  226. }
  227. }
  228. headers := make([]Header, 0, len(frame.Fields))
  229. for _, header := range frame.Fields {
  230. switch header.Name {
  231. case ":method":
  232. stream.method = header.Value
  233. case ":path":
  234. u, err := url.Parse(header.Value)
  235. if err == nil {
  236. stream.path = u.Path
  237. }
  238. case "accept-encoding":
  239. // remove accept-encoding if dictionaries are enabled
  240. if r.dictionaries.write != nil {
  241. continue
  242. }
  243. case CloudflaredProxyTunnelHostnameHeader:
  244. stream.tunnelHostname = TunnelHostname(header.Value)
  245. }
  246. headers = append(headers, Header{Name: header.Name, Value: header.Value})
  247. }
  248. stream.Headers = headers
  249. if frame.Header().Flags.Has(http2.FlagHeadersEndStream) {
  250. stream.receiveEOF()
  251. return nil
  252. }
  253. if newStream {
  254. go r.handleStream(stream)
  255. } else {
  256. close(stream.responseHeadersReceived)
  257. }
  258. return nil
  259. }
  260. func (r *MuxReader) handleStream(stream *MuxedStream) {
  261. defer stream.Close()
  262. r.handler.ServeStream(stream)
  263. }
  264. // Receives a data frame from a stream. A non-nil error is a connection error.
  265. func (r *MuxReader) receiveFrameData(frame *http2.DataFrame, log *zerolog.Logger) error {
  266. stream, err := r.getStreamForFrame(frame)
  267. if err != nil {
  268. return r.defaultStreamErrorHandler(err, frame.Header())
  269. }
  270. data := frame.Data()
  271. if len(data) > 0 {
  272. n, err := stream.readBuffer.Write(data)
  273. if err != nil {
  274. return r.streamError(stream.streamID, http2.ErrCodeInternal)
  275. }
  276. r.bytesRead.IncrementBy(uint64(n))
  277. }
  278. if frame.Header().Flags.Has(http2.FlagDataEndStream) {
  279. if stream.receiveEOF() {
  280. r.streams.Delete(stream.streamID)
  281. log.Debug().Msgf("mux - read: stream closed: streamID: %d", frame.Header().StreamID)
  282. } else {
  283. log.Debug().Msgf("mux - read: shutdown receive side: streamID: %d", frame.Header().StreamID)
  284. }
  285. return nil
  286. }
  287. if !stream.consumeReceiveWindow(uint32(len(data))) {
  288. return r.streamError(stream.streamID, http2.ErrCodeFlowControl)
  289. }
  290. r.metricsUpdater.updateReceiveWindow(stream.getReceiveWindow())
  291. return nil
  292. }
  293. // Receive a PING from the peer. Update RTT and send/receive window metrics if it's an ACK.
  294. func (r *MuxReader) receivePingData(frame *http2.PingFrame) {
  295. ts := int64(binary.LittleEndian.Uint64(frame.Data[:]))
  296. if !frame.IsAck() {
  297. r.pingTimestamp.Set(ts)
  298. return
  299. }
  300. // Update the computed RTT aggregations with a new measurement.
  301. // `ts` is the time that the probe was sent.
  302. // We assume that `time.Now()` is the time we received that probe.
  303. r.metricsUpdater.updateRTT(&roundTripMeasurement{
  304. receiveTime: time.Now(),
  305. sendTime: time.Unix(0, ts),
  306. })
  307. }
  308. // Receive a GOAWAY from the peer. Gracefully shut down our connection.
  309. func (r *MuxReader) receiveGoAway(frame *http2.GoAwayFrame) error {
  310. r.Shutdown()
  311. // Close all streams above the last processed stream
  312. lastStream := r.streams.LastLocalStreamID()
  313. for i := frame.LastStreamID + 2; i <= lastStream; i++ {
  314. if stream, ok := r.streams.Get(i); ok {
  315. stream.Close()
  316. }
  317. }
  318. return nil
  319. }
  320. // Receive a USE_DICTIONARY from the peer. Setup dictionary for stream.
  321. func (r *MuxReader) receiveUseDictionary(frame *http2.UnknownFrame) error {
  322. payload := frame.Payload()
  323. streamID := frame.StreamID
  324. // Check frame is formatted properly
  325. if len(payload) != 1 {
  326. return r.streamError(streamID, http2.ErrCodeProtocol)
  327. }
  328. stream, err := r.getStreamForFrame(frame)
  329. if err != nil {
  330. return err
  331. }
  332. if stream.receivedUseDict == true || stream.dictionaries.read == nil {
  333. return r.streamError(streamID, http2.ErrCodeInternal)
  334. }
  335. stream.receivedUseDict = true
  336. dictID := payload[0]
  337. dictReader := stream.dictionaries.read.newReader(stream.readBuffer.(*SharedBuffer), dictID)
  338. if dictReader == nil {
  339. return r.streamError(streamID, http2.ErrCodeInternal)
  340. }
  341. stream.readBufferLock.Lock()
  342. stream.readBuffer = dictReader
  343. stream.readBufferLock.Unlock()
  344. return nil
  345. }
  346. // Receive a SET_DICTIONARY from the peer. Update dictionaries accordingly.
  347. func (r *MuxReader) receiveSetDictionary(frame *http2.UnknownFrame) (err error) {
  348. payload := frame.Payload()
  349. flags := frame.Flags
  350. stream, err := r.getStreamForFrame(frame)
  351. if err != nil && err != ErrClosedStream {
  352. return err
  353. }
  354. reader, ok := stream.readBuffer.(*h2DictionaryReader)
  355. if !ok {
  356. return r.streamError(frame.StreamID, http2.ErrCodeProtocol)
  357. }
  358. // A SetDictionary frame consists of several
  359. // Dictionary-Entries that specify how existing dictionaries
  360. // are to be updated using the current stream data
  361. // +---------------+---------------+
  362. // | Dictionary-Entry (+) ...
  363. // +---------------+---------------+
  364. for {
  365. // Each Dictionary-Entry is formatted as follows:
  366. // +-------------------------------+
  367. // | Dictionary-ID (8) |
  368. // +---+---------------------------+
  369. // | P | Size (7+) |
  370. // +---+---------------------------+
  371. // | E?| D?| Truncate? (6+) |
  372. // +---+---------------------------+
  373. // | Offset? (8+) |
  374. // +-------------------------------+
  375. var size, truncate, offset uint64
  376. var p, e, d bool
  377. // Parse a single Dictionary-Entry
  378. if len(payload) < 2 { // Must have at least id and size
  379. return MuxerStreamError{"unexpected EOF", http2.ErrCodeProtocol}
  380. }
  381. dictID := uint8(payload[0])
  382. p = (uint8(payload[1]) >> 7) == 1
  383. payload, size, err = http2ReadVarInt(7, payload[1:])
  384. if err != nil {
  385. return
  386. }
  387. if flags.Has(FlagSetDictionaryAppend) {
  388. // Presence of FlagSetDictionaryAppend means we expect e, d and truncate
  389. if len(payload) < 1 {
  390. return MuxerStreamError{"unexpected EOF", http2.ErrCodeProtocol}
  391. }
  392. e = (uint8(payload[0]) >> 7) == 1
  393. d = (uint8((payload[0])>>6) & 1) == 1
  394. payload, truncate, err = http2ReadVarInt(6, payload)
  395. if err != nil {
  396. return
  397. }
  398. }
  399. if flags.Has(FlagSetDictionaryOffset) {
  400. // Presence of FlagSetDictionaryOffset means we expect offset
  401. if len(payload) < 1 {
  402. return MuxerStreamError{"unexpected EOF", http2.ErrCodeProtocol}
  403. }
  404. payload, offset, err = http2ReadVarInt(8, payload)
  405. if err != nil {
  406. return
  407. }
  408. }
  409. setdict := setDictRequest{streamID: stream.streamID,
  410. dictID: dictID,
  411. dictSZ: size,
  412. truncate: truncate,
  413. offset: offset,
  414. P: p,
  415. E: e,
  416. D: d}
  417. // Find the right dictionary
  418. dict, err := r.dictionaries.read.getDictByID(dictID)
  419. if err != nil {
  420. return err
  421. }
  422. // Register a dictionary update order for the dictionary and reader
  423. updateEntry := &dictUpdate{reader: reader, dictionary: dict, s: setdict}
  424. dict.queue = append(dict.queue, updateEntry)
  425. reader.queue = append(reader.queue, updateEntry)
  426. // End of frame
  427. if len(payload) == 0 {
  428. break
  429. }
  430. }
  431. return nil
  432. }
  433. // Receives header frames from a stream. A non-nil error is a connection error.
  434. func (r *MuxReader) updateStreamWindow(frame *http2.WindowUpdateFrame) error {
  435. stream, err := r.getStreamForFrame(frame)
  436. if err != nil && err != ErrUnknownStream && err != ErrClosedStream {
  437. return err
  438. }
  439. if stream == nil {
  440. // ignore window updates on closed streams
  441. return nil
  442. }
  443. stream.replenishSendWindow(frame.Increment)
  444. r.metricsUpdater.updateSendWindow(stream.getSendWindow())
  445. return nil
  446. }
  447. // Raise a stream processing error, closing the stream. Runs on the write thread.
  448. func (r *MuxReader) streamError(streamID uint32, e http2.ErrCode) error {
  449. r.streamErrors.RaiseError(streamID, e)
  450. return nil
  451. }
  452. func (r *MuxReader) connectionError(err error) error {
  453. http2Code := http2.ErrCodeInternal
  454. switch e := err.(type) {
  455. case http2.ConnectionError:
  456. http2Code = http2.ErrCode(e)
  457. case MuxerProtocolError:
  458. http2Code = e.h2code
  459. }
  460. r.sendGoAway(http2Code)
  461. return err
  462. }
  463. // Instruct the writer to send a GOAWAY message if possible. This may fail in
  464. // the case where an existing GOAWAY message is in flight or the writer event
  465. // loop already ended.
  466. func (r *MuxReader) sendGoAway(errCode http2.ErrCode) {
  467. select {
  468. case r.goAwayChan <- errCode:
  469. default:
  470. }
  471. }