aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorgilles henaux <gill.henaux@gmail.com>2023-01-11 19:58:49 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-11 19:58:49 +0100
commit3d585a6fc850d77afc99385685d1f30626c0b6d0 (patch)
tree6fcd644240cb88e6772be177d58ae98764d3994b /src
parentde0656475591ff668b832aeefb37d08be2235e7f (diff)
[channels] Still connect the Channels tasks to the Connection close signal
Diffstat (limited to 'src')
-rw-r--r--src/client.rs16
-rw-r--r--src/shared/channel.rs21
2 files changed, 23 insertions, 14 deletions
diff --git a/src/client.rs b/src/client.rs
index 095e51e..c3d2dfb 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -665,26 +665,26 @@ async fn handle_connection_channels(
mut from_sync_client: mpsc::Receiver<InternalSyncMessage>,
to_sync_client: mpsc::Sender<InternalAsyncMessage>,
) {
- // let close_receiver_clone = close_receiver.resubscribe();
+ let close_receiver_clone = close_receiver.resubscribe();
tokio::select! {
_ = close_receiver.recv() => {
trace!("Connection Channels listener received a close signal")
}
_ = async {
while let Some(sync_message) = from_sync_client.recv().await {
- let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver, channel_close_receiver } = sync_message;
+ let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver, channel_close_receiver } = sync_message;
- // let close_receiver = close_receiver_clone.resubscribe();
- let connection_handle = connection.clone();
- let to_sync_client = to_sync_client.clone();
- match channel_id {
+ let close_receiver = close_receiver_clone.resubscribe();
+ let connection_handle = connection.clone();
+ let to_sync_client = to_sync_client.clone();
+ match channel_id {
ChannelId::OrderedReliable(_) => {
tokio::spawn(async move {
ordered_reliable_channel_task(
connection_handle,
to_sync_client,
|| InternalAsyncMessage::LostConnection,
- // close_receiver,
+ close_receiver,
channel_close_receiver,
to_server_receiver
)
@@ -697,7 +697,7 @@ async fn handle_connection_channels(
connection_handle,
to_sync_client,
|| InternalAsyncMessage::LostConnection,
- // close_receiver,
+ close_receiver,
channel_close_receiver,
to_server_receiver
)
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index 1d15f6f..96d8afc 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -4,7 +4,10 @@ use bytes::Bytes;
use futures::sink::SinkExt;
use quinn::{SendStream, VarInt};
use std::fmt::Debug;
-use tokio::sync::mpsc::{self, error::TrySendError};
+use tokio::sync::{
+ broadcast,
+ mpsc::{self, error::TrySendError},
+};
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
pub(crate) type MultiChannelId = u64;
@@ -68,8 +71,8 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
- // mut close_receiver: broadcast::Receiver<()>,
- mut close_receiver: mpsc::Receiver<()>,
+ mut close_receiver: broadcast::Receiver<()>,
+ mut channel_close_receiver: mpsc::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
let mut frame_sender = new_uni_frame_sender(&connection).await;
@@ -78,6 +81,9 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
_ = close_receiver.recv() => {
trace!("Ordered Reliable Channel task received a close signal")
}
+ _ = channel_close_receiver.recv() => {
+ trace!("Ordered Reliable Channel task received a channel close signal")
+ }
_ = async {
while let Some(msg_bytes) = to_server_receiver.recv().await {
if let Err(err) = frame_sender.send(msg_bytes).await {
@@ -111,13 +117,16 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
- // mut close_receiver: broadcast::Receiver<()>,
- mut close_receiver: mpsc::Receiver<()>,
+ mut close_receiver: broadcast::Receiver<()>,
+ mut channel_close_receiver: mpsc::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
tokio::select! {
_ = close_receiver.recv() => {
- trace!("Unordered Reliable Channel task received a close signal")
+ trace!("Ordered Reliable Channel task received a close signal")
+ }
+ _ = channel_close_receiver.recv() => {
+ trace!("Unordered Reliable Channel task received a channel close signal")
}
_ = async {
while let Some(msg_bytes) = to_server_receiver.recv().await {