4 Ревизии a22e534abe ... 09086a48cb

Автор SHA1 Съобщение Дата
  Michael Buesch 09086a48cb Add support for UDP control port преди 2 седмици
  Michael Buesch ea41d587a6 Bump version преди 1 седмица
  Michael Buesch 0202e4bbc0 systemd: Add UDP support преди 1 седмица
  Michael Buesch a22e534abe Add support for UDP control port преди 2 седмици
променени са 5 файла, в които са добавени 40 реда и са изтрити 26 реда
  1. 8 8
      Cargo.lock
  2. 6 6
      Cargo.toml
  3. 7 1
      letmein-proto/src/socket.rs
  4. 1 0
      letmeind/src/main.rs
  5. 18 11
      letmeind/src/server.rs

+ 8 - 8
Cargo.lock

@@ -534,7 +534,7 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "letmein"
 name = "letmein"
-version = "7.2.0"
+version = "8.0.0"
 dependencies = [
 dependencies = [
  "anyhow",
  "anyhow",
  "build-target",
  "build-target",
@@ -548,7 +548,7 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "letmein-conf"
 name = "letmein-conf"
-version = "7.2.0"
+version = "8.0.0"
 dependencies = [
 dependencies = [
  "anyhow",
  "anyhow",
  "letmein-proto",
  "letmein-proto",
@@ -556,7 +556,7 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "letmein-fwproto"
 name = "letmein-fwproto"
-version = "7.2.0"
+version = "8.0.0"
 dependencies = [
 dependencies = [
  "anyhow",
  "anyhow",
  "tokio",
  "tokio",
@@ -564,7 +564,7 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "letmein-proto"
 name = "letmein-proto"
-version = "7.2.0"
+version = "8.0.0"
 dependencies = [
 dependencies = [
  "anyhow",
  "anyhow",
  "getrandom",
  "getrandom",
@@ -576,7 +576,7 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "letmein-seccomp"
 name = "letmein-seccomp"
-version = "7.2.0"
+version = "8.0.0"
 dependencies = [
 dependencies = [
  "anyhow",
  "anyhow",
  "letmein-seccomp",
  "letmein-seccomp",
@@ -586,7 +586,7 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "letmein-systemd"
 name = "letmein-systemd"
-version = "7.2.0"
+version = "8.0.0"
 dependencies = [
 dependencies = [
  "anyhow",
  "anyhow",
  "libc",
  "libc",
@@ -595,7 +595,7 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "letmeind"
 name = "letmeind"
-version = "7.2.0"
+version = "8.0.0"
 dependencies = [
 dependencies = [
  "anyhow",
  "anyhow",
  "build-target",
  "build-target",
@@ -611,7 +611,7 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "letmeinfwd"
 name = "letmeinfwd"
-version = "7.2.0"
+version = "8.0.0"
 dependencies = [
 dependencies = [
  "anyhow",
  "anyhow",
  "build-target",
  "build-target",

+ 6 - 6
Cargo.toml

@@ -14,7 +14,7 @@ members = [
 resolver = "2"
 resolver = "2"
 
 
 [workspace.package]
 [workspace.package]
-version = "7.2.0"
+version = "8.0.0"
 edition = "2021"
 edition = "2021"
 rust-version = "1.75.0"
 rust-version = "1.75.0"
 license = "MIT OR Apache-2.0"
 license = "MIT OR Apache-2.0"
@@ -40,11 +40,11 @@ sha3 = "0.10"
 subtle = "2"
 subtle = "2"
 tokio = "1"
 tokio = "1"
 
 
-letmein-conf = { version = "7", path = "./letmein-conf" }
-letmein-fwproto = { version = "7", path = "./letmein-fwproto" }
-letmein-proto = { version = "7", path = "./letmein-proto" }
-letmein-seccomp = { version = "7", path = "./letmein-seccomp", default-features = false }
-letmein-systemd = { version = "7", path = "./letmein-systemd" }
+letmein-conf = { version = "8", path = "./letmein-conf" }
+letmein-fwproto = { version = "8", path = "./letmein-fwproto" }
+letmein-proto = { version = "8", path = "./letmein-proto" }
+letmein-seccomp = { version = "8", path = "./letmein-seccomp", default-features = false }
+letmein-systemd = { version = "8", path = "./letmein-systemd" }
 
 
 [profile.release]
 [profile.release]
 opt-level = "z"
 opt-level = "z"

+ 7 - 1
letmein-proto/src/socket.rs

@@ -209,6 +209,7 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> NetSocket<MSG_SIZE, Q_SIZE> {
         } else {
         } else {
             match self {
             match self {
                 Self::Tcp(inner) => {
                 Self::Tcp(inner) => {
+                    // Send the message via TCP.
                     let mut count = 0;
                     let mut count = 0;
                     loop {
                     loop {
                         inner
                         inner
@@ -232,6 +233,7 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> NetSocket<MSG_SIZE, Q_SIZE> {
                     }
                     }
                 }
                 }
                 Self::Udp(inner) => {
                 Self::Udp(inner) => {
+                    // Send the message via UDP.
                     inner
                     inner
                         .disp
                         .disp
                         .send_to(
                         .send_to(
@@ -252,6 +254,7 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> NetSocket<MSG_SIZE, Q_SIZE> {
         } else {
         } else {
             match self {
             match self {
                 Self::Tcp(inner) => {
                 Self::Tcp(inner) => {
+                    // Receive a message via TCP.
                     let mut buf = [0; MSG_SIZE];
                     let mut buf = [0; MSG_SIZE];
                     let mut count = 0;
                     let mut count = 0;
                     loop {
                     loop {
@@ -278,7 +281,10 @@ impl<const MSG_SIZE: usize, const Q_SIZE: usize> NetSocket<MSG_SIZE, Q_SIZE> {
                         }
                         }
                     }
                     }
                 }
                 }
-                Self::Udp(inner) => inner.disp.recv_from(inner.peer_addr).await.map(Some),
+                Self::Udp(inner) => {
+                    // Receive a message via UDP.
+                    inner.disp.recv_from(inner.peer_addr).await.map(Some)
+                }
             }
             }
         }
         }
     }
     }

+ 1 - 0
letmeind/src/main.rs

@@ -221,6 +221,7 @@ async fn async_main(opts: Arc<Opts>) -> ah::Result<()> {
                 match srv.accept().await {
                 match srv.accept().await {
                     Ok(conn) => {
                     Ok(conn) => {
                         // Socket connection handler.
                         // Socket connection handler.
+                        let conn = Arc::new(conn);
                         if let Ok(_permit) = conn_semaphore.acquire().await {
                         if let Ok(_permit) = conn_semaphore.acquire().await {
                             let conn = Arc::clone(&conn);
                             let conn = Arc::clone(&conn);
                             task::spawn(async move {
                             task::spawn(async move {

+ 18 - 11
letmeind/src/server.rs

@@ -10,7 +10,12 @@ use anyhow::{self as ah, format_err as err, Context as _};
 use letmein_conf::Config;
 use letmein_conf::Config;
 use letmein_proto::{Message, MsgNetSocket, MsgUdpDispatcher};
 use letmein_proto::{Message, MsgNetSocket, MsgUdpDispatcher};
 use letmein_systemd::{systemd_notify_ready, SystemdSocket};
 use letmein_systemd::{systemd_notify_ready, SystemdSocket};
-use std::{convert::Infallible, net::SocketAddr, sync::Arc, time::Duration};
+use std::{
+    convert::Infallible,
+    net::{Ipv6Addr, SocketAddr},
+    sync::Arc,
+    time::Duration,
+};
 use tokio::{
 use tokio::{
     net::{TcpListener, TcpStream, UdpSocket},
     net::{TcpListener, TcpStream, UdpSocket},
     task::{self, JoinHandle},
     task::{self, JoinHandle},
@@ -138,7 +143,7 @@ impl Server {
         // TCP bind.
         // TCP bind.
         if conf.port().tcp {
         if conf.port().tcp {
             this.tcp = Arc::new(Some(
             this.tcp = Arc::new(Some(
-                TcpListener::bind(("::0", conf.port().port))
+                TcpListener::bind((Ipv6Addr::UNSPECIFIED, conf.port().port))
                     .await
                     .await
                     .context("Bind")?,
                     .context("Bind")?,
             ));
             ));
@@ -146,7 +151,7 @@ impl Server {
         // UDP bind.
         // UDP bind.
         if conf.port().udp {
         if conf.port().udp {
             this.udp = Arc::new(Some(Arc::new(MsgUdpDispatcher::new(
             this.udp = Arc::new(Some(Arc::new(MsgUdpDispatcher::new(
-                UdpSocket::bind(("::0", conf.port().port))
+                UdpSocket::bind((Ipv6Addr::UNSPECIFIED, conf.port().port))
                     .await
                     .await
                     .context("Bind")?,
                     .context("Bind")?,
                 max_nr_udp_conn,
                 max_nr_udp_conn,
@@ -156,7 +161,8 @@ impl Server {
         Ok(this)
         Ok(this)
     }
     }
 
 
-    pub async fn accept(&mut self) -> ah::Result<Arc<Connection>> {
+    pub async fn accept(&mut self) -> ah::Result<Connection> {
+        // Async task for accepting a new TCP connection.
         let join_tcp: JoinHandle<ah::Result<(TcpStream, SocketAddr)>> = task::spawn({
         let join_tcp: JoinHandle<ah::Result<(TcpStream, SocketAddr)>> = task::spawn({
             let tcp = Arc::clone(&self.tcp);
             let tcp = Arc::clone(&self.tcp);
             async move {
             async move {
@@ -168,28 +174,29 @@ impl Server {
             }
             }
         });
         });
 
 
-        let join_udp: JoinHandle<ah::Result<SocketAddr>> = task::spawn({
+        // Async task for accepting a new UDP connection.
+        let join_udp: JoinHandle<ah::Result<(Arc<MsgUdpDispatcher>, SocketAddr)>> = task::spawn({
             let udp = Arc::clone(&self.udp);
             let udp = Arc::clone(&self.udp);
             async move {
             async move {
                 if let Some(udp) = udp.as_ref() {
                 if let Some(udp) = udp.as_ref() {
-                    return udp.accept().await;
+                    return Ok((Arc::clone(udp), udp.accept().await?));
                 }
                 }
                 sleep_forever().await;
                 sleep_forever().await;
                 unreachable!();
                 unreachable!();
             }
             }
         });
         });
 
 
+        // Await any one of the accept tasks.
         tokio::select! {
         tokio::select! {
             result = join_tcp => {
             result = join_tcp => {
                 let (stream, peer_addr) = result??;
                 let (stream, peer_addr) = result??;
                 let ns = MsgNetSocket::from_tcp(stream);
                 let ns = MsgNetSocket::from_tcp(stream);
-                Ok(Arc::new(Connection::new(ns, peer_addr)?))
+                Ok(Connection::new(ns, peer_addr)?)
             }
             }
             result = join_udp => {
             result = join_udp => {
-                let peer_addr = result??;
-                let udp_disp = (*self.udp).as_ref().unwrap();
-                let ns = MsgNetSocket::from_udp(Arc::clone(udp_disp), peer_addr);
-                Ok(Arc::new(Connection::new(ns, peer_addr)?))
+                let (udp_disp, peer_addr) = result??;
+                let ns = MsgNetSocket::from_udp(udp_disp, peer_addr);
+                Ok(Connection::new(ns, peer_addr)?)
             }
             }
         }
         }
     }
     }