aboutsummaryrefslogtreecommitdiff
path: root/src/shared/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/shared/channel.rs')
-rw-r--r--src/shared/channel.rs32
1 files changed, 23 insertions, 9 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index 5c458bb..1d15f6f 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -4,10 +4,7 @@ use bytes::Bytes;
use futures::sink::SinkExt;
use quinn::{SendStream, VarInt};
use std::fmt::Debug;
-use tokio::sync::{
- broadcast,
- mpsc::{self, error::TrySendError},
-};
+use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
pub(crate) type MultiChannelId = u64;
@@ -35,10 +32,18 @@ impl std::fmt::Display for ChannelId {
#[derive(Debug)]
pub struct Channel {
sender: mpsc::Sender<Bytes>,
+ close_sender: mpsc::Sender<()>,
}
impl Channel {
- pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
+ pub(crate) fn new(sender: mpsc::Sender<Bytes>, close_sender: mpsc::Sender<()>) -> Self {
+ Self {
+ sender,
+ close_sender,
+ }
+ }
+
+ pub(crate) fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
match self.sender.try_send(payload.into()) {
Ok(_) => Ok(()),
Err(err) => match err {
@@ -48,8 +53,14 @@ impl Channel {
}
}
- pub(crate) fn new(sender: mpsc::Sender<Bytes>) -> Self {
- Self { sender }
+ pub(crate) fn close(&self) -> Result<(), QuinnetError> {
+ match self.close_sender.blocking_send(()) {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ // The only possible error for a send is that there is no active receivers, meaning that the tasks are already terminated.
+ Err(QuinnetError::ChannelAlreadyClosed)
+ }
+ }
}
}
@@ -57,7 +68,8 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
- mut close_receiver: broadcast::Receiver<()>,
+ // mut close_receiver: broadcast::Receiver<()>,
+ mut close_receiver: mpsc::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
let mut frame_sender = new_uni_frame_sender(&connection).await;
@@ -99,7 +111,8 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
- mut close_receiver: broadcast::Receiver<()>,
+ // mut close_receiver: broadcast::Receiver<()>,
+ mut close_receiver: mpsc::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
tokio::select! {
@@ -121,6 +134,7 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
trace!("Unordered Reliable Channel task ended")
}
}
+ todo!("Flush and signal finished")
}
async fn new_uni_frame_sender(