aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgilles henaux <gill.henaux@gmail.com>2023-01-14 16:04:59 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-14 16:04:59 +0100
commitf11a0360a273ba7af83e92db332c5b9be2854d29 (patch)
treebdfd1a457830b6d1c4251ad2879d35bb756ccf88
parent4e64c1812b90d6cce07abb202439154fe9aa31a3 (diff)
[client] Use the shared channels_task
-rw-r--r--src/client.rs110
1 files changed, 25 insertions, 85 deletions
diff --git a/src/client.rs b/src/client.rs
index 5019630..1b3bd15 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -11,7 +11,7 @@ use std::{
use bevy::prelude::*;
use bytes::Bytes;
use futures_util::StreamExt;
-use quinn::{ClientConfig, Connection as QuinnConnection, ConnectionError, Endpoint, VarInt};
+use quinn::{ClientConfig, Connection as QuinnConnection, ConnectionError, Endpoint};
use quinn_proto::ConnectionStats;
use serde::Deserialize;
use tokio::{
@@ -30,8 +30,8 @@ use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use crate::shared::{
channel::{
- get_channel_id_from_type, ordered_reliable_channel_task, unordered_reliable_channel_task,
- Channel, ChannelAsyncMessage, ChannelId, ChannelSyncMessage, ChannelType, MultiChannelId,
+ channels_task, get_channel_id_from_type, Channel, ChannelAsyncMessage, ChannelId,
+ ChannelSyncMessage, ChannelType, MultiChannelId,
},
AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
};
@@ -581,7 +581,8 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) {
.await
.expect("Failed to signal connection to sync client");
- let _close_waiter = {
+ // Spawn a task to listen for the underlying connection being closed
+ {
let conn = connection.clone();
let to_sync_client = spawn_config.to_sync_client_send.clone();
tokio::spawn(async move {
@@ -594,24 +595,30 @@ async fn connection_task(spawn_config: ConnectionSpawnConfig) {
})
};
- let close_recv = spawn_config.close_recv.resubscribe();
- let connection_handle = connection.clone();
+ // Spawn a task to listen for streams opened by the server
+ {
+ let close_recv = spawn_config.close_recv.resubscribe();
+ let connection_handle = connection.clone();
+ tokio::spawn(async move {
+ connection_receiving_task(
+ connection_handle,
+ spawn_config.bytes_from_server_send,
+ close_recv,
+ )
+ .await
+ });
+ }
+
+ // Spawn a task to handle channels for this connection
tokio::spawn(async move {
- connection_receiving_task(
- connection_handle,
- spawn_config.bytes_from_server_send,
- close_recv,
+ channels_task(
+ connection,
+ spawn_config.close_recv,
+ spawn_config.to_channels_recv,
+ spawn_config.from_channels_send,
)
.await
});
-
- handle_connection_channels(
- connection,
- spawn_config.close_recv,
- spawn_config.to_channels_recv,
- spawn_config.from_channels_send,
- )
- .await;
}
}
}
@@ -645,73 +652,6 @@ async fn connection_receiving_task(
trace!("All unidirectional stream receivers cleaned");
}
-async fn handle_connection_channels(
- connection: quinn::Connection,
- mut close_recv: broadcast::Receiver<()>,
- mut to_channels_recv: mpsc::Receiver<ChannelSyncMessage>,
- from_channels_send: mpsc::Sender<ChannelAsyncMessage>,
-) {
- // Use an mpsc channel where, instead of sending messages, we wait for the channel to be closed, which happens when every sender has been dropped. We can't use a JoinSet as simply here since we would also need to drain closed channels from it.
- let (channel_tasks_keepalive, mut channel_tasks_waiter) = mpsc::channel(1);
-
- let close_receiver_clone = close_recv.resubscribe();
- tokio::select! {
- _ = close_recv.recv() => {
- trace!("Connection Channels listener received a close signal")
- }
- _ = async {
- while let Some(sync_message) = to_channels_recv.recv().await {
- let ChannelSyncMessage::CreateChannel{ channel_id, bytes_to_channel_recv, channel_close_recv } = sync_message;
-
- let close_receiver = close_receiver_clone.resubscribe();
- let connection_handle = connection.clone();
- let from_channels_send = from_channels_send.clone();
- let channels_keepalive_clone = channel_tasks_keepalive.clone();
-
- match channel_id {
- ChannelId::OrderedReliable(_) => {
- tokio::spawn(async move {
- ordered_reliable_channel_task(
- connection_handle,
- channels_keepalive_clone,
- from_channels_send,
- close_receiver,
- channel_close_recv,
- bytes_to_channel_recv
- )
- .await
- });
- },
- ChannelId::UnorderedReliable => {
- tokio::spawn(async move {
- unordered_reliable_channel_task(
- connection_handle,
- channels_keepalive_clone,
- from_channels_send,
- close_receiver,
- channel_close_recv,
- bytes_to_channel_recv
- )
- .await
- });
- },
- ChannelId::Unreliable => todo!(),
- }
- }
- } => {
- trace!("Connection Channels listener ended")
- }
- };
-
- // Wait for all the channels to have flushed/finished:
- // We drop our sender first because the recv() call otherwise sleeps forever.
- // When every sender has gone out of scope, the recv call will return with an error. We ignore the error.
- drop(channel_tasks_keepalive);
- let _ = channel_tasks_waiter.recv().await;
-
- connection.close(VarInt::from_u32(0), "closed".as_bytes());
-}
-
// Receive messages from the async client tasks and update the sync client.
fn update_sync_client(
mut connection_events: EventWriter<ConnectionEvent>,