diff options
author | gilles henaux <gill.henaux@gmail.com> | 2023-01-11 15:31:16 +0100 |
---|---|---|
committer | gilles henaux <gill.henaux@gmail.com> | 2023-01-11 15:31:16 +0100 |
commit | 09890a11daeb3810dca5b3a30edede8a65a73257 (patch) | |
tree | c3347287d9bc3fc039ababe85bff64247c47f40f /src/client.rs | |
parent | 583b000b32fd0880bb55db774c8ae96fc76781fd (diff) |
[client] Call connection.disconnect() from update_sync_client and do not use close_sender from async tasks
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/src/client.rs b/src/client.rs index 948d13c..71aad51 100644 --- a/src/client.rs +++ b/src/client.rs @@ -11,8 +11,8 @@ use std::{ use bevy::prelude::*; use bytes::Bytes; use futures_util::StreamExt; -use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint, SendStream}; -use quinn_proto::{ConnectionStats, VarInt}; +use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint}; +use quinn_proto::ConnectionStats; use serde::Deserialize; use tokio::{ runtime::{self}, @@ -143,7 +143,7 @@ pub(crate) struct ConnectionSpawnConfig { cert_mode: CertificateVerificationMode, from_sync_client: mpsc::Receiver<InternalSyncMessage>, to_sync_client: mpsc::Sender<InternalAsyncMessage>, - close_sender: broadcast::Sender<()>, + close_receiver: broadcast::Receiver<()>, from_server_sender: mpsc::Sender<Bytes>, } @@ -283,6 +283,13 @@ impl Connection { } } + fn try_disconnect(&mut self) { + match &self.disconnect() { + Ok(_) => (), + Err(err) => error!("Failed to properly close clonnection: {}", err), + } + } + pub fn is_connected(&self) -> bool { match self.state { ConnectionState::Connected(_) => true, @@ -437,7 +444,7 @@ impl Client { mpsc::channel::<InternalSyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE); // Create a close channel for this connection - let (close_sender, _) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE); + let (close_sender, close_receiver) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE); let mut connection = Connection { state: ConnectionState::Connecting, @@ -461,7 +468,7 @@ impl Client { cert_mode, from_sync_client, to_sync_client, - close_sender, + close_receiver, from_server_sender, }) .await @@ -584,11 +591,9 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) { let _close_waiter = { let conn = connection.clone(); let to_sync_client = spawn_config.to_sync_client.clone(); - let close_sender = spawn_config.close_sender.clone(); tokio::spawn(async move { let conn_err = conn.closed().await; info!("Disconnected: {}", conn_err); - close_sender.send(()).ok(); to_sync_client .send(InternalAsyncMessage::LostConnection) .await @@ -596,7 +601,7 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) { }) }; - let close_receiver = spawn_config.close_sender.subscribe(); + let close_receiver = spawn_config.close_receiver.resubscribe(); let connection_handle = connection.clone(); tokio::spawn(async move { connection_receiving_task( @@ -609,7 +614,7 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) { handle_connection_channels( connection, - spawn_config.close_sender, + spawn_config.close_receiver, spawn_config.from_sync_client, spawn_config.to_sync_client, ) @@ -649,11 +654,11 @@ async fn connection_receiving_task( async fn handle_connection_channels( connection: quinn::Connection, - close_sender: broadcast::Sender<()>, + mut close_receiver: broadcast::Receiver<()>, mut from_sync_client: mpsc::Receiver<InternalSyncMessage>, to_sync_client: mpsc::Sender<InternalAsyncMessage>, ) { - let mut close_receiver = close_sender.subscribe(); + let close_receiver_clone = close_receiver.resubscribe(); tokio::select! { _ = close_receiver.recv() => { trace!("Connection Channels listener received a close signal") @@ -662,7 +667,7 @@ async fn handle_connection_channels( while let Some(sync_message) = from_sync_client.recv().await { let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver } = sync_message; - let close_receiver = close_sender.subscribe(); + let close_receiver = close_receiver_clone.resubscribe(); let connection_handle = connection.clone(); let to_sync_client = to_sync_client.clone(); match channel_id { @@ -718,7 +723,7 @@ fn update_sync_client( InternalAsyncMessage::LostConnection => match connection.state { ConnectionState::Disconnected => (), _ => { - connection.state = ConnectionState::Disconnected; + connection.try_disconnect(); connection_lost_events.send(ConnectionLostEvent { id: *connection_id }); } }, |