diff options
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | src/client.rs | 45 | ||||
-rw-r--r-- | src/server.rs | 57 |
3 files changed, 88 insertions, 16 deletions
@@ -70,7 +70,7 @@ This is a bird-eye view of the features/tasks that will probably be worked on ne - [x] [Previous roadmap tasks](ROADMAP.md) - [x] Security: More certificates support, see [certificates-and-server-authentication](#certificates-and-server-authentication) - [x] Feature: Implement `unordered` & `ordered` `reliable` message channels on client & server, see [channels](#channels) -- [ ] Feature: Implement `unreliable` message channel on client & server +- [x] Feature: Implement `unreliable` message channel on client & server - [ ] Feature: Implement `unreliable` messages larger than the path MTU from client & server - [ ] Performance: feed multiples messages before flushing ordered reliable channels - [ ] Clean: Rework the error handling in the async back-end diff --git a/src/client.rs b/src/client.rs index 27fc546..8a52441 100644 --- a/src/client.rs +++ b/src/client.rs @@ -624,17 +624,24 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) { { let close_recv = spawn_config.close_recv.resubscribe(); let connection_handle = connection.clone(); + let bytes_incoming_send = spawn_config.bytes_from_server_send.clone(); tokio::spawn(async move { - reliable_receiver_task( - connection_handle, - spawn_config.bytes_from_server_send, - close_recv, - ) - .await + reliable_receiver_task(connection_handle, close_recv, bytes_incoming_send).await }); } - // Spawn a task to handle channels for this connection + // Spawn a task to listen for datagrams sent by the server + { + let close_recv = spawn_config.close_recv.resubscribe(); + let connection_handle = connection.clone(); + let bytes_incoming_send = spawn_config.bytes_from_server_send.clone(); + tokio::spawn(async move { + unreliable_receiver_task(connection_handle, close_recv, bytes_incoming_send) + .await + }); + } + + // Spawn a task to handle send channels for this connection tokio::spawn(async move { channels_task( connection, @@ -669,8 +676,8 @@ async fn uni_receiver_task( async fn reliable_receiver_task( connection: quinn::Connection, - incoming_bytes_send: mpsc::Sender<Bytes>, mut close_recv: broadcast::Receiver<()>, + bytes_incoming_send: mpsc::Sender<Bytes>, ) { let close_recv_clone = close_recv.resubscribe(); tokio::select! { @@ -679,7 +686,7 @@ async fn reliable_receiver_task( } _ = async { while let Ok(recv) = connection.accept_uni().await { - let bytes_from_server_send = incoming_bytes_send.clone(); + let bytes_from_server_send = bytes_incoming_send.clone(); let close_recv_clone = close_recv_clone.resubscribe(); tokio::spawn(async move { uni_receiver_task( @@ -696,6 +703,26 @@ async fn reliable_receiver_task( trace!("All unidirectional stream receivers cleaned"); } +async fn unreliable_receiver_task( + connection: quinn::Connection, + mut close_recv: broadcast::Receiver<()>, + bytes_incoming_send: mpsc::Sender<Bytes>, +) { + tokio::select! { + _ = close_recv.recv() => { + trace!("Listener for unreliable datagrams received a close signal") + } + _ = async { + while let Ok(msg_bytes) = connection.read_datagram().await { + // TODO Clean: error handling + bytes_incoming_send.send(msg_bytes.into()).await.unwrap(); + } + } => { + trace!("Listener for unreliable datagrams ended") + } + }; +} + // Receive messages from the async client tasks and update the sync client. fn update_sync_client( mut connection_events: EventWriter<ConnectionEvent>, diff --git a/src/server.rs b/src/server.rs index 80f0dec..0ac916b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -777,20 +777,37 @@ async fn client_connection_task( // Spawn a task to listen for streams opened by this client { - let conn = connection.clone(); + let connection_handle = connection.clone(); + let client_close_recv = client_close_recv.resubscribe(); + let payloads_incoming_send = payloads_from_clients_send.clone(); + tokio::spawn(async move { + reliable_receiver_task( + client_id, + connection_handle, + client_close_recv, + payloads_incoming_send, + ) + .await + }); + } + + // Spawn a task to listen for datagrams sent by this client + { + let connection_handle = connection.clone(); let client_close_recv = client_close_recv.resubscribe(); + let payloads_incoming_send = payloads_from_clients_send.clone(); tokio::spawn(async move { - client_receiver_task( + unreliable_receiver_task( client_id, - conn, + connection_handle, client_close_recv, - payloads_from_clients_send, + payloads_incoming_send, ) .await }); } - // Spawn a task to handle channels for this client + // Spawn a task to handle send channels for this client tokio::spawn(async move { channels_task( connection, @@ -828,7 +845,7 @@ async fn uni_receiver_task( }; } -async fn client_receiver_task( +async fn reliable_receiver_task( client_id: ClientId, connection: quinn::Connection, mut close_recv: tokio::sync::broadcast::Receiver<()>, @@ -862,6 +879,34 @@ async fn client_receiver_task( ) } +async fn unreliable_receiver_task( + client_id: ClientId, + connection: quinn::Connection, + mut close_recv: broadcast::Receiver<()>, + payloads_incoming_send: mpsc::Sender<ClientPayload>, +) { + tokio::select! { + _ = close_recv.recv() => { + trace!("Listener for unreliable datagrams received a close signal for client: {}", + client_id) + } + _ = async { + while let Ok(msg_bytes) = connection.read_datagram().await { + // TODO Clean: error handling + payloads_incoming_send.send(ClientPayload { + client_id: client_id, + msg: msg_bytes.into(), + }) + .await + .unwrap(); + } + } => { + trace!("Listener for unreliable datagrams ended for client: {}", + client_id) + } + }; +} + fn create_server(mut commands: Commands, runtime: Res<AsyncRuntime>) { commands.insert_resource(Server { endpoint: None, |