diff options
-rw-r--r-- | src/client.rs | 16 | ||||
-rw-r--r-- | src/shared/channel.rs | 21 |
2 files changed, 23 insertions, 14 deletions
diff --git a/src/client.rs b/src/client.rs index 095e51e..c3d2dfb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -665,26 +665,26 @@ async fn handle_connection_channels( mut from_sync_client: mpsc::Receiver<InternalSyncMessage>, to_sync_client: mpsc::Sender<InternalAsyncMessage>, ) { - // let close_receiver_clone = close_receiver.resubscribe(); + let close_receiver_clone = close_receiver.resubscribe(); tokio::select! { _ = close_receiver.recv() => { trace!("Connection Channels listener received a close signal") } _ = async { while let Some(sync_message) = from_sync_client.recv().await { - let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver, channel_close_receiver } = sync_message; + let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver, channel_close_receiver } = sync_message; - // let close_receiver = close_receiver_clone.resubscribe(); - let connection_handle = connection.clone(); - let to_sync_client = to_sync_client.clone(); - match channel_id { + let close_receiver = close_receiver_clone.resubscribe(); + let connection_handle = connection.clone(); + let to_sync_client = to_sync_client.clone(); + match channel_id { ChannelId::OrderedReliable(_) => { tokio::spawn(async move { ordered_reliable_channel_task( connection_handle, to_sync_client, || InternalAsyncMessage::LostConnection, - // close_receiver, + close_receiver, channel_close_receiver, to_server_receiver ) @@ -697,7 +697,7 @@ async fn handle_connection_channels( connection_handle, to_sync_client, || InternalAsyncMessage::LostConnection, - // close_receiver, + close_receiver, channel_close_receiver, to_server_receiver ) diff --git a/src/shared/channel.rs b/src/shared/channel.rs index 1d15f6f..96d8afc 100644 --- a/src/shared/channel.rs +++ b/src/shared/channel.rs @@ -4,7 +4,10 @@ use bytes::Bytes; use futures::sink::SinkExt; use quinn::{SendStream, VarInt}; use std::fmt::Debug; -use tokio::sync::mpsc::{self, error::TrySendError}; +use tokio::sync::{ + broadcast, + mpsc::{self, error::TrySendError}, +}; use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; pub(crate) type MultiChannelId = u64; @@ -68,8 +71,8 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>( connection: quinn::Connection, to_sync_client: mpsc::Sender<T>, on_lost_connection: fn() -> T, - // mut close_receiver: broadcast::Receiver<()>, - mut close_receiver: mpsc::Receiver<()>, + mut close_receiver: broadcast::Receiver<()>, + mut channel_close_receiver: mpsc::Receiver<()>, mut to_server_receiver: mpsc::Receiver<Bytes>, ) { let mut frame_sender = new_uni_frame_sender(&connection).await; @@ -78,6 +81,9 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>( _ = close_receiver.recv() => { trace!("Ordered Reliable Channel task received a close signal") } + _ = channel_close_receiver.recv() => { + trace!("Ordered Reliable Channel task received a channel close signal") + } _ = async { while let Some(msg_bytes) = to_server_receiver.recv().await { if let Err(err) = frame_sender.send(msg_bytes).await { @@ -111,13 +117,16 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>( connection: quinn::Connection, to_sync_client: mpsc::Sender<T>, on_lost_connection: fn() -> T, - // mut close_receiver: broadcast::Receiver<()>, - mut close_receiver: mpsc::Receiver<()>, + mut close_receiver: broadcast::Receiver<()>, + mut channel_close_receiver: mpsc::Receiver<()>, mut to_server_receiver: mpsc::Receiver<Bytes>, ) { tokio::select! { _ = close_receiver.recv() => { - trace!("Unordered Reliable Channel task received a close signal") + trace!("Ordered Reliable Channel task received a close signal") + } + _ = channel_close_receiver.recv() => { + trace!("Unordered Reliable Channel task received a channel close signal") } _ = async { while let Some(msg_bytes) = to_server_receiver.recv().await { |