readylist.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package h2mux
  2. import "sync"
  // ReadyList funnels readiness signals for many stream IDs onto one channel.
  // Producers report IDs through Signal, a consumer receives them from
  // ReadyChannel, and Close shuts the list down.
  type ReadyList struct {
  	// signalC carries stream IDs from Signal into the run loop so they can be
  	// enqueued; waitC hands the ID of the first ready descriptor to the
  	// consumer.
  	signalC, waitC chan uint32
  	// doneC is closed to tell run that it should terminate.
  	doneC chan struct{}
  	// closeOnce guarantees doneC is closed at most once.
  	closeOnce sync.Once
  }
  13. func NewReadyList() *ReadyList {
  14. rl := &ReadyList{
  15. signalC: make(chan uint32),
  16. waitC: make(chan uint32),
  17. doneC: make(chan struct{}),
  18. }
  19. go rl.run()
  20. return rl
  21. }
  22. // ID is the stream ID
  23. func (r *ReadyList) Signal(ID uint32) {
  24. select {
  25. case r.signalC <- ID:
  26. // ReadyList already closed
  27. case <-r.doneC:
  28. }
  29. }
  30. func (r *ReadyList) ReadyChannel() <-chan uint32 {
  31. return r.waitC
  32. }
  33. func (r *ReadyList) Close() {
  34. r.closeOnce.Do(func() {
  35. close(r.doneC)
  36. })
  37. }
  38. func (r *ReadyList) run() {
  39. defer close(r.waitC)
  40. var queue readyDescriptorQueue
  41. var firstReady *readyDescriptor
  42. activeDescriptors := newReadyDescriptorMap()
  43. for {
  44. if firstReady == nil {
  45. select {
  46. case i := <-r.signalC:
  47. firstReady = activeDescriptors.SetIfMissing(i)
  48. case <-r.doneC:
  49. return
  50. }
  51. }
  52. select {
  53. case r.waitC <- firstReady.ID:
  54. activeDescriptors.Delete(firstReady.ID)
  55. firstReady = queue.Dequeue()
  56. case i := <-r.signalC:
  57. newReady := activeDescriptors.SetIfMissing(i)
  58. if newReady != nil {
  59. // key doesn't exist
  60. queue.Enqueue(newReady)
  61. }
  62. case <-r.doneC:
  63. return
  64. }
  65. }
  66. }
  // readyDescriptor is a node of a readyDescriptorQueue.
  type readyDescriptor struct {
  	// ID is the stream ID carried by this descriptor.
  	ID uint32
  	// Next points to the following descriptor in the queue, or is nil when
  	// there is none.
  	Next *readyDescriptor
  }

  // readyDescriptorQueue is a FIFO queue of readyDescriptors in the form of a
  // singly-linked list. The zero readyDescriptorQueue is an empty queue ready
  // for use.
  type readyDescriptorQueue struct {
  	Head *readyDescriptor
  	Tail *readyDescriptor
  }

  // Empty reports whether the queue holds no descriptors.
  func (q *readyDescriptorQueue) Empty() bool {
  	return q.Head == nil
  }

  // Enqueue appends x to the end of the queue. It panics if x.Next is non-nil,
  // which indicates that x is still linked into a queue.
  func (q *readyDescriptorQueue) Enqueue(x *readyDescriptor) {
  	if x.Next != nil {
  		panic("enqueued already queued item")
  	}
  	if q.Empty() {
  		q.Head = x
  	} else {
  		q.Tail.Next = x
  	}
  	q.Tail = x
  }

  // Dequeue removes and returns the first readyDescriptor in the queue, or
  // returns nil if the queue is empty. The returned descriptor has its Next
  // link cleared so it can be enqueued again later.
  func (q *readyDescriptorQueue) Dequeue() *readyDescriptor {
  	if q.Empty() {
  		return nil
  	}
  	x := q.Head
  	q.Head = x.Next
  	if q.Head == nil {
  		// The last element was removed: drop the tail pointer too so an empty
  		// queue never keeps a reference to a descriptor it no longer owns.
  		q.Tail = nil
  	}
  	x.Next = nil
  	return x
  }
  102. // readyDescriptorQueue is a map of readyDescriptors keyed by ID.
  103. // It maintains a free list of deleted ready descriptors.
  104. type readyDescriptorMap struct {
  105. descriptors map[uint32]*readyDescriptor
  106. free []*readyDescriptor
  107. }
  108. func newReadyDescriptorMap() *readyDescriptorMap {
  109. return &readyDescriptorMap{descriptors: make(map[uint32]*readyDescriptor)}
  110. }
  111. // create or reuse a readyDescriptor if the stream is not in the queue.
  112. // This avoid stream starvation caused by a single high-bandwidth stream monopolising the writer goroutine
  113. func (m *readyDescriptorMap) SetIfMissing(key uint32) *readyDescriptor {
  114. if _, ok := m.descriptors[key]; ok {
  115. return nil
  116. }
  117. var newDescriptor *readyDescriptor
  118. if len(m.free) > 0 {
  119. // reuse deleted ready descriptors
  120. newDescriptor = m.free[len(m.free)-1]
  121. m.free = m.free[:len(m.free)-1]
  122. } else {
  123. newDescriptor = &readyDescriptor{}
  124. }
  125. newDescriptor.ID = key
  126. m.descriptors[key] = newDescriptor
  127. return newDescriptor
  128. }
  129. func (m *readyDescriptorMap) Delete(key uint32) {
  130. if descriptor, ok := m.descriptors[key]; ok {
  131. m.free = append(m.free, descriptor)
  132. delete(m.descriptors, key)
  133. }
  134. }