// muxedstream.go 13 KB

  1. package h2mux
  2. import (
  3. "bytes"
  4. "io"
  5. "sync"
  6. )
  7. type ReadWriteLengther interface {
  8. io.ReadWriter
  9. Reset()
  10. Len() int
  11. }
  12. type ReadWriteClosedCloser interface {
  13. io.ReadWriteCloser
  14. Closed() bool
  15. }
// MuxedStreamDataSignaller is a write-only *ReadyList.
// A MuxedStream holds one (its readyList field) and calls Signal with its own
// stream ID whenever it has something for the writer to pick up.
type MuxedStreamDataSignaller interface {
	// Non-blocking: call this when data is ready to be sent for the given stream ID.
	Signal(ID uint32)
}
// MuxedStream is logically an HTTP/2 stream, with an additional buffer for outgoing data.
//
// The flow-control windows, the EOF flags, the pending headers and the write
// buffer are accessed by the methods in this file while holding writeLock.
type MuxedStream struct {
	// Identifier handed to readyList.Signal whenever this stream has work pending.
	streamID uint32
	// The "Receive" end of the stream
	// Read takes readBufferLock (read-locked) only when a read dictionary is configured.
	readBufferLock sync.RWMutex
	readBuffer     ReadWriteClosedCloser
	// This is the amount of bytes that are in our receive window
	// (how much data we can receive into this stream).
	receiveWindow uint32
	// Current receive window size limit. It is doubled (capped at
	// receiveWindowMax) when the window drops below half of this value.
	receiveWindowCurrentMax uint32
	// hard limit set in http2 spec. 2^31-1
	receiveWindowMax uint32
	// The desired size increment for receiveWindow.
	// If this is nonzero, a WINDOW_UPDATE frame needs to be sent.
	windowUpdate uint32
	// The headers that were most recently received.
	// Particularly:
	// * for an eyeball-initiated stream (as passed to TunnelHandler::ServeStream),
	// these are the request headers
	// * for a cloudflared-initiated stream (as created by Register/UnregisterTunnel),
	// these are the response headers.
	// They are useful in both of these contexts; hence `Headers` is public.
	Headers []Header
	// For use in the context of a cloudflared-initiated stream.
	responseHeadersReceived chan struct{}
	// The "Send" end of the stream
	writeLock   sync.Mutex
	writeBuffer ReadWriteLengther
	// The maximum capacity that the send buffer should grow to.
	writeBufferMaxLen int
	// A channel to be notified when the send buffer is not full.
	// NewStream creates it with capacity 1 so notifications never block.
	writeBufferHasSpace chan struct{}
	// This is the amount of bytes that are in the peer's receive window
	// (how much data we can send from this stream).
	sendWindow uint32
	// The muxer's readyList
	readyList MuxedStreamDataSignaller
	// The headers that should be sent, and a flag so we only send them once.
	headersSent  bool
	writeHeaders []Header
	// EOF-related fields
	// true if the write end of this stream has been closed
	writeEOF bool
	// true if we have sent EOF to the peer
	sentEOF bool
	// true if the peer sent us an EOF
	receivedEOF bool
	// If valid, tunnelHostname is used to identify which origin service is the intended recipient of the request
	tunnelHostname TunnelHostname
	// Compression-related fields
	receivedUseDict bool
	method          string
	contentType     string
	path            string
	dictionaries    h2Dictionaries
}
  78. type TunnelHostname string
  79. func (th TunnelHostname) String() string {
  80. return string(th)
  81. }
  82. func (th TunnelHostname) IsSet() bool {
  83. return th != ""
  84. }
  85. func NewStream(config MuxerConfig, writeHeaders []Header, readyList MuxedStreamDataSignaller, dictionaries h2Dictionaries) *MuxedStream {
  86. return &MuxedStream{
  87. responseHeadersReceived: make(chan struct{}),
  88. readBuffer: NewSharedBuffer(),
  89. writeBuffer: &bytes.Buffer{},
  90. writeBufferMaxLen: config.StreamWriteBufferMaxLen,
  91. writeBufferHasSpace: make(chan struct{}, 1),
  92. receiveWindow: config.DefaultWindowSize,
  93. receiveWindowCurrentMax: config.DefaultWindowSize,
  94. receiveWindowMax: config.MaxWindowSize,
  95. sendWindow: config.DefaultWindowSize,
  96. readyList: readyList,
  97. writeHeaders: writeHeaders,
  98. dictionaries: dictionaries,
  99. }
  100. }
  101. func (s *MuxedStream) Read(p []byte) (n int, err error) {
  102. var readBuffer ReadWriteClosedCloser
  103. if s.dictionaries.read != nil {
  104. s.readBufferLock.RLock()
  105. readBuffer = s.readBuffer
  106. s.readBufferLock.RUnlock()
  107. } else {
  108. readBuffer = s.readBuffer
  109. }
  110. n, err = readBuffer.Read(p)
  111. s.replenishReceiveWindow(uint32(n))
  112. return
  113. }
  114. // Blocks until len(p) bytes have been written to the buffer
  115. func (s *MuxedStream) Write(p []byte) (int, error) {
  116. // If assignDictToStream returns success, then it will have acquired the
  117. // writeLock. Otherwise we must acquire it ourselves.
  118. ok := assignDictToStream(s, p)
  119. if !ok {
  120. s.writeLock.Lock()
  121. }
  122. defer s.writeLock.Unlock()
  123. if s.writeEOF {
  124. return 0, io.EOF
  125. }
  126. // pre-allocate some space in the write buffer if possible
  127. if buffer, ok := s.writeBuffer.(*bytes.Buffer); ok {
  128. if buffer.Cap() == 0 {
  129. buffer.Grow(writeBufferInitialSize)
  130. }
  131. }
  132. totalWritten := 0
  133. for totalWritten < len(p) {
  134. // If the buffer is full, block till there is more room.
  135. // Use a loop to recheck the buffer size after the lock is reacquired.
  136. for s.writeBufferMaxLen <= s.writeBuffer.Len() {
  137. s.awaitWriteBufferHasSpace()
  138. if s.writeEOF {
  139. return totalWritten, io.EOF
  140. }
  141. }
  142. amountToWrite := len(p) - totalWritten
  143. spaceAvailable := s.writeBufferMaxLen - s.writeBuffer.Len()
  144. if spaceAvailable < amountToWrite {
  145. amountToWrite = spaceAvailable
  146. }
  147. amountWritten, err := s.writeBuffer.Write(p[totalWritten : totalWritten+amountToWrite])
  148. totalWritten += amountWritten
  149. if err != nil {
  150. return totalWritten, err
  151. }
  152. s.writeNotify()
  153. }
  154. return totalWritten, nil
  155. }
  156. func (s *MuxedStream) Close() error {
  157. // TUN-115: Close the write buffer before the read buffer.
  158. // In the case of shutdown, read will not get new data, but the write buffer can still receive
  159. // new data. Closing read before write allows application to race between a failed read and a
  160. // successful write, even though this close should appear to be atomic.
  161. // This can't happen the other way because reads may succeed after a failed write; if we read
  162. // past EOF the application will block until we close the buffer.
  163. err := s.CloseWrite()
  164. if err != nil {
  165. if s.CloseRead() == nil {
  166. // don't bother the caller with errors if at least one close succeeded
  167. return nil
  168. }
  169. return err
  170. }
  171. return s.CloseRead()
  172. }
