aboutsummaryrefslogtreecommitdiff
path: root/src/shared
diff options
context:
space:
mode:
Diffstat (limited to 'src/shared')
-rw-r--r--src/shared/channel.rs21
1 files changed, 15 insertions, 6 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index 1d15f6f..96d8afc 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -4,7 +4,10 @@ use bytes::Bytes;
use futures::sink::SinkExt;
use quinn::{SendStream, VarInt};
use std::fmt::Debug;
-use tokio::sync::mpsc::{self, error::TrySendError};
+use tokio::sync::{
+ broadcast,
+ mpsc::{self, error::TrySendError},
+};
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
pub(crate) type MultiChannelId = u64;
@@ -68,8 +71,8 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
- // mut close_receiver: broadcast::Receiver<()>,
- mut close_receiver: mpsc::Receiver<()>,
+ mut close_receiver: broadcast::Receiver<()>,
+ mut channel_close_receiver: mpsc::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
let mut frame_sender = new_uni_frame_sender(&connection).await;
@@ -78,6 +81,9 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
_ = close_receiver.recv() => {
trace!("Ordered Reliable Channel task received a close signal")
}
+ _ = channel_close_receiver.recv() => {
+ trace!("Ordered Reliable Channel task received a channel close signal")
+ }
_ = async {
while let Some(msg_bytes) = to_server_receiver.recv().await {
if let Err(err) = frame_sender.send(msg_bytes).await {
@@ -111,13 +117,16 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
- // mut close_receiver: broadcast::Receiver<()>,
- mut close_receiver: mpsc::Receiver<()>,
+ mut close_receiver: broadcast::Receiver<()>,
+ mut channel_close_receiver: mpsc::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
tokio::select! {
_ = close_receiver.recv() => {
- trace!("Unordered Reliable Channel task received a close signal")
+ trace!("Ordered Reliable Channel task received a close signal")
+ }
+ _ = channel_close_receiver.recv() => {
+ trace!("Unordered Reliable Channel task received a channel close signal")
}
_ = async {
while let Some(msg_bytes) = to_server_receiver.recv().await {