|
@@ -70,7 +70,12 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
|
|
|
}
|
|
|
|
|
|
/// Try to receive a new datagram from the socket.
|
|
|
- fn try_recv(&mut self, socket: &UdpSocket) -> ah::Result<()> {
|
|
|
+ fn try_recv(
|
|
|
+ &mut self,
|
|
|
+ socket: &UdpSocket,
|
|
|
+ accept_notify: &Sender<()>,
|
|
|
+ recv_notify: &Sender<()>,
|
|
|
+ ) -> ah::Result<()> {
|
|
|
let mut buf = [0_u8; MSG_SIZE];
|
|
|
match socket.try_recv_from(&mut buf) {
|
|
|
Ok((n, peer_addr)) => {
|
|
@@ -94,6 +99,7 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
|
|
|
return Err(err!("UDP socket read: RX queue overflow (max={}).", Q_SIZE));
|
|
|
}
|
|
|
conn.rx_queue.push_back(buf);
|
|
|
+ let accepted = conn.accepted;
|
|
|
|
|
|
// Check if we exceeded the maximum number of connections.
|
|
|
if self.conn.len() > self.max_nr_conn {
|
|
@@ -103,6 +109,15 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
|
|
|
self.max_nr_conn
|
|
|
));
|
|
|
}
|
|
|
+
|
|
|
+ if accepted {
|
|
|
+ // There is queued RX data for an accepted connection. Wake watcher.
|
|
|
+ let _ = recv_notify.send(());
|
|
|
+ } else {
|
|
|
+ // There is an un-accepted connection. Wake watcher.
|
|
|
+ let _ = accept_notify.send(());
|
|
|
+ }
|
|
|
+
|
|
|
Ok(())
|
|
|
}
|
|
|
Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(()),
|
|
@@ -111,8 +126,13 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
|
|
|
}
|
|
|
|
|
|
/// Get the first not-accepted connection, or None.
|
|
|
- fn try_accept(&mut self, socket: &UdpSocket) -> ah::Result<Option<SocketAddr>> {
|
|
|
- self.try_recv(socket)?;
|
|
|
+ fn try_accept(
|
|
|
+ &mut self,
|
|
|
+ socket: &UdpSocket,
|
|
|
+ accept_notify: &Sender<()>,
|
|
|
+ recv_notify: &Sender<()>,
|
|
|
+ ) -> ah::Result<Option<SocketAddr>> {
|
|
|
+ self.try_recv(socket, accept_notify, recv_notify)?;
|
|
|
for conn in &mut self.conn.values_mut() {
|
|
|
if !conn.accepted {
|
|
|
conn.accepted = true;
|
|
@@ -127,8 +147,10 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
|
|
|
&mut self,
|
|
|
socket: &UdpSocket,
|
|
|
peer_addr: SocketAddr,
|
|
|
+ accept_notify: &Sender<()>,
|
|
|
+ recv_notify: &Sender<()>,
|
|
|
) -> ah::Result<Option<[u8; MSG_SIZE]>> {
|
|
|
- self.try_recv(socket)?;
|
|
|
+ self.try_recv(socket, accept_notify, recv_notify)?;
|
|
|
if let Some(conn) = self.conn.get_mut(&peer_addr) {
|
|
|
Ok(conn.rx_queue.pop_front())
|
|
|
} else {
|
|
@@ -140,26 +162,6 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
|
|
|
fn disconnect(&mut self, peer_addr: SocketAddr) {
|
|
|
self.conn.remove(&peer_addr);
|
|
|
}
|
|
|
-
|
|
|
- fn wake_watchers(&self, accept: &Sender<()>, recv: &Sender<()>) {
|
|
|
- let mut accept_notified = false;
|
|
|
- let mut recv_notified = false;
|
|
|
- for conn in self.conn.values() {
|
|
|
- if !accept_notified && !conn.accepted {
|
|
|
- // There is an un-accepted connection. Wake watcher.
|
|
|
- let _ = accept.send(());
|
|
|
- accept_notified = true;
|
|
|
- }
|
|
|
- if !recv_notified && conn.accepted && !conn.rx_queue.is_empty() {
|
|
|
- // There is queued RX data. Wake watcher.
|
|
|
- let _ = recv.send(());
|
|
|
- recv_notified = true;
|
|
|
- }
|
|
|
- if accept_notified && recv_notified {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/// Simple TX/RX dispatcher for UDP.
|
|
@@ -201,11 +203,11 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcher<MSG_SIZE, Q_SIZE>
|
|
|
_ = self.accept_watch.1.lock() => (),
|
|
|
}
|
|
|
|
|
|
- let mut rx = self.rx.lock().await;
|
|
|
-
|
|
|
- let peer_addr = rx.try_accept(&self.socket)?;
|
|
|
- rx.wake_watchers(&self.accept_watch.0, &self.recv_watch.0);
|
|
|
- if let Some(peer_addr) = peer_addr {
|
|
|
+ if let Some(peer_addr) = self.rx.lock().await.try_accept(
|
|
|
+ &self.socket,
|
|
|
+ &self.accept_watch.0,
|
|
|
+ &self.recv_watch.0,
|
|
|
+ )? {
|
|
|
break Ok(peer_addr);
|
|
|
}
|
|
|
}
|
|
@@ -220,11 +222,12 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcher<MSG_SIZE, Q_SIZE>
|
|
|
_ = self.recv_watch.1.lock() => (),
|
|
|
}
|
|
|
|
|
|
- let mut rx = self.rx.lock().await;
|
|
|
-
|
|
|
- let buf = rx.try_recv_from(&self.socket, peer_addr)?;
|
|
|
- rx.wake_watchers(&self.accept_watch.0, &self.recv_watch.0);
|
|
|
- if let Some(buf) = buf {
|
|
|
+ if let Some(buf) = self.rx.lock().await.try_recv_from(
|
|
|
+ &self.socket,
|
|
|
+ peer_addr,
|
|
|
+ &self.accept_watch.0,
|
|
|
+ &self.recv_watch.0,
|
|
|
+ )? {
|
|
|
break Ok(buf);
|
|
|
}
|
|
|
}
|