// CloseRead closes the read side of the stream by closing the read buffer and
// returns that buffer's Close error. The write side is left untouched.
func (s *MuxedStream) CloseRead() error {
	return s.readBuffer.Close()
}
  176. func (s *MuxedStream) CloseWrite() error {
  177. s.writeLock.Lock()
  178. defer s.writeLock.Unlock()
  179. if s.writeEOF {
  180. return io.EOF
  181. }
  182. s.writeEOF = true
  183. if c, ok := s.writeBuffer.(io.Closer); ok {
  184. c.Close()
  185. }
  186. // Allow MuxedStream::Write() to terminate its loop with err=io.EOF, if needed
  187. s.notifyWriteBufferHasSpace()
  188. // We need to send something over the wire, even if it's an END_STREAM with no data
  189. s.writeNotify()
  190. return nil
  191. }
  192. func (s *MuxedStream) WriteClosed() bool {
  193. s.writeLock.Lock()
  194. defer s.writeLock.Unlock()
  195. return s.writeEOF
  196. }
  197. func (s *MuxedStream) WriteHeaders(headers []Header) error {
  198. s.writeLock.Lock()
  199. defer s.writeLock.Unlock()
  200. if s.writeHeaders != nil {
  201. return ErrStreamHeadersSent
  202. }
  203. if s.dictionaries.write != nil {
  204. dictWriter := s.dictionaries.write.getDictWriter(s, headers)
  205. if dictWriter != nil {
  206. s.writeBuffer = dictWriter
  207. }
  208. }
  209. s.writeHeaders = headers
  210. s.headersSent = false
  211. s.writeNotify()
  212. return nil
  213. }
  214. // IsRPCStream returns if the stream is used to transport RPC.
  215. func (s *MuxedStream) IsRPCStream() bool {
  216. rpcHeaders := RPCHeaders()
  217. if len(s.Headers) != len(rpcHeaders) {
  218. return false
  219. }
  220. // The headers order matters, so RPC stream should be opened with OpenRPCStream method and let MuxWriter serializes the headers.
  221. for i, rpcHeader := range rpcHeaders {
  222. if s.Headers[i] != rpcHeader {
  223. return false
  224. }
  225. }
  226. return true
  227. }
// TunnelHostname returns the tunnel hostname stored on the stream. The value
// may be unset; check it with TunnelHostname.IsSet.
func (s *MuxedStream) TunnelHostname() TunnelHostname {
	return s.tunnelHostname
}
// Block until a value is sent on writeBufferHasSpace.
// Must be called while holding writeLock
//
// The lock is released for the duration of the wait and re-acquired before
// returning, so callers must re-check any state guarded by writeLock afterwards.
func (s *MuxedStream) awaitWriteBufferHasSpace() {
	// Release the lock first so that other holders can make progress and notify us.
	s.writeLock.Unlock()
	<-s.writeBufferHasSpace
	s.writeLock.Lock()
}
// Send a value on writeBufferHasSpace without blocking.
// Must be called while holding writeLock
//
// If the channel cannot accept the value right now (for example because a
// notification is already pending), the send is skipped.
func (s *MuxedStream) notifyWriteBufferHasSpace() {
	select {
	case s.writeBufferHasSpace <- struct{}{}:
	default:
		// Nothing to do: dropping the extra notification keeps this non-blocking.
	}
}
  246. func (s *MuxedStream) getReceiveWindow() uint32 {
  247. s.writeLock.Lock()
  248. defer s.writeLock.Unlock()
  249. return s.receiveWindow
  250. }
  251. func (s *MuxedStream) getSendWindow() uint32 {
  252. s.writeLock.Lock()
  253. defer s.writeLock.Unlock()
  254. return s.sendWindow
  255. }
// writeNotify must happen while holding writeLock.
// It signals the muxer's ready list with this stream's ID so the stream gets
// picked up for writing.
func (s *MuxedStream) writeNotify() {
	s.readyList.Signal(s.streamID)
}
  260. // Call by muxreader when it gets a WindowUpdateFrame. This is an update of the peer's
  261. // receive window (how much data we can send).
  262. func (s *MuxedStream) replenishSendWindow(bytes uint32) {
  263. s.writeLock.Lock()
  264. defer s.writeLock.Unlock()
  265. s.sendWindow += bytes
  266. s.writeNotify()
  267. }
  268. // Call by muxreader when it receives a data frame
  269. func (s *MuxedStream) consumeReceiveWindow(bytes uint32) bool {
  270. s.writeLock.Lock()
  271. defer s.writeLock.Unlock()
  272. // received data size is greater than receive window/buffer
  273. if s.receiveWindow < bytes {
  274. return false
  275. }
  276. s.receiveWindow -= bytes
  277. if s.receiveWindow < s.receiveWindowCurrentMax/2 && s.receiveWindowCurrentMax < s.receiveWindowMax {
  278. // exhausting client send window (how much data client can send)
  279. // and there is room to grow the receive window
  280. newMax := s.receiveWindowCurrentMax << 1
  281. if newMax > s.receiveWindowMax {
  282. newMax = s.receiveWindowMax
  283. }
  284. s.windowUpdate += newMax - s.receiveWindowCurrentMax
  285. s.receiveWindowCurrentMax = newMax
  286. // notify MuxWriter to write WINDOW_UPDATE frame
  287. s.writeNotify()
  288. }
  289. return true
  290. }
  291. // Arranges for the MuxWriter to send a WINDOW_UPDATE
  292. // Called by MuxedStream::Read when data has left the read buffer.
  293. func (s *MuxedStream) replenishReceiveWindow(bytes uint32) {
  294. s.writeLock.Lock()
  295. defer s.writeLock.Unlock()
  296. s.windowUpdate += bytes
  297. s.writeNotify()
  298. }
  299. // receiveEOF should be called when the peer indicates no more data will be sent.
  300. // Returns true if the socket is now closed (i.e. the write side is already closed).
  301. func (s *MuxedStream) receiveEOF() (closed bool) {
  302. s.writeLock.Lock()
  303. defer s.writeLock.Unlock()
  304. s.receivedEOF = true
  305. s.CloseRead()
  306. return s.writeEOF && s.writeBuffer.Len() == 0
  307. }
  308. func (s *MuxedStream) gotReceiveEOF() bool {
  309. s.writeLock.Lock()
  310. defer s.writeLock.Unlock()
  311. return s.receivedEOF
  312. }
  313. // MuxedStreamReader implements io.ReadCloser for the read end of the stream.
  314. // This is useful for passing to functions that close the object after it is done reading,
  315. // but you still want to be able to write data afterwards (e.g. http.Client).
  316. type MuxedStreamReader struct {
  317. *MuxedStream
  318. }
  319. func (s MuxedStreamReader) Read(p []byte) (n int, err error) {
  320. return s.MuxedStream.Read(p)
  321. }
  322. func (s MuxedStreamReader) Close() error {
  323. return s.MuxedStream.CloseRead()
  324. }
