diff options
-rw-r--r-- | src/client.rs | 102 |
1 files changed, 61 insertions, 41 deletions
diff --git a/src/client.rs b/src/client.rs index 6be8c99..e852334 100644 --- a/src/client.rs +++ b/src/client.rs @@ -511,7 +511,7 @@ fn configure_client( } } -async fn connection_task(mut spawn_config: ConnectionSpawnConfig) { +async fn connection_task(spawn_config: ConnectionSpawnConfig) { let config = spawn_config.connection_config; let server_adr_str = format!("{}:{}", config.server_host, config.server_port); let srv_host = config.server_host.clone(); @@ -571,45 +571,13 @@ async fn connection_task(mut spawn_config: ConnectionSpawnConfig) { .await }); - // Channels Sending tasks for this connection - let mut close_receiver = spawn_config.close_sender.subscribe(); - tokio::select! { - _ = close_receiver.recv() => { - trace!("Connection Channels listener received a close signal") - } - _ = async { - while let Some(sync_message) = spawn_config - .from_sync_client.recv().await { - let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message; - - let close_receiver = spawn_config.close_sender.subscribe(); - let connection_handle = connection.clone(); - let to_sync_client = spawn_config.to_sync_client.clone(); - match channel_type { - ChannelType::OrderedReliable => { - tokio::spawn(async move { - ordered_reliable_channel_task( - connection_handle, - to_sync_client, - close_receiver, - to_server_receiver - ) - .await - }); - }, - ChannelType::UnorderedReliable => { - tokio::spawn(async move { - todo!() - }); - }, - ChannelType::Unreliable => todo!(), - ChannelType::SequencedUnreliable => todo!(), - } - } - } => { - trace!("Connection Channels listener ended") - } - }; + handle_connection_channels( + connection, + spawn_config.close_sender, + spawn_config.from_sync_client, + spawn_config.to_sync_client, + ) + .await; } } } @@ -625,7 +593,7 @@ async fn connection_receiving_task( trace!("Listener for new Unidirectional Receiving Streams received a close signal") } _ = async { - while let Ok(recv)= connection.accept_uni().await { + while let Ok(recv) = connection.accept_uni().await { let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new()); let from_server_sender = from_server_sender.clone(); uni_receivers.spawn(async move { @@ -643,6 +611,58 @@ async fn connection_receiving_task( trace!("All unidirectional stream receivers cleaned"); } +async fn handle_connection_channels( + connection: quinn::Connection, + close_sender: broadcast::Sender<()>, + mut from_sync_client: mpsc::Receiver<InternalSyncMessage>, + to_sync_client: mpsc::Sender<InternalAsyncMessage>, +) { + let mut close_receiver = close_sender.subscribe(); + tokio::select! { + _ = close_receiver.recv() => { + trace!("Connection Channels listener received a close signal") + } + _ = async { + while let Some(sync_message) = from_sync_client.recv().await { + + let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message; + + let close_receiver = close_sender.subscribe(); + let connection_handle = connection.clone(); + let to_sync_client = to_sync_client.clone(); + match channel_type { + ChannelType::OrderedReliable => { + tokio::spawn(async move { + ordered_reliable_channel_task( + connection_handle, + to_sync_client, + close_receiver, + to_server_receiver + ) + .await + }); + }, + ChannelType::UnorderedReliable => { + tokio::spawn(async move { + // unordered_reliable_channel_task( + // connection_handle, + // to_sync_client, + // close_receiver, + // to_server_receiver + // ) + // .await + }); + }, + ChannelType::Unreliable => todo!(), + ChannelType::SequencedUnreliable => todo!(), + } + } + } => { + trace!("Connection Channels listener ended") + } + }; +} + async fn ordered_reliable_channel_task( connection: quinn::Connection, to_sync_client: mpsc::Sender<InternalAsyncMessage>, |