diff options
author | gilles henaux <gill.henaux@gmail.com> | 2023-01-14 16:04:59 +0100 |
---|---|---|
committer | gilles henaux <gill.henaux@gmail.com> | 2023-01-14 16:04:59 +0100 |
commit | f11a0360a273ba7af83e92db332c5b9be2854d29 (patch) | |
tree | bdfd1a457830b6d1c4251ad2879d35bb756ccf88 | |
parent | 4e64c1812b90d6cce07abb202439154fe9aa31a3 (diff) |
[client] Use the shared channels_task
-rw-r--r-- | src/client.rs | 110 |
1 files changed, 25 insertions, 85 deletions
diff --git a/src/client.rs b/src/client.rs index 5019630..1b3bd15 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,7 +11,7 @@ use std::{ use bevy::prelude::*; use bytes::Bytes; use futures_util::StreamExt; -use quinn::{ClientConfig, Connection as QuinnConnection, ConnectionError, Endpoint, VarInt}; +use quinn::{ClientConfig, Connection as QuinnConnection, ConnectionError, Endpoint}; use quinn_proto::ConnectionStats; use serde::Deserialize; use tokio::{ @@ -30,8 +30,8 @@ use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; use crate::shared::{ channel::{ - get_channel_id_from_type, ordered_reliable_channel_task, unordered_reliable_channel_task, - Channel, ChannelAsyncMessage, ChannelId, ChannelSyncMessage, ChannelType, MultiChannelId, + channels_task, get_channel_id_from_type, Channel, ChannelAsyncMessage, ChannelId, + ChannelSyncMessage, ChannelType, MultiChannelId, }, AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE, }; @@ -581,7 +581,8 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) { .await .expect("Failed to signal connection to sync client"); - let _close_waiter = { + // Spawn a task to listen for the underlying connection being closed + { let conn = connection.clone(); let to_sync_client = spawn_config.to_sync_client_send.clone(); tokio::spawn(async move { @@ -594,24 +595,30 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) { }) }; - let close_recv = spawn_config.close_recv.resubscribe(); - let connection_handle = connection.clone(); + // Spawn a task to listen for streams opened by the server + { + let close_recv = spawn_config.close_recv.resubscribe(); + let connection_handle = connection.clone(); + tokio::spawn(async move { + connection_receiving_task( + connection_handle, + spawn_config.bytes_from_server_send, + close_recv, + ) + .await + }); + } + + // Spawn a task to handle channels for this connection tokio::spawn(async move { - connection_receiving_task( - connection_handle, - spawn_config.bytes_from_server_send, - close_recv, + channels_task( + connection, + spawn_config.close_recv, + spawn_config.to_channels_recv, + spawn_config.from_channels_send, ) .await }); - - handle_connection_channels( - connection, - spawn_config.close_recv, - spawn_config.to_channels_recv, - spawn_config.from_channels_send, - ) - .await; } } } @@ -645,73 +652,6 @@ async fn connection_receiving_task( trace!("All unidirectional stream receivers cleaned"); } -async fn handle_connection_channels( - connection: quinn::Connection, - mut close_recv: broadcast::Receiver<()>, - mut to_channels_recv: mpsc::Receiver<ChannelSyncMessage>, - from_channels_send: mpsc::Sender<ChannelAsyncMessage>, -) { - // Use an mpsc channel where, instead of sending messages, we wait for the channel to be closed, which happens when every sender has been dropped. We can't use a JoinSet as simply here since we would also need to drain closed channels from it. - let (channel_tasks_keepalive, mut channel_tasks_waiter) = mpsc::channel(1); - - let close_receiver_clone = close_recv.resubscribe(); - tokio::select! { - _ = close_recv.recv() => { - trace!("Connection Channels listener received a close signal") - } - _ = async { - while let Some(sync_message) = to_channels_recv.recv().await { - let ChannelSyncMessage::CreateChannel{ channel_id, bytes_to_channel_recv, channel_close_recv } = sync_message; - - let close_receiver = close_receiver_clone.resubscribe(); - let connection_handle = connection.clone(); - let from_channels_send = from_channels_send.clone(); - let channels_keepalive_clone = channel_tasks_keepalive.clone(); - - match channel_id { - ChannelId::OrderedReliable(_) => { - tokio::spawn(async move { - ordered_reliable_channel_task( - connection_handle, - channels_keepalive_clone, - from_channels_send, - close_receiver, - channel_close_recv, - bytes_to_channel_recv - ) - .await - }); - }, - ChannelId::UnorderedReliable => { - tokio::spawn(async move { - unordered_reliable_channel_task( - connection_handle, - channels_keepalive_clone, - from_channels_send, - close_receiver, - channel_close_recv, - bytes_to_channel_recv - ) - .await - }); - }, - ChannelId::Unreliable => todo!(), - } - } - } => { - trace!("Connection Channels listener ended") - } - }; - - // Wait for all the channels to have flushed/finished: - // We drop our sender first because the recv() call otherwise sleeps forever. - // When every sender has gone out of scope, the recv call will return with an error. We ignore the error. - drop(channel_tasks_keepalive); - let _ = channel_tasks_waiter.recv().await; - - connection.close(VarInt::from_u32(0), "closed".as_bytes()); -} - // Receive messages from the async client tasks and update the sync client. fn update_sync_client( mut connection_events: EventWriter<ConnectionEvent>, |