// streamChunk represents a chunk of data to be written.
// It is produced by MuxedStream.getChunk and describes which frames
// (HEADERS, WINDOW_UPDATE, DATA) should be emitted for one stream.
type streamChunk struct {
	streamID uint32
	// true if a HEADERS frame should be sent
	sendHeaders bool
	// the headers to put in that HEADERS frame
	headers []Header
	// nonzero if a WINDOW_UPDATE frame should be sent;
	// in that case, it is the increment value to use
	windowUpdate uint32
	// true if data frames should be sent;
	// nextDataFrame clears it once the whole buffer has been handed out
	sendData bool
	// true if the last data frame of this chunk should also end the stream
	eof bool
	// the payload taken out of the stream's write buffer
	buffer []byte
	// index into buffer of the first byte not yet returned by nextDataFrame
	offset int
}
  340. // getChunk atomically extracts a chunk of data to be written by MuxWriter.
  341. // The data returned will not exceed the send window for this stream.
  342. func (s *MuxedStream) getChunk() *streamChunk {
  343. s.writeLock.Lock()
  344. defer s.writeLock.Unlock()
  345. chunk := &streamChunk{
  346. streamID: s.streamID,
  347. sendHeaders: !s.headersSent,
  348. headers: s.writeHeaders,
  349. windowUpdate: s.windowUpdate,
  350. sendData: !s.sentEOF,
  351. eof: s.writeEOF && uint32(s.writeBuffer.Len()) <= s.sendWindow,
  352. }
  353. // Copy at most s.sendWindow bytes, adjust the sendWindow accordingly
  354. toCopy := int(s.sendWindow)
  355. if toCopy > s.writeBuffer.Len() {
  356. toCopy = s.writeBuffer.Len()
  357. }
  358. if toCopy > 0 {
  359. buf := make([]byte, toCopy)
  360. writeLen, _ := s.writeBuffer.Read(buf)
  361. chunk.buffer = buf[:writeLen]
  362. s.sendWindow -= uint32(writeLen)
  363. }
  364. // Allow MuxedStream::Write() to continue, if needed
  365. if s.writeBuffer.Len() < s.writeBufferMaxLen {
  366. s.notifyWriteBufferHasSpace()
  367. }
  368. // When we write the chunk, we'll write the WINDOW_UPDATE frame if needed
  369. s.receiveWindow += s.windowUpdate
  370. s.windowUpdate = 0
  371. // When we write the chunk, we'll write the headers if needed
  372. s.headersSent = true
  373. // if this chunk contains the end of the stream, close the stream now
  374. if chunk.sendData && chunk.eof {
  375. s.sentEOF = true
  376. }
  377. return chunk
  378. }
// sendHeadersFrame reports whether a HEADERS frame should be sent for this chunk.
func (c *streamChunk) sendHeadersFrame() bool {
	return c.sendHeaders
}
  382. func (c *streamChunk) sendWindowUpdateFrame() bool {
  383. return c.windowUpdate > 0
  384. }
// sendDataFrame reports whether there are still data frames to send for this
// chunk. nextDataFrame turns this false once the chunk's buffer is exhausted.
func (c *streamChunk) sendDataFrame() bool {
	return c.sendData
}
  388. func (c *streamChunk) nextDataFrame(frameSize int) (payload []byte, endStream bool) {
  389. bytesLeft := len(c.buffer) - c.offset
  390. if frameSize > bytesLeft {
  391. frameSize = bytesLeft
  392. }
  393. nextOffset := c.offset + frameSize
  394. payload = c.buffer[c.offset:nextOffset]
  395. c.offset = nextOffset
  396. if c.offset == len(c.buffer) {
  397. // this is the last data frame in this chunk
  398. c.sendData = false
  399. if c.eof {
  400. endStream = true
  401. }
  402. }
  403. return
  404. }