123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- package quic
- import (
- "errors"
- "net"
- "sync"
- "sync/atomic"
- "time"
- "github.com/quic-go/quic-go"
- "github.com/rs/zerolog"
- "github.com/rs/zerolog/log"
- )
- // The error that is throw by the writer when there is `no network activity`.
- var idleTimeoutError = quic.IdleTimeoutError{}
- type SafeStreamCloser struct {
- lock sync.Mutex
- stream quic.Stream
- writeTimeout time.Duration
- log *zerolog.Logger
- closing atomic.Bool
- }
- func NewSafeStreamCloser(stream quic.Stream, writeTimeout time.Duration, log *zerolog.Logger) *SafeStreamCloser {
- return &SafeStreamCloser{
- stream: stream,
- writeTimeout: writeTimeout,
- log: log,
- }
- }
- func (s *SafeStreamCloser) Read(p []byte) (n int, err error) {
- return s.stream.Read(p)
- }
- func (s *SafeStreamCloser) Write(p []byte) (n int, err error) {
- s.lock.Lock()
- defer s.lock.Unlock()
- if s.writeTimeout > 0 {
- err = s.stream.SetWriteDeadline(time.Now().Add(s.writeTimeout))
- if err != nil {
- log.Err(err).Msg("Error setting write deadline for QUIC stream")
- }
- }
- nBytes, err := s.stream.Write(p)
- if err != nil {
- s.handleWriteError(err)
- }
- return nBytes, err
- }
- // Handles the timeout error in case it happened, by canceling the stream write.
- func (s *SafeStreamCloser) handleWriteError(err error) {
- // If we are closing the stream we just ignore any write error.
- if s.closing.Load() {
- return
- }
- var netErr net.Error
- if errors.As(err, &netErr) {
- if netErr.Timeout() {
- // We don't need to log if what cause the timeout was no network activity.
- if !errors.Is(netErr, &idleTimeoutError) {
- s.log.Error().Err(netErr).Msg("Closing quic stream due to timeout while writing")
- }
- // We need to explicitly cancel the write so that it frees all buffers.
- s.stream.CancelWrite(0)
- }
- }
- }
- func (s *SafeStreamCloser) Close() error {
- // Set this stream to a closing state.
- s.closing.Store(true)
- // Make sure a possible writer does not block the lock forever. We need it, so we can close the writer
- // side of the stream safely.
- _ = s.stream.SetWriteDeadline(time.Now())
- // This lock is eventually acquired despite Write also acquiring it, because we set a deadline to writes.
- s.lock.Lock()
- defer s.lock.Unlock()
- // We have to clean up the receiving stream ourselves since the Close in the bottom does not handle that.
- s.stream.CancelRead(0)
- return s.stream.Close()
- }
- func (s *SafeStreamCloser) CloseWrite() error {
- s.lock.Lock()
- defer s.lock.Unlock()
- // As documented by the quic-go library, this doesn't actually close the entire stream.
- // It prevents further writes, which in turn will result in an EOF signal being sent the other side of stream when
- // reading.
- // We can still read from this stream.
- return s.stream.Close()
- }
- func (s *SafeStreamCloser) SetDeadline(deadline time.Time) error {
- return s.stream.SetDeadline(deadline)
- }
|