2 Commits 957d9af076 ... f790731790

Auteur SHA1 Bericht Datum
  Michael Buesch f790731790 Add support for UDP control port 2 weken geleden
  Michael Buesch 957d9af076 Add support for UDP control port 2 weken geleden
1 gewijzigde bestanden met toevoegingen van 37 en 34 verwijderingen
  1. 37 34
      letmein-proto/src/socket.rs

+ 37 - 34
letmein-proto/src/socket.rs

@@ -70,7 +70,12 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
     }
 
     /// Try to receive a new datagram from the socket.
-    fn try_recv(&mut self, socket: &UdpSocket) -> ah::Result<()> {
+    fn try_recv(
+        &mut self,
+        socket: &UdpSocket,
+        accept_notify: &Sender<()>,
+        recv_notify: &Sender<()>,
+    ) -> ah::Result<()> {
         let mut buf = [0_u8; MSG_SIZE];
         match socket.try_recv_from(&mut buf) {
             Ok((n, peer_addr)) => {
@@ -94,6 +99,7 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
                     return Err(err!("UDP socket read: RX queue overflow (max={}).", Q_SIZE));
                 }
                 conn.rx_queue.push_back(buf);
+                let accepted = conn.accepted;
 
                 // Check if we exceeded the maximum number of connections.
                 if self.conn.len() > self.max_nr_conn {
@@ -103,6 +109,15 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
                         self.max_nr_conn
                     ));
                 }
+
+                if accepted {
+                    // There is queued RX data for an accepted connection. Wake watcher.
+                    let _ = recv_notify.send(());
+                } else {
+                    // There is an un-accepted connection. Wake watcher.
+                    let _ = accept_notify.send(());
+                }
+
                 Ok(())
             }
             Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(()),
@@ -111,8 +126,13 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
     }
 
     /// Get the first not-accepted connection, or None.
-    fn try_accept(&mut self, socket: &UdpSocket) -> ah::Result<Option<SocketAddr>> {
-        self.try_recv(socket)?;
+    fn try_accept(
+        &mut self,
+        socket: &UdpSocket,
+        accept_notify: &Sender<()>,
+        recv_notify: &Sender<()>,
+    ) -> ah::Result<Option<SocketAddr>> {
+        self.try_recv(socket, accept_notify, recv_notify)?;
         for conn in &mut self.conn.values_mut() {
             if !conn.accepted {
                 conn.accepted = true;
@@ -127,8 +147,10 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
         &mut self,
         socket: &UdpSocket,
         peer_addr: SocketAddr,
+        accept_notify: &Sender<()>,
+        recv_notify: &Sender<()>,
     ) -> ah::Result<Option<[u8; MSG_SIZE]>> {
-        self.try_recv(socket)?;
+        self.try_recv(socket, accept_notify, recv_notify)?;
         if let Some(conn) = self.conn.get_mut(&peer_addr) {
             Ok(conn.rx_queue.pop_front())
         } else {
@@ -140,26 +162,6 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcherRx<MSG_SIZE, Q_SIZ
     fn disconnect(&mut self, peer_addr: SocketAddr) {
         self.conn.remove(&peer_addr);
     }
-
-    fn wake_watchers(&self, accept: &Sender<()>, recv: &Sender<()>) {
-        let mut accept_notified = false;
-        let mut recv_notified = false;
-        for conn in self.conn.values() {
-            if !accept_notified && !conn.accepted {
-                // There is an un-accepted connection. Wake watcher.
-                let _ = accept.send(());
-                accept_notified = true;
-            }
-            if !recv_notified && conn.accepted && !conn.rx_queue.is_empty() {
-                // There is queued RX data. Wake watcher.
-                let _ = recv.send(());
-                recv_notified = true;
-            }
-            if accept_notified && recv_notified {
-                break;
-            }
-        }
-    }
 }
 
 /// Simple TX/RX dispatcher for UDP.
@@ -201,11 +203,11 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcher<MSG_SIZE, Q_SIZE>
                 _ = self.accept_watch.1.lock() => (),
             }
 
-            let mut rx = self.rx.lock().await;
-
-            let peer_addr = rx.try_accept(&self.socket)?;
-            rx.wake_watchers(&self.accept_watch.0, &self.recv_watch.0);
-            if let Some(peer_addr) = peer_addr {
+            if let Some(peer_addr) = self.rx.lock().await.try_accept(
+                &self.socket,
+                &self.accept_watch.0,
+                &self.recv_watch.0,
+            )? {
                 break Ok(peer_addr);
             }
         }
@@ -220,11 +222,12 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> UdpDispatcher<MSG_SIZE, Q_SIZE>
                 _ = self.recv_watch.1.lock() => (),
             }
 
-            let mut rx = self.rx.lock().await;
-
-            let buf = rx.try_recv_from(&self.socket, peer_addr)?;
-            rx.wake_watchers(&self.accept_watch.0, &self.recv_watch.0);
-            if let Some(buf) = buf {
+            if let Some(buf) = self.rx.lock().await.try_recv_from(
+                &self.socket,
+                peer_addr,
+                &self.accept_watch.0,
+                &self.recv_watch.0,
+            )? {
                 break Ok(buf);
             }
         }