123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- package h2mux
- import "sync"
- // ReadyList multiplexes several event signals onto a single channel.
- type ReadyList struct {
- // signalC is used to signal that a stream can be enqueued
- signalC chan uint32
- // waitC is used to signal the ID of the first ready descriptor
- waitC chan uint32
- // doneC is used to signal that run should terminate
- doneC chan struct{}
- closeOnce sync.Once
- }
- func NewReadyList() *ReadyList {
- rl := &ReadyList{
- signalC: make(chan uint32),
- waitC: make(chan uint32),
- doneC: make(chan struct{}),
- }
- go rl.run()
- return rl
- }
- // ID is the stream ID
- func (r *ReadyList) Signal(ID uint32) {
- select {
- case r.signalC <- ID:
- // ReadyList already closed
- case <-r.doneC:
- }
- }
- func (r *ReadyList) ReadyChannel() <-chan uint32 {
- return r.waitC
- }
- func (r *ReadyList) Close() {
- r.closeOnce.Do(func() {
- close(r.doneC)
- })
- }
- func (r *ReadyList) run() {
- defer close(r.waitC)
- var queue readyDescriptorQueue
- var firstReady *readyDescriptor
- activeDescriptors := newReadyDescriptorMap()
- for {
- if firstReady == nil {
- select {
- case i := <-r.signalC:
- firstReady = activeDescriptors.SetIfMissing(i)
- case <-r.doneC:
- return
- }
- }
- select {
- case r.waitC <- firstReady.ID:
- activeDescriptors.Delete(firstReady.ID)
- firstReady = queue.Dequeue()
- case i := <-r.signalC:
- newReady := activeDescriptors.SetIfMissing(i)
- if newReady != nil {
- // key doesn't exist
- queue.Enqueue(newReady)
- }
- case <-r.doneC:
- return
- }
- }
- }
- type readyDescriptor struct {
- ID uint32
- Next *readyDescriptor
- }
- // readyDescriptorQueue is a queue of readyDescriptors in the form of a singly-linked list.
- // The nil readyDescriptorQueue is an empty queue ready for use.
- type readyDescriptorQueue struct {
- Head *readyDescriptor
- Tail *readyDescriptor
- }
- func (q *readyDescriptorQueue) Empty() bool {
- return q.Head == nil
- }
- func (q *readyDescriptorQueue) Enqueue(x *readyDescriptor) {
- if x.Next != nil {
- panic("enqueued already queued item")
- }
- if q.Empty() {
- q.Head = x
- q.Tail = x
- } else {
- q.Tail.Next = x
- q.Tail = x
- }
- }
- // Dequeue returns the first readyDescriptor in the queue, or nil if empty.
- func (q *readyDescriptorQueue) Dequeue() *readyDescriptor {
- if q.Empty() {
- return nil
- }
- x := q.Head
- q.Head = x.Next
- x.Next = nil
- return x
- }
- // readyDescriptorQueue is a map of readyDescriptors keyed by ID.
- // It maintains a free list of deleted ready descriptors.
- type readyDescriptorMap struct {
- descriptors map[uint32]*readyDescriptor
- free []*readyDescriptor
- }
- func newReadyDescriptorMap() *readyDescriptorMap {
- return &readyDescriptorMap{descriptors: make(map[uint32]*readyDescriptor)}
- }
- // create or reuse a readyDescriptor if the stream is not in the queue.
- // This avoid stream starvation caused by a single high-bandwidth stream monopolising the writer goroutine
- func (m *readyDescriptorMap) SetIfMissing(key uint32) *readyDescriptor {
- if _, ok := m.descriptors[key]; ok {
- return nil
- }
- var newDescriptor *readyDescriptor
- if len(m.free) > 0 {
- // reuse deleted ready descriptors
- newDescriptor = m.free[len(m.free)-1]
- m.free = m.free[:len(m.free)-1]
- } else {
- newDescriptor = &readyDescriptor{}
- }
- newDescriptor.ID = key
- m.descriptors[key] = newDescriptor
- return newDescriptor
- }
- func (m *readyDescriptorMap) Delete(key uint32) {
- if descriptor, ok := m.descriptors[key]; ok {
- m.free = append(m.free, descriptor)
- delete(m.descriptors, key)
- }
- }
|