aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md2
-rw-r--r--src/client.rs45
-rw-r--r--src/server.rs57
3 files changed, 88 insertions, 16 deletions
diff --git a/README.md b/README.md
index 67bd34c..0032747 100644
--- a/README.md
+++ b/README.md
@@ -70,7 +70,7 @@ This is a bird-eye view of the features/tasks that will probably be worked on ne
- [x] [Previous roadmap tasks](ROADMAP.md)
- [x] Security: More certificates support, see [certificates-and-server-authentication](#certificates-and-server-authentication)
- [x] Feature: Implement `unordered` & `ordered` `reliable` message channels on client & server, see [channels](#channels)
-- [ ] Feature: Implement `unreliable` message channel on client & server
+- [x] Feature: Implement `unreliable` message channel on client & server
- [ ] Feature: Implement `unreliable` messages larger than the path MTU from client & server
- [ ] Performance: feed multiples messages before flushing ordered reliable channels
- [ ] Clean: Rework the error handling in the async back-end
diff --git a/src/client.rs b/src/client.rs
index 27fc546..8a52441 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -624,17 +624,24 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) {
{
let close_recv = spawn_config.close_recv.resubscribe();
let connection_handle = connection.clone();
+ let bytes_incoming_send = spawn_config.bytes_from_server_send.clone();
tokio::spawn(async move {
- reliable_receiver_task(
- connection_handle,
- spawn_config.bytes_from_server_send,
- close_recv,
- )
- .await
+ reliable_receiver_task(connection_handle, close_recv, bytes_incoming_send).await
});
}
- // Spawn a task to handle channels for this connection
+ // Spawn a task to listen for datagrams sent by the server
+ {
+ let close_recv = spawn_config.close_recv.resubscribe();
+ let connection_handle = connection.clone();
+ let bytes_incoming_send = spawn_config.bytes_from_server_send.clone();
+ tokio::spawn(async move {
+ unreliable_receiver_task(connection_handle, close_recv, bytes_incoming_send)
+ .await
+ });
+ }
+
+ // Spawn a task to handle send channels for this connection
tokio::spawn(async move {
channels_task(
connection,
@@ -669,8 +676,8 @@ async fn uni_receiver_task(
async fn reliable_receiver_task(
connection: quinn::Connection,
- incoming_bytes_send: mpsc::Sender<Bytes>,
mut close_recv: broadcast::Receiver<()>,
+ bytes_incoming_send: mpsc::Sender<Bytes>,
) {
let close_recv_clone = close_recv.resubscribe();
tokio::select! {
@@ -679,7 +686,7 @@ async fn reliable_receiver_task(
}
_ = async {
while let Ok(recv) = connection.accept_uni().await {
- let bytes_from_server_send = incoming_bytes_send.clone();
+ let bytes_from_server_send = bytes_incoming_send.clone();
let close_recv_clone = close_recv_clone.resubscribe();
tokio::spawn(async move {
uni_receiver_task(
@@ -696,6 +703,26 @@ async fn reliable_receiver_task(
trace!("All unidirectional stream receivers cleaned");
}
+async fn unreliable_receiver_task(
+ connection: quinn::Connection,
+ mut close_recv: broadcast::Receiver<()>,
+ bytes_incoming_send: mpsc::Sender<Bytes>,
+) {
+ tokio::select! {
+ _ = close_recv.recv() => {
+ trace!("Listener for unreliable datagrams received a close signal")
+ }
+ _ = async {
+ while let Ok(msg_bytes) = connection.read_datagram().await {
+ // TODO Clean: error handling
+ bytes_incoming_send.send(msg_bytes.into()).await.unwrap();
+ }
+ } => {
+ trace!("Listener for unreliable datagrams ended")
+ }
+ };
+}
+
// Receive messages from the async client tasks and update the sync client.
fn update_sync_client(
mut connection_events: EventWriter<ConnectionEvent>,
diff --git a/src/server.rs b/src/server.rs
index 80f0dec..0ac916b 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -777,20 +777,37 @@ async fn client_connection_task(
// Spawn a task to listen for streams opened by this client
{
- let conn = connection.clone();
+ let connection_handle = connection.clone();
+ let client_close_recv = client_close_recv.resubscribe();
+ let payloads_incoming_send = payloads_from_clients_send.clone();
+ tokio::spawn(async move {
+ reliable_receiver_task(
+ client_id,
+ connection_handle,
+ client_close_recv,
+ payloads_incoming_send,
+ )
+ .await
+ });
+ }
+
+ // Spawn a task to listen for datagrams sent by this client
+ {
+ let connection_handle = connection.clone();
let client_close_recv = client_close_recv.resubscribe();
+ let payloads_incoming_send = payloads_from_clients_send.clone();
tokio::spawn(async move {
- client_receiver_task(
+ unreliable_receiver_task(
client_id,
- conn,
+ connection_handle,
client_close_recv,
- payloads_from_clients_send,
+ payloads_incoming_send,
)
.await
});
}
- // Spawn a task to handle channels for this client
+ // Spawn a task to handle send channels for this client
tokio::spawn(async move {
channels_task(
connection,
@@ -828,7 +845,7 @@ async fn uni_receiver_task(
};
}
-async fn client_receiver_task(
+async fn reliable_receiver_task(
client_id: ClientId,
connection: quinn::Connection,
mut close_recv: tokio::sync::broadcast::Receiver<()>,
@@ -862,6 +879,34 @@ async fn client_receiver_task(
)
}
+async fn unreliable_receiver_task(
+ client_id: ClientId,
+ connection: quinn::Connection,
+ mut close_recv: broadcast::Receiver<()>,
+ payloads_incoming_send: mpsc::Sender<ClientPayload>,
+) {
+ tokio::select! {
+ _ = close_recv.recv() => {
+ trace!("Listener for unreliable datagrams received a close signal for client: {}",
+ client_id)
+ }
+ _ = async {
+ while let Ok(msg_bytes) = connection.read_datagram().await {
+ // TODO Clean: error handling
+ payloads_incoming_send.send(ClientPayload {
+ client_id: client_id,
+ msg: msg_bytes.into(),
+ })
+ .await
+ .unwrap();
+ }
+ } => {
+ trace!("Listener for unreliable datagrams ended for client: {}",
+ client_id)
+ }
+ };
+}
+
fn create_server(mut commands: Commands, runtime: Res<AsyncRuntime>) {
commands.insert_resource(Server {
endpoint: None,