aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgilles henaux <gill.henaux@gmail.com>2023-01-11 19:55:02 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-11 19:55:02 +0100
commitde0656475591ff668b832aeefb37d08be2235e7f (patch)
tree396597ef2a84c113fea94d7730480377ae7666ba
parent24388024bb8338d41b60b991bc47cc283a73576f (diff)
[channels] Dedicated close channels for Quinnet Channels
-rw-r--r--src/client.rs39
-rw-r--r--src/shared/channel.rs32
2 files changed, 47 insertions, 24 deletions
diff --git a/src/client.rs b/src/client.rs
index 89e594b..095e51e 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -118,6 +118,7 @@ pub(crate) enum InternalSyncMessage {
CreateChannel {
channel_id: ChannelId,
to_server_receiver: mpsc::Receiver<Bytes>,
+ channel_close_receiver: mpsc::Receiver<()>,
},
}
@@ -269,7 +270,7 @@ impl Connection {
}
}
- /// Disconnect from the server on this connection. This does not send any message to the server, and simply closes all the connection's tasks locally.
+ /// Immediately prevents new messages from being sent on the connection and signal the connection to closes all its background tasks. Before trully closing, the connection will wait for all buffered messages in all its opened channels to be properly sent according to their respective channel type.
fn disconnect(&mut self) -> Result<(), QuinnetError> {
match &self.state {
ConnectionState::Disconnected => Ok(()),
@@ -277,7 +278,10 @@ impl Connection {
self.state = ConnectionState::Disconnected;
match self.close_sender.send(()) {
Ok(_) => Ok(()),
- Err(_) => Err(QuinnetError::InternalChannelClosed),
+ Err(_) => {
+ // The only possible error for a send is that there is no active receivers, meaning that the tasks are already terminated.
+ Err(QuinnetError::ConnectionAlreadyClosed)
+ }
}
}
}
@@ -329,15 +333,16 @@ impl Connection {
}
}
+ /// Immediately prevents new messages from being sent on the channel and signal the channel to closes all its background tasks.
+ /// Before trully closing, the channel will wait for all buffered messages to be properly sent according to the channel type.
+ /// Can fail if the [ChannelId] is unknown, or if the channel is already closed.
pub fn close_channel(&mut self, channel_id: ChannelId) -> Result<(), QuinnetError> {
match self.channels.remove(&channel_id) {
- Some(mut channel) => {
- // channel.close()?;
- todo!("Close channels tasks");
+ Some(channel) => {
if Some(channel_id) == self.default_channel {
self.default_channel = None;
}
- Ok(())
+ channel.close()
}
None => Err(QuinnetError::UnknownChannel(channel_id)),
}
@@ -346,15 +351,18 @@ impl Connection {
fn create_channel(&mut self, channel_id: ChannelId) -> Result<ChannelId, QuinnetError> {
let (to_server_sender, to_server_receiver) =
mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
+ let (channel_close_sender, channel_close_receiver) =
+ mpsc::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE);
match self
.internal_sender
.try_send(InternalSyncMessage::CreateChannel {
channel_id,
to_server_receiver,
+ channel_close_receiver,
}) {
Ok(_) => {
- let channel = Channel::new(to_server_sender);
+ let channel = Channel::new(to_server_sender, channel_close_sender);
self.channels.insert(channel_id, channel);
if self.default_channel.is_none() {
self.default_channel = Some(channel_id);
@@ -494,15 +502,14 @@ impl Client {
self.default_connection_id
}
- /// Close a specific connection. This will call disconnect on the connection and remove it from the client. This may fail if the [Connection] fails to disconnect or if no [Connection] if found for connection_id
+ /// Close a specific connection. Removes it from the client. This may fail if no [Connection] if found for connection_id, or if the [Connection] is already closed.
pub fn close_connection(&mut self, connection_id: ConnectionId) -> Result<(), QuinnetError> {
match self.connections.remove(&connection_id) {
Some(mut connection) => {
- connection.disconnect()?;
if Some(connection_id) == self.default_connection_id {
self.default_connection_id = None;
}
- Ok(())
+ connection.disconnect()
}
None => Err(QuinnetError::UnknownConnection(connection_id)),
}
@@ -658,16 +665,16 @@ async fn handle_connection_channels(
mut from_sync_client: mpsc::Receiver<InternalSyncMessage>,
to_sync_client: mpsc::Sender<InternalAsyncMessage>,
) {
- let close_receiver_clone = close_receiver.resubscribe();
+ // let close_receiver_clone = close_receiver.resubscribe();
tokio::select! {
_ = close_receiver.recv() => {
trace!("Connection Channels listener received a close signal")
}
_ = async {
while let Some(sync_message) = from_sync_client.recv().await {
- let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver } = sync_message;
+ let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver, channel_close_receiver } = sync_message;
- let close_receiver = close_receiver_clone.resubscribe();
+ // let close_receiver = close_receiver_clone.resubscribe();
let connection_handle = connection.clone();
let to_sync_client = to_sync_client.clone();
match channel_id {
@@ -677,7 +684,8 @@ async fn handle_connection_channels(
connection_handle,
to_sync_client,
|| InternalAsyncMessage::LostConnection,
- close_receiver,
+ // close_receiver,
+ channel_close_receiver,
to_server_receiver
)
.await
@@ -689,7 +697,8 @@ async fn handle_connection_channels(
connection_handle,
to_sync_client,
|| InternalAsyncMessage::LostConnection,
- close_receiver,
+ // close_receiver,
+ channel_close_receiver,
to_server_receiver
)
.await
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index 5c458bb..1d15f6f 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -4,10 +4,7 @@ use bytes::Bytes;
use futures::sink::SinkExt;
use quinn::{SendStream, VarInt};
use std::fmt::Debug;
-use tokio::sync::{
- broadcast,
- mpsc::{self, error::TrySendError},
-};
+use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
pub(crate) type MultiChannelId = u64;
@@ -35,10 +32,18 @@ impl std::fmt::Display for ChannelId {
#[derive(Debug)]
pub struct Channel {
sender: mpsc::Sender<Bytes>,
+ close_sender: mpsc::Sender<()>,
}
impl Channel {
- pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
+ pub(crate) fn new(sender: mpsc::Sender<Bytes>, close_sender: mpsc::Sender<()>) -> Self {
+ Self {
+ sender,
+ close_sender,
+ }
+ }
+
+ pub(crate) fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
match self.sender.try_send(payload.into()) {
Ok(_) => Ok(()),
Err(err) => match err {
@@ -48,8 +53,14 @@ impl Channel {
}
}
- pub(crate) fn new(sender: mpsc::Sender<Bytes>) -> Self {
- Self { sender }
+ pub(crate) fn close(&self) -> Result<(), QuinnetError> {
+ match self.close_sender.blocking_send(()) {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ // The only possible error for a send is that there is no active receivers, meaning that the tasks are already terminated.
+ Err(QuinnetError::ChannelAlreadyClosed)
+ }
+ }
}
}
@@ -57,7 +68,8 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
- mut close_receiver: broadcast::Receiver<()>,
+ // mut close_receiver: broadcast::Receiver<()>,
+ mut close_receiver: mpsc::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
let mut frame_sender = new_uni_frame_sender(&connection).await;
@@ -99,7 +111,8 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
- mut close_receiver: broadcast::Receiver<()>,
+ // mut close_receiver: broadcast::Receiver<()>,
+ mut close_receiver: mpsc::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
tokio::select! {
@@ -121,6 +134,7 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
trace!("Unordered Reliable Channel task ended")
}
}
+ todo!("Flush and signal finished")
}
async fn new_uni_frame_sender(