diff options
Diffstat (limited to 'src/shared/channel.rs')
-rw-r--r-- | src/shared/channel.rs | 32 |
1 files changed, 23 insertions, 9 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs index 5c458bb..1d15f6f 100644 --- a/src/shared/channel.rs +++ b/src/shared/channel.rs @@ -4,10 +4,7 @@ use bytes::Bytes; use futures::sink::SinkExt; use quinn::{SendStream, VarInt}; use std::fmt::Debug; -use tokio::sync::{ - broadcast, - mpsc::{self, error::TrySendError}, -}; +use tokio::sync::mpsc::{self, error::TrySendError}; use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; pub(crate) type MultiChannelId = u64; @@ -35,10 +32,18 @@ impl std::fmt::Display for ChannelId { #[derive(Debug)] pub struct Channel { sender: mpsc::Sender<Bytes>, + close_sender: mpsc::Sender<()>, } impl Channel { - pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { + pub(crate) fn new(sender: mpsc::Sender<Bytes>, close_sender: mpsc::Sender<()>) -> Self { + Self { + sender, + close_sender, + } + } + + pub(crate) fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { match self.sender.try_send(payload.into()) { Ok(_) => Ok(()), Err(err) => match err { @@ -48,8 +53,14 @@ impl Channel { } } - pub(crate) fn new(sender: mpsc::Sender<Bytes>) -> Self { - Self { sender } + pub(crate) fn close(&self) -> Result<(), QuinnetError> { + match self.close_sender.blocking_send(()) { + Ok(_) => Ok(()), + Err(_) => { + // The only possible error for a send is that there is no active receivers, meaning that the tasks are already terminated. + Err(QuinnetError::ChannelAlreadyClosed) + } + } } } @@ -57,7 +68,8 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>( connection: quinn::Connection, to_sync_client: mpsc::Sender<T>, on_lost_connection: fn() -> T, - mut close_receiver: broadcast::Receiver<()>, + // mut close_receiver: broadcast::Receiver<()>, + mut close_receiver: mpsc::Receiver<()>, mut to_server_receiver: mpsc::Receiver<Bytes>, ) { let mut frame_sender = new_uni_frame_sender(&connection).await; @@ -99,7 +111,8 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>( connection: quinn::Connection, to_sync_client: mpsc::Sender<T>, on_lost_connection: fn() -> T, - mut close_receiver: broadcast::Receiver<()>, + // mut close_receiver: broadcast::Receiver<()>, + mut close_receiver: mpsc::Receiver<()>, mut to_server_receiver: mpsc::Receiver<Bytes>, ) { tokio::select! { @@ -121,6 +134,7 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>( trace!("Unordered Reliable Channel task ended") } } + todo!("Flush and signal finished") } async fn new_uni_frame_sender( |