diff options
author | gilles henaux <gill.henaux@gmail.com> | 2023-01-11 19:55:02 +0100 |
---|---|---|
committer | gilles henaux <gill.henaux@gmail.com> | 2023-01-11 19:55:02 +0100 |
commit | de0656475591ff668b832aeefb37d08be2235e7f (patch) | |
tree | 396597ef2a84c113fea94d7730480377ae7666ba | |
parent | 24388024bb8338d41b60b991bc47cc283a73576f (diff) |
[channels] Dedicated close channels for Quinnet Channels
-rw-r--r-- | src/client.rs | 39 | ||||
-rw-r--r-- | src/shared/channel.rs | 32 |
2 files changed, 47 insertions, 24 deletions
diff --git a/src/client.rs b/src/client.rs index 89e594b..095e51e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -118,6 +118,7 @@ pub(crate) enum InternalSyncMessage { CreateChannel { channel_id: ChannelId, to_server_receiver: mpsc::Receiver<Bytes>, + channel_close_receiver: mpsc::Receiver<()>, }, } @@ -269,7 +270,7 @@ impl Connection { } } - /// Disconnect from the server on this connection. This does not send any message to the server, and simply closes all the connection's tasks locally. + /// Immediately prevents new messages from being sent on the connection and signal the connection to closes all its background tasks. Before trully closing, the connection will wait for all buffered messages in all its opened channels to be properly sent according to their respective channel type. fn disconnect(&mut self) -> Result<(), QuinnetError> { match &self.state { ConnectionState::Disconnected => Ok(()), @@ -277,7 +278,10 @@ impl Connection { self.state = ConnectionState::Disconnected; match self.close_sender.send(()) { Ok(_) => Ok(()), - Err(_) => Err(QuinnetError::InternalChannelClosed), + Err(_) => { + // The only possible error for a send is that there is no active receivers, meaning that the tasks are already terminated. + Err(QuinnetError::ConnectionAlreadyClosed) + } } } } @@ -329,15 +333,16 @@ impl Connection { } } + /// Immediately prevents new messages from being sent on the channel and signal the channel to closes all its background tasks. + /// Before trully closing, the channel will wait for all buffered messages to be properly sent according to the channel type. + /// Can fail if the [ChannelId] is unknown, or if the channel is already closed. pub fn close_channel(&mut self, channel_id: ChannelId) -> Result<(), QuinnetError> { match self.channels.remove(&channel_id) { - Some(mut channel) => { - // channel.close()?; - todo!("Close channels tasks"); + Some(channel) => { if Some(channel_id) == self.default_channel { self.default_channel = None; } - Ok(()) + channel.close() } None => Err(QuinnetError::UnknownChannel(channel_id)), } @@ -346,15 +351,18 @@ impl Connection { fn create_channel(&mut self, channel_id: ChannelId) -> Result<ChannelId, QuinnetError> { let (to_server_sender, to_server_receiver) = mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); + let (channel_close_sender, channel_close_receiver) = + mpsc::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE); match self .internal_sender .try_send(InternalSyncMessage::CreateChannel { channel_id, to_server_receiver, + channel_close_receiver, }) { Ok(_) => { - let channel = Channel::new(to_server_sender); + let channel = Channel::new(to_server_sender, channel_close_sender); self.channels.insert(channel_id, channel); if self.default_channel.is_none() { self.default_channel = Some(channel_id); @@ -494,15 +502,14 @@ impl Client { self.default_connection_id } - /// Close a specific connection. This will call disconnect on the connection and remove it from the client. This may fail if the [Connection] fails to disconnect or if no [Connection] if found for connection_id + /// Close a specific connection. Removes it from the client. This may fail if no [Connection] if found for connection_id, or if the [Connection] is already closed. pub fn close_connection(&mut self, connection_id: ConnectionId) -> Result<(), QuinnetError> { match self.connections.remove(&connection_id) { Some(mut connection) => { - connection.disconnect()?; if Some(connection_id) == self.default_connection_id { self.default_connection_id = None; } - Ok(()) + connection.disconnect() } None => Err(QuinnetError::UnknownConnection(connection_id)), } @@ -658,16 +665,16 @@ async fn handle_connection_channels( mut from_sync_client: mpsc::Receiver<InternalSyncMessage>, to_sync_client: mpsc::Sender<InternalAsyncMessage>, ) { - let close_receiver_clone = close_receiver.resubscribe(); + // let close_receiver_clone = close_receiver.resubscribe(); tokio::select! { _ = close_receiver.recv() => { trace!("Connection Channels listener received a close signal") } _ = async { while let Some(sync_message) = from_sync_client.recv().await { - let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver } = sync_message; + let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver, channel_close_receiver } = sync_message; - let close_receiver = close_receiver_clone.resubscribe(); + // let close_receiver = close_receiver_clone.resubscribe(); let connection_handle = connection.clone(); let to_sync_client = to_sync_client.clone(); match channel_id { @@ -677,7 +684,8 @@ async fn handle_connection_channels( connection_handle, to_sync_client, || InternalAsyncMessage::LostConnection, - close_receiver, + // close_receiver, + channel_close_receiver, to_server_receiver ) .await @@ -689,7 +697,8 @@ async fn handle_connection_channels( connection_handle, to_sync_client, || InternalAsyncMessage::LostConnection, - close_receiver, + // close_receiver, + channel_close_receiver, to_server_receiver ) .await diff --git a/src/shared/channel.rs b/src/shared/channel.rs index 5c458bb..1d15f6f 100644 --- a/src/shared/channel.rs +++ b/src/shared/channel.rs @@ -4,10 +4,7 @@ use bytes::Bytes; use futures::sink::SinkExt; use quinn::{SendStream, VarInt}; use std::fmt::Debug; -use tokio::sync::{ - broadcast, - mpsc::{self, error::TrySendError}, -}; +use tokio::sync::mpsc::{self, error::TrySendError}; use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; pub(crate) type MultiChannelId = u64; @@ -35,10 +32,18 @@ impl std::fmt::Display for ChannelId { #[derive(Debug)] pub struct Channel { sender: mpsc::Sender<Bytes>, + close_sender: mpsc::Sender<()>, } impl Channel { - pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { + pub(crate) fn new(sender: mpsc::Sender<Bytes>, close_sender: mpsc::Sender<()>) -> Self { + Self { + sender, + close_sender, + } + } + + pub(crate) fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { match self.sender.try_send(payload.into()) { Ok(_) => Ok(()), Err(err) => match err { @@ -48,8 +53,14 @@ impl Channel { } } - pub(crate) fn new(sender: mpsc::Sender<Bytes>) -> Self { - Self { sender } + pub(crate) fn close(&self) -> Result<(), QuinnetError> { + match self.close_sender.blocking_send(()) { + Ok(_) => Ok(()), + Err(_) => { + // The only possible error for a send is that there is no active receivers, meaning that the tasks are already terminated. + Err(QuinnetError::ChannelAlreadyClosed) + } + } } } @@ -57,7 +68,8 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>( connection: quinn::Connection, to_sync_client: mpsc::Sender<T>, on_lost_connection: fn() -> T, - mut close_receiver: broadcast::Receiver<()>, + // mut close_receiver: broadcast::Receiver<()>, + mut close_receiver: mpsc::Receiver<()>, mut to_server_receiver: mpsc::Receiver<Bytes>, ) { let mut frame_sender = new_uni_frame_sender(&connection).await; @@ -99,7 +111,8 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>( connection: quinn::Connection, to_sync_client: mpsc::Sender<T>, on_lost_connection: fn() -> T, - mut close_receiver: broadcast::Receiver<()>, + // mut close_receiver: broadcast::Receiver<()>, + mut close_receiver: mpsc::Receiver<()>, mut to_server_receiver: mpsc::Receiver<Bytes>, ) { tokio::select! { @@ -121,6 +134,7 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>( trace!("Unordered Reliable Channel task ended") } } + todo!("Flush and signal finished") } async fn new_uni_frame_sender( |