aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs39
1 files changed, 24 insertions, 15 deletions
diff --git a/src/client.rs b/src/client.rs
index 89e594b..095e51e 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -118,6 +118,7 @@ pub(crate) enum InternalSyncMessage {
CreateChannel {
channel_id: ChannelId,
to_server_receiver: mpsc::Receiver<Bytes>,
+ channel_close_receiver: mpsc::Receiver<()>,
},
}
@@ -269,7 +270,7 @@ impl Connection {
}
}
- /// Disconnect from the server on this connection. This does not send any message to the server, and simply closes all the connection's tasks locally.
+ /// Immediately prevents new messages from being sent on the connection and signal the connection to closes all its background tasks. Before trully closing, the connection will wait for all buffered messages in all its opened channels to be properly sent according to their respective channel type.
fn disconnect(&mut self) -> Result<(), QuinnetError> {
match &self.state {
ConnectionState::Disconnected => Ok(()),
@@ -277,7 +278,10 @@ impl Connection {
self.state = ConnectionState::Disconnected;
match self.close_sender.send(()) {
Ok(_) => Ok(()),
- Err(_) => Err(QuinnetError::InternalChannelClosed),
+ Err(_) => {
+ // The only possible error for a send is that there is no active receivers, meaning that the tasks are already terminated.
+ Err(QuinnetError::ConnectionAlreadyClosed)
+ }
}
}
}
@@ -329,15 +333,16 @@ impl Connection {
}
}
+ /// Immediately prevents new messages from being sent on the channel and signal the channel to closes all its background tasks.
+ /// Before trully closing, the channel will wait for all buffered messages to be properly sent according to the channel type.
+ /// Can fail if the [ChannelId] is unknown, or if the channel is already closed.
pub fn close_channel(&mut self, channel_id: ChannelId) -> Result<(), QuinnetError> {
match self.channels.remove(&channel_id) {
- Some(mut channel) => {
- // channel.close()?;
- todo!("Close channels tasks");
+ Some(channel) => {
if Some(channel_id) == self.default_channel {
self.default_channel = None;
}
- Ok(())
+ channel.close()
}
None => Err(QuinnetError::UnknownChannel(channel_id)),
}
@@ -346,15 +351,18 @@ impl Connection {
fn create_channel(&mut self, channel_id: ChannelId) -> Result<ChannelId, QuinnetError> {
let (to_server_sender, to_server_receiver) =
mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
+ let (channel_close_sender, channel_close_receiver) =
+ mpsc::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE);
match self
.internal_sender
.try_send(InternalSyncMessage::CreateChannel {
channel_id,
to_server_receiver,
+ channel_close_receiver,
}) {
Ok(_) => {
- let channel = Channel::new(to_server_sender);
+ let channel = Channel::new(to_server_sender, channel_close_sender);
self.channels.insert(channel_id, channel);
if self.default_channel.is_none() {
self.default_channel = Some(channel_id);
@@ -494,15 +502,14 @@ impl Client {
self.default_connection_id
}
- /// Close a specific connection. This will call disconnect on the connection and remove it from the client. This may fail if the [Connection] fails to disconnect or if no [Connection] if found for connection_id
+ /// Close a specific connection. Removes it from the client. This may fail if no [Connection] if found for connection_id, or if the [Connection] is already closed.
pub fn close_connection(&mut self, connection_id: ConnectionId) -> Result<(), QuinnetError> {
match self.connections.remove(&connection_id) {
Some(mut connection) => {
- connection.disconnect()?;
if Some(connection_id) == self.default_connection_id {
self.default_connection_id = None;
}
- Ok(())
+ connection.disconnect()
}
None => Err(QuinnetError::UnknownConnection(connection_id)),
}
@@ -658,16 +665,16 @@ async fn handle_connection_channels(
mut from_sync_client: mpsc::Receiver<InternalSyncMessage>,
to_sync_client: mpsc::Sender<InternalAsyncMessage>,
) {
- let close_receiver_clone = close_receiver.resubscribe();
+ // let close_receiver_clone = close_receiver.resubscribe();
tokio::select! {
_ = close_receiver.recv() => {
trace!("Connection Channels listener received a close signal")
}
_ = async {
while let Some(sync_message) = from_sync_client.recv().await {
- let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver } = sync_message;
+ let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver, channel_close_receiver } = sync_message;
- let close_receiver = close_receiver_clone.resubscribe();
+ // let close_receiver = close_receiver_clone.resubscribe();
let connection_handle = connection.clone();
let to_sync_client = to_sync_client.clone();
match channel_id {
@@ -677,7 +684,8 @@ async fn handle_connection_channels(
connection_handle,
to_sync_client,
|| InternalAsyncMessage::LostConnection,
- close_receiver,
+ // close_receiver,
+ channel_close_receiver,
to_server_receiver
)
.await
@@ -689,7 +697,8 @@ async fn handle_connection_channels(
connection_handle,
to_sync_client,
|| InternalAsyncMessage::LostConnection,
- close_receiver,
+ // close_receiver,
+ channel_close_receiver,
to_server_receiver
)
.await