queuepacketconn.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package turbotunnel
  2. import (
  3. "net"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. // taggedPacket is a combination of a []byte and a net.Addr, encapsulating the
  9. // return type of PacketConn.ReadFrom.
  10. type taggedPacket struct {
  11. P []byte
  12. Addr net.Addr
  13. }
  14. // QueuePacketConn implements net.PacketConn by storing queues of packets. There
  15. // is one incoming queue (where packets are additionally tagged by the source
  16. // address of the client that sent them). There are many outgoing queues, one
  17. // for each client address that has been recently seen. The QueueIncoming method
  18. // inserts a packet into the incoming queue, to eventually be returned by
  19. // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
  20. // which can later by accessed through the OutgoingQueue method.
  21. type QueuePacketConn struct {
  22. clients *ClientMap
  23. localAddr net.Addr
  24. recvQueue chan taggedPacket
  25. closeOnce sync.Once
  26. closed chan struct{}
  27. // What error to return when the QueuePacketConn is closed.
  28. err atomic.Value
  29. }
  30. // NewQueuePacketConn makes a new QueuePacketConn, set to track recent clients
  31. // for at least a duration of timeout.
  32. func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
  33. return &QueuePacketConn{
  34. clients: NewClientMap(timeout),
  35. localAddr: localAddr,
  36. recvQueue: make(chan taggedPacket, queueSize),
  37. closed: make(chan struct{}),
  38. }
  39. }
  40. // QueueIncoming queues and incoming packet and its source address, to be
  41. // returned in a future call to ReadFrom.
  42. func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
  43. select {
  44. case <-c.closed:
  45. // If we're closed, silently drop it.
  46. return
  47. default:
  48. }
  49. // Copy the slice so that the caller may reuse it.
  50. buf := make([]byte, len(p))
  51. copy(buf, p)
  52. select {
  53. case c.recvQueue <- taggedPacket{buf, addr}:
  54. default:
  55. // Drop the incoming packet if the receive queue is full.
  56. }
  57. }
  58. // OutgoingQueue returns the queue of outgoing packets corresponding to addr,
  59. // creating it if necessary. The contents of the queue will be packets that are
  60. // written to the address in question using WriteTo.
  61. func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
  62. return c.clients.SendQueue(addr)
  63. }
  64. // ReadFrom returns a packet and address previously stored by QueueIncoming.
  65. func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
  66. select {
  67. case <-c.closed:
  68. return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
  69. default:
  70. }
  71. select {
  72. case <-c.closed:
  73. return 0, nil, &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
  74. case packet := <-c.recvQueue:
  75. return copy(p, packet.P), packet.Addr, nil
  76. }
  77. }
  78. // WriteTo queues an outgoing packet for the given address. The queue can later
  79. // be retrieved using the OutgoingQueue method.
  80. func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
  81. select {
  82. case <-c.closed:
  83. return 0, &net.OpError{Op: "write", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
  84. default:
  85. }
  86. // Copy the slice so that the caller may reuse it.
  87. buf := make([]byte, len(p))
  88. copy(buf, p)
  89. select {
  90. case c.clients.SendQueue(addr) <- buf:
  91. return len(buf), nil
  92. default:
  93. // Drop the outgoing packet if the send queue is full.
  94. return len(buf), nil
  95. }
  96. }
  97. // closeWithError unblocks pending operations and makes future operations fail
  98. // with the given error. If err is nil, it becomes errClosedPacketConn.
  99. func (c *QueuePacketConn) closeWithError(err error) error {
  100. var newlyClosed bool
  101. c.closeOnce.Do(func() {
  102. newlyClosed = true
  103. // Store the error to be returned by future PacketConn
  104. // operations.
  105. if err == nil {
  106. err = errClosedPacketConn
  107. }
  108. c.err.Store(err)
  109. close(c.closed)
  110. })
  111. if !newlyClosed {
  112. return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
  113. }
  114. return nil
  115. }
  116. // Close unblocks pending operations and makes future operations fail with a
  117. // "closed connection" error.
  118. func (c *QueuePacketConn) Close() error {
  119. return c.closeWithError(nil)
  120. }
  121. // LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
  122. func (c *QueuePacketConn) LocalAddr() net.Addr { return c.localAddr }
  123. func (c *QueuePacketConn) SetDeadline(t time.Time) error { return errNotImplemented }
  124. func (c *QueuePacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented }
  125. func (c *QueuePacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }