aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenauxg <19689618+Henauxg@users.noreply.github.com>2023-01-04 17:17:10 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-11 14:31:44 +0100
commit0308782c5775baf0908b73e3e1a6b17c0a6a00a0 (patch)
treea0b7893f8653eb486e836cad72c61dcf5330e626
parentae6c55fc44145b2fe799d461ba22973785dec889 (diff)
[client] Refactor connection_task with handle_connection_channels
-rw-r--r--src/client.rs102
1 files changed, 61 insertions, 41 deletions
diff --git a/src/client.rs b/src/client.rs
index 6be8c99..e852334 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -511,7 +511,7 @@ fn configure_client(
}
}
-async fn connection_task(mut spawn_config: ConnectionSpawnConfig) {
+async fn connection_task(spawn_config: ConnectionSpawnConfig) {
let config = spawn_config.connection_config;
let server_adr_str = format!("{}:{}", config.server_host, config.server_port);
let srv_host = config.server_host.clone();
@@ -571,45 +571,13 @@ async fn connection_task(mut spawn_config: ConnectionSpawnConfig) {
.await
});
- // Channels Sending tasks for this connection
- let mut close_receiver = spawn_config.close_sender.subscribe();
- tokio::select! {
- _ = close_receiver.recv() => {
- trace!("Connection Channels listener received a close signal")
- }
- _ = async {
- while let Some(sync_message) = spawn_config
- .from_sync_client.recv().await {
- let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message;
-
- let close_receiver = spawn_config.close_sender.subscribe();
- let connection_handle = connection.clone();
- let to_sync_client = spawn_config.to_sync_client.clone();
- match channel_type {
- ChannelType::OrderedReliable => {
- tokio::spawn(async move {
- ordered_reliable_channel_task(
- connection_handle,
- to_sync_client,
- close_receiver,
- to_server_receiver
- )
- .await
- });
- },
- ChannelType::UnorderedReliable => {
- tokio::spawn(async move {
- todo!()
- });
- },
- ChannelType::Unreliable => todo!(),
- ChannelType::SequencedUnreliable => todo!(),
- }
- }
- } => {
- trace!("Connection Channels listener ended")
- }
- };
+ handle_connection_channels(
+ connection,
+ spawn_config.close_sender,
+ spawn_config.from_sync_client,
+ spawn_config.to_sync_client,
+ )
+ .await;
}
}
}
@@ -625,7 +593,7 @@ async fn connection_receiving_task(
trace!("Listener for new Unidirectional Receiving Streams received a close signal")
}
_ = async {
- while let Ok(recv)= connection.accept_uni().await {
+ while let Ok(recv) = connection.accept_uni().await {
let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new());
let from_server_sender = from_server_sender.clone();
uni_receivers.spawn(async move {
@@ -643,6 +611,58 @@ async fn connection_receiving_task(
trace!("All unidirectional stream receivers cleaned");
}
+async fn handle_connection_channels(
+ connection: quinn::Connection,
+ close_sender: broadcast::Sender<()>,
+ mut from_sync_client: mpsc::Receiver<InternalSyncMessage>,
+ to_sync_client: mpsc::Sender<InternalAsyncMessage>,
+) {
+ let mut close_receiver = close_sender.subscribe();
+ tokio::select! {
+ _ = close_receiver.recv() => {
+ trace!("Connection Channels listener received a close signal")
+ }
+ _ = async {
+ while let Some(sync_message) = from_sync_client.recv().await {
+
+ let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message;
+
+ let close_receiver = close_sender.subscribe();
+ let connection_handle = connection.clone();
+ let to_sync_client = to_sync_client.clone();
+ match channel_type {
+ ChannelType::OrderedReliable => {
+ tokio::spawn(async move {
+ ordered_reliable_channel_task(
+ connection_handle,
+ to_sync_client,
+ close_receiver,
+ to_server_receiver
+ )
+ .await
+ });
+ },
+ ChannelType::UnorderedReliable => {
+ tokio::spawn(async move {
+ // unordered_reliable_channel_task(
+ // connection_handle,
+ // to_sync_client,
+ // close_receiver,
+ // to_server_receiver
+ // )
+ // .await
+ });
+ },
+ ChannelType::Unreliable => todo!(),
+ ChannelType::SequencedUnreliable => todo!(),
+ }
+ }
+ } => {
+ trace!("Connection Channels listener ended")
+ }
+ };
+}
+
async fn ordered_reliable_channel_task(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<InternalAsyncMessage>,