aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgilles henaux <gill.henaux@gmail.com>2023-01-11 15:31:16 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-11 15:31:16 +0100
commit09890a11daeb3810dca5b3a30edede8a65a73257 (patch)
treec3347287d9bc3fc039ababe85bff64247c47f40f
parent583b000b32fd0880bb55db774c8ae96fc76781fd (diff)
[client] Call connection.disconnect() from update_sync_client and do not use close_sender from async tasks
-rw-r--r--src/client.rs31
-rw-r--r--src/shared/channel.rs35
2 files changed, 31 insertions, 35 deletions
diff --git a/src/client.rs b/src/client.rs
index 948d13c..71aad51 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -11,8 +11,8 @@ use std::{
use bevy::prelude::*;
use bytes::Bytes;
use futures_util::StreamExt;
-use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint, SendStream};
-use quinn_proto::{ConnectionStats, VarInt};
+use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint};
+use quinn_proto::ConnectionStats;
use serde::Deserialize;
use tokio::{
runtime::{self},
@@ -143,7 +143,7 @@ pub(crate) struct ConnectionSpawnConfig {
cert_mode: CertificateVerificationMode,
from_sync_client: mpsc::Receiver<InternalSyncMessage>,
to_sync_client: mpsc::Sender<InternalAsyncMessage>,
- close_sender: broadcast::Sender<()>,
+ close_receiver: broadcast::Receiver<()>,
from_server_sender: mpsc::Sender<Bytes>,
}
@@ -283,6 +283,13 @@ impl Connection {
}
}
+ fn try_disconnect(&mut self) {
+ match &self.disconnect() {
+ Ok(_) => (),
+ Err(err) => error!("Failed to properly close clonnection: {}", err),
+ }
+ }
+
pub fn is_connected(&self) -> bool {
match self.state {
ConnectionState::Connected(_) => true,
@@ -437,7 +444,7 @@ impl Client {
mpsc::channel::<InternalSyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE);
// Create a close channel for this connection
- let (close_sender, _) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE);
+ let (close_sender, close_receiver) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE);
let mut connection = Connection {
state: ConnectionState::Connecting,
@@ -461,7 +468,7 @@ impl Client {
cert_mode,
from_sync_client,
to_sync_client,
- close_sender,
+ close_receiver,
from_server_sender,
})
.await
@@ -584,11 +591,9 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) {
let _close_waiter = {
let conn = connection.clone();
let to_sync_client = spawn_config.to_sync_client.clone();
- let close_sender = spawn_config.close_sender.clone();
tokio::spawn(async move {
let conn_err = conn.closed().await;
info!("Disconnected: {}", conn_err);
- close_sender.send(()).ok();
to_sync_client
.send(InternalAsyncMessage::LostConnection)
.await
@@ -596,7 +601,7 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) {
})
};
- let close_receiver = spawn_config.close_sender.subscribe();
+ let close_receiver = spawn_config.close_receiver.resubscribe();
let connection_handle = connection.clone();
tokio::spawn(async move {
connection_receiving_task(
@@ -609,7 +614,7 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) {
handle_connection_channels(
connection,
- spawn_config.close_sender,
+ spawn_config.close_receiver,
spawn_config.from_sync_client,
spawn_config.to_sync_client,
)
@@ -649,11 +654,11 @@ async fn connection_receiving_task(
async fn handle_connection_channels(
connection: quinn::Connection,
- close_sender: broadcast::Sender<()>,
+ mut close_receiver: broadcast::Receiver<()>,
mut from_sync_client: mpsc::Receiver<InternalSyncMessage>,
to_sync_client: mpsc::Sender<InternalAsyncMessage>,
) {
- let mut close_receiver = close_sender.subscribe();
+ let close_receiver_clone = close_receiver.resubscribe();
tokio::select! {
_ = close_receiver.recv() => {
trace!("Connection Channels listener received a close signal")
@@ -662,7 +667,7 @@ async fn handle_connection_channels(
while let Some(sync_message) = from_sync_client.recv().await {
let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver } = sync_message;
- let close_receiver = close_sender.subscribe();
+ let close_receiver = close_receiver_clone.resubscribe();
let connection_handle = connection.clone();
let to_sync_client = to_sync_client.clone();
match channel_id {
@@ -718,7 +723,7 @@ fn update_sync_client(
InternalAsyncMessage::LostConnection => match connection.state {
ConnectionState::Disconnected => (),
_ => {
- connection.state = ConnectionState::Disconnected;
+ connection.try_disconnect();
connection_lost_events.send(ConnectionLostEvent { id: *connection_id });
}
},
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index 2091a9a..54ad3d7 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -2,7 +2,7 @@ use super::QuinnetError;
use bevy::prelude::{error, trace};
use bytes::Bytes;
use futures::sink::SinkExt;
-use quinn::VarInt;
+use quinn::{SendStream, VarInt};
use std::fmt::Debug;
use tokio::sync::{
broadcast,
@@ -60,11 +60,7 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
mut close_receiver: broadcast::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
- let uni_sender = connection
- .open_uni()
- .await
- .expect("Failed to open send stream");
- let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
+ let mut frame_sender = new_uni_frame_sender(&connection).await;
tokio::select! {
_ = close_receiver.recv() => {
@@ -73,12 +69,7 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
_ = async {
while let Some(msg_bytes) = to_server_receiver.recv().await {
if let Err(err) = frame_sender.send(msg_bytes).await {
- // TODO Clean: error handling
error!("Error while sending, {}", err);
- // error!("Client seems disconnected, closing resources");
- // if let Err(_) = close_sender_clone.send(()) {
- // error!("Failed to close all client streams & resources")
- // }
to_sync_client.send(
on_lost_connection())
.await
@@ -117,19 +108,9 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
}
_ = async {
while let Some(msg_bytes) = to_server_receiver.recv().await {
- let uni_sender = connection
- .open_uni()
- .await
- .expect("Failed to open send stream");
- let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
-
+ let mut frame_sender = new_uni_frame_sender(&connection).await;
if let Err(err) = frame_sender.send(msg_bytes).await {
- // TODO Clean: error handling
error!("Error while sending, {}", err);
- // error!("Client seems disconnected, closing resources");
- // if let Err(_) = close_sender_clone.send(()) {
- // error!("Failed to close all client streams & resources")
- // }
to_sync_client.send(
on_lost_connection())
.await
@@ -142,6 +123,16 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
}
}
+async fn new_uni_frame_sender(
+ connection: &quinn::Connection,
+) -> FramedWrite<SendStream, LengthDelimitedCodec> {
+ let uni_sender = connection
+ .open_uni()
+ .await
+ .expect("Failed to open send stream");
+ FramedWrite::new(uni_sender, LengthDelimitedCodec::new())
+}
+
// async fn send_msg(
// // close_sender: &tokio::sync::broadcast::Sender<()>,
// to_sync_client: &mpsc::Sender<InternalAsyncMessage>,