aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgilles henaux <gill.henaux@gmail.com>2023-01-09 11:27:17 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-11 14:52:44 +0100
commit583b000b32fd0880bb55db774c8ae96fc76781fd (patch)
treec701fb24b9675ed28ba33362cc79d9c1b11d6b6f
parent694507eebc38c216a4aadee900f83b02d1599915 (diff)
[channels] Move channel tasks into channel module, shared by client & server
-rw-r--r--src/client.rs83
-rw-r--r--src/shared/channel.rs180
2 files changed, 122 insertions, 141 deletions
diff --git a/src/client.rs b/src/client.rs
index 0c7ab02..948d13c 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -10,7 +10,6 @@ use std::{
use bevy::prelude::*;
use bytes::Bytes;
-use futures::sink::SinkExt;
use futures_util::StreamExt;
use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint, SendStream};
use quinn_proto::{ConnectionStats, VarInt};
@@ -27,10 +26,13 @@ use tokio::{
},
task::JoinSet,
};
-use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
+use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use crate::shared::{
- channel::{Channel, ChannelId, ChannelType, OrdRelChannelId},
+ channel::{
+ ordered_reliable_channel_task, unordered_reliable_channel_task, Channel, ChannelId,
+ ChannelType, OrdRelChannelId,
+ },
AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
};
@@ -669,6 +671,7 @@ async fn handle_connection_channels(
ordered_reliable_channel_task(
connection_handle,
to_sync_client,
+ || InternalAsyncMessage::LostConnection,
close_receiver,
to_server_receiver
)
@@ -677,13 +680,14 @@ async fn handle_connection_channels(
},
ChannelId::UnorderedReliable => {
tokio::spawn(async move {
- // unordered_reliable_channel_task(
- // connection_handle,
- // to_sync_client,
- // close_receiver,
- // to_server_receiver
- // )
- // .await
+ unordered_reliable_channel_task(
+ connection_handle,
+ to_sync_client,
+ || InternalAsyncMessage::LostConnection,
+ close_receiver,
+ to_server_receiver
+ )
+ .await
});
},
ChannelId::Unreliable => todo!(),
@@ -695,65 +699,6 @@ async fn handle_connection_channels(
};
}
-async fn ordered_reliable_channel_task(
- connection: quinn::Connection,
- to_sync_client: mpsc::Sender<InternalAsyncMessage>,
- mut close_receiver: broadcast::Receiver<()>,
- mut to_server_receiver: mpsc::Receiver<Bytes>,
-) {
- let uni_sender = connection
- .open_uni()
- .await
- .expect("Failed to open send stream");
- let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
- tokio::select! {
- _ = close_receiver.recv() => {
- trace!("Ordered Reliable Send Channel received a close signal")
- }
- _ = async {
- while let Some(msg_bytes) = to_server_receiver.recv().await {
- send_msg(&to_sync_client, &mut frame_sender, msg_bytes).await;
- }
- } => {
- trace!("Ordered Reliable Send Channel ended")
- }
- }
- while let Ok(msg_bytes) = to_server_receiver.try_recv() {
- if let Err(err) = frame_sender.send(msg_bytes).await {
- error!("Error while sending, {}", err);
- }
- }
- if let Err(err) = frame_sender.flush().await {
- error!("Error while flushing stream: {}", err);
- }
- if let Err(err) = frame_sender.into_inner().finish().await {
- error!("Failed to shutdown stream gracefully: {}", err);
- }
- todo!("Do not close here, wait for all channels to be flushed");
- connection.close(VarInt::from_u32(0), "closed".as_bytes());
-}
-
-async fn send_msg(
- // close_sender: &tokio::sync::broadcast::Sender<()>,
- to_sync_client: &mpsc::Sender<InternalAsyncMessage>,
- frame_send: &mut FramedWrite<SendStream, LengthDelimitedCodec>,
- msg_bytes: Bytes,
-) {
- if let Err(err) = frame_send.send(msg_bytes).await {
- error!("Error while sending, {}", err);
- error!("Client seems disconnected, closing resources");
- // Emit LostConnection to properly update the connection about its state.
- // Raise LostConnection event before emitting a close signal because we have no guarantee to continue this async execution after the close signal has been processed.
- to_sync_client
- .send(InternalAsyncMessage::LostConnection)
- .await
- .expect("Failed to signal connection lost to sync client");
- // if let Err(_) = close_sender.send(()) {
- // error!("Failed to close all client streams & resources")
- // }
- }
-}
-
// Receive messages from the async client tasks and update the sync client.
fn update_sync_client(
mut connection_events: EventWriter<ConnectionEvent>,
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index fcdaf1a..2091a9a 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -1,14 +1,14 @@
+use super::QuinnetError;
use bevy::prelude::{error, trace};
use bytes::Bytes;
use futures::sink::SinkExt;
-use futures_util::StreamExt;
+use quinn::VarInt;
+use std::fmt::Debug;
use tokio::sync::{
broadcast,
mpsc::{self, error::TrySendError},
};
-use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
-
-use super::QuinnetError;
+use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
pub(crate) type OrdRelChannelId = u64;
@@ -53,76 +53,112 @@ impl Channel {
}
}
-// async fn ordered_reliable_channel_task(
-// connection: quinn::Connection,
-// to_sync_client: mpsc::Sender<InternalAsyncMessage>,
-// mut close_receiver: broadcast::Receiver<()>,
-// mut to_server_receiver: mpsc::Receiver<Bytes>,
-// ) {
-// tokio::select! {
-// _ = close_receiver.recv() => {
-// trace!("Ordered Reliable Send Channel received a close signal")
-// }
-// _ = async {
-// let uni_sender = connection
-// .open_uni()
-// .await
-// .expect("Failed to open send stream");
-// let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
+pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
+ connection: quinn::Connection,
+ to_sync_client: mpsc::Sender<T>,
+ on_lost_connection: fn() -> T,
+ mut close_receiver: broadcast::Receiver<()>,
+ mut to_server_receiver: mpsc::Receiver<Bytes>,
+) {
+ let uni_sender = connection
+ .open_uni()
+ .await
+ .expect("Failed to open send stream");
+ let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
-// while let Some(msg_bytes) = to_server_receiver.recv().await {
-// if let Err(err) = frame_sender.send(msg_bytes).await {
-// // TODO Clean: error handling
-// error!("Error while sending, {}", err);
-// // error!("Client seems disconnected, closing resources");
-// // if let Err(_) = close_sender_clone.send(()) {
-// // error!("Failed to close all client streams & resources")
-// // }
-// to_sync_client.send(
-// InternalAsyncMessage::LostConnection)
-// .await
-// .expect("Failed to signal connection lost to sync client");
-// }
-// }
-// } => {
-// trace!("Ordered Reliable Send Channel ended")
-// }
-// }
-// }
+ tokio::select! {
+ _ = close_receiver.recv() => {
+ trace!("Ordered Reliable Channel task received a close signal")
+ }
+ _ = async {
+ while let Some(msg_bytes) = to_server_receiver.recv().await {
+ if let Err(err) = frame_sender.send(msg_bytes).await {
+ // TODO Clean: error handling
+ error!("Error while sending, {}", err);
+ // error!("Client seems disconnected, closing resources");
+ // if let Err(_) = close_sender_clone.send(()) {
+ // error!("Failed to close all client streams & resources")
+ // }
+ to_sync_client.send(
+ on_lost_connection())
+ .await
+ .expect("Failed to signal connection lost to sync client");
+ }
+ }
+ } => {
+ trace!("Ordered Reliable Channel task ended")
+ }
+ }
+ while let Ok(msg_bytes) = to_server_receiver.try_recv() {
+ if let Err(err) = frame_sender.send(msg_bytes).await {
+ error!("Error while sending, {}", err);
+ }
+ }
+ if let Err(err) = frame_sender.flush().await {
+ error!("Error while flushing stream: {}", err);
+ }
+ if let Err(err) = frame_sender.into_inner().finish().await {
+ error!("Failed to shutdown stream gracefully: {}", err);
+ }
+ todo!("Do not close here, wait for all channels to be flushed");
+ connection.close(VarInt::from_u32(0), "closed".as_bytes());
+}
-// async fn unordered_reliable_channel_task(
-// connection: quinn::Connection,
-// to_sync_client: mpsc::Sender<InternalAsyncMessage>,
-// mut close_receiver: broadcast::Receiver<()>,
-// mut to_server_receiver: mpsc::Receiver<Bytes>,
-// ) {
-// tokio::select! {
-// _ = close_receiver.recv() => {
-// trace!("Unordered Reliable Send Channel received a close signal")
-// }
-// _ = async {
-// while let Some(msg_bytes) = to_server_receiver.recv().await {
-// let uni_sender = connection
-// .open_uni()
-// .await
-// .expect("Failed to open send stream");
-// let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
+pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
+ connection: quinn::Connection,
+ to_sync_client: mpsc::Sender<T>,
+ on_lost_connection: fn() -> T,
+ mut close_receiver: broadcast::Receiver<()>,
+ mut to_server_receiver: mpsc::Receiver<Bytes>,
+) {
+ tokio::select! {
+ _ = close_receiver.recv() => {
+ trace!("Unordered Reliable Channel task received a close signal")
+ }
+ _ = async {
+ while let Some(msg_bytes) = to_server_receiver.recv().await {
+ let uni_sender = connection
+ .open_uni()
+ .await
+ .expect("Failed to open send stream");
+ let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
-// if let Err(err) = frame_sender.send(msg_bytes).await {
-// // TODO Clean: error handling
-// error!("Error while sending, {}", err);
-// // error!("Client seems disconnected, closing resources");
-// // if let Err(_) = close_sender_clone.send(()) {
-// // error!("Failed to close all client streams & resources")
-// // }
-// to_sync_client.send(
-// InternalAsyncMessage::LostConnection)
-// .await
-// .expect("Failed to signal connection lost to sync client");
-// }
-// }
-// } => {
-// trace!("Unordered Reliable Send Channel ended")
-// }
+ if let Err(err) = frame_sender.send(msg_bytes).await {
+ // TODO Clean: error handling
+ error!("Error while sending, {}", err);
+ // error!("Client seems disconnected, closing resources");
+ // if let Err(_) = close_sender_clone.send(()) {
+ // error!("Failed to close all client streams & resources")
+ // }
+ to_sync_client.send(
+ on_lost_connection())
+ .await
+ .expect("Failed to signal connection lost to sync client");
+ }
+ }
+ } => {
+ trace!("Unordered Reliable Channel task ended")
+ }
+ }
+}
+
+// async fn send_msg(
+// // close_sender: &tokio::sync::broadcast::Sender<()>,
+// to_sync_client: &mpsc::Sender<InternalAsyncMessage>,
+// frame_send: &mut FramedWrite<SendStream, LengthDelimitedCodec>,
+// msg_bytes: Bytes,
+// ) {
+// if let Err(err) = frame_send.send(msg_bytes).await {
+// error!("Error while sending, {}", err);
+// error!("Client seems disconnected, closing resources");
+// // Emit LostConnection to properly update the connection about its state.
+// // Raise LostConnection event before emitting a close signal because we have no guarantee to continue this async execution after the close signal has been processed.
+// to_sync_client
+// .send(InternalAsyncMessage::LostConnection)
+// .await
+// .expect("Failed to signal connection lost to sync client");
+// // if let Err(_) = close_sender.send(()) {
+// // error!("Failed to close all client streams & resources")
+// // }
// }
// }