diff options
-rw-r--r-- | src/client.rs | 83 | ||||
-rw-r--r-- | src/shared/channel.rs | 180 |
2 files changed, 122 insertions, 141 deletions
diff --git a/src/client.rs b/src/client.rs index 0c7ab02..948d13c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -10,7 +10,6 @@ use std::{ use bevy::prelude::*; use bytes::Bytes; -use futures::sink::SinkExt; use futures_util::StreamExt; use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint, SendStream}; use quinn_proto::{ConnectionStats, VarInt}; @@ -27,10 +26,13 @@ use tokio::{ }, task::JoinSet, }; -use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; +use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; use crate::shared::{ - channel::{Channel, ChannelId, ChannelType, OrdRelChannelId}, + channel::{ + ordered_reliable_channel_task, unordered_reliable_channel_task, Channel, ChannelId, + ChannelType, OrdRelChannelId, + }, AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE, }; @@ -669,6 +671,7 @@ async fn handle_connection_channels( ordered_reliable_channel_task( connection_handle, to_sync_client, + || InternalAsyncMessage::LostConnection, close_receiver, to_server_receiver ) @@ -677,13 +680,14 @@ async fn handle_connection_channels( }, ChannelId::UnorderedReliable => { tokio::spawn(async move { - // unordered_reliable_channel_task( - // connection_handle, - // to_sync_client, - // close_receiver, - // to_server_receiver - // ) - // .await + unordered_reliable_channel_task( + connection_handle, + to_sync_client, + || InternalAsyncMessage::LostConnection, + close_receiver, + to_server_receiver + ) + .await }); }, ChannelId::Unreliable => todo!(), @@ -695,65 +699,6 @@ async fn handle_connection_channels( }; } -async fn ordered_reliable_channel_task( - connection: quinn::Connection, - to_sync_client: mpsc::Sender<InternalAsyncMessage>, - mut close_receiver: broadcast::Receiver<()>, - mut to_server_receiver: mpsc::Receiver<Bytes>, -) { - let uni_sender = connection - .open_uni() - .await - .expect("Failed to open send stream"); - let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); - tokio::select! { - _ = close_receiver.recv() => { - trace!("Ordered Reliable Send Channel received a close signal") - } - _ = async { - while let Some(msg_bytes) = to_server_receiver.recv().await { - send_msg(&to_sync_client, &mut frame_sender, msg_bytes).await; - } - } => { - trace!("Ordered Reliable Send Channel ended") - } - } - while let Ok(msg_bytes) = to_server_receiver.try_recv() { - if let Err(err) = frame_sender.send(msg_bytes).await { - error!("Error while sending, {}", err); - } - } - if let Err(err) = frame_sender.flush().await { - error!("Error while flushing stream: {}", err); - } - if let Err(err) = frame_sender.into_inner().finish().await { - error!("Failed to shutdown stream gracefully: {}", err); - } - todo!("Do not close here, wait for all channels to be flushed"); - connection.close(VarInt::from_u32(0), "closed".as_bytes()); -} - -async fn send_msg( - // close_sender: &tokio::sync::broadcast::Sender<()>, - to_sync_client: &mpsc::Sender<InternalAsyncMessage>, - frame_send: &mut FramedWrite<SendStream, LengthDelimitedCodec>, - msg_bytes: Bytes, -) { - if let Err(err) = frame_send.send(msg_bytes).await { - error!("Error while sending, {}", err); - error!("Client seems disconnected, closing resources"); - // Emit LostConnection to properly update the connection about its state. - // Raise LostConnection event before emitting a close signal because we have no guarantee to continue this async execution after the close signal has been processed. - to_sync_client - .send(InternalAsyncMessage::LostConnection) - .await - .expect("Failed to signal connection lost to sync client"); - // if let Err(_) = close_sender.send(()) { - // error!("Failed to close all client streams & resources") - // } - } -} - // Receive messages from the async client tasks and update the sync client. fn update_sync_client( mut connection_events: EventWriter<ConnectionEvent>, diff --git a/src/shared/channel.rs b/src/shared/channel.rs index fcdaf1a..2091a9a 100644 --- a/src/shared/channel.rs +++ b/src/shared/channel.rs @@ -1,14 +1,14 @@ +use super::QuinnetError; use bevy::prelude::{error, trace}; use bytes::Bytes; use futures::sink::SinkExt; -use futures_util::StreamExt; +use quinn::VarInt; +use std::fmt::Debug; use tokio::sync::{ broadcast, mpsc::{self, error::TrySendError}, }; -use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; - -use super::QuinnetError; +use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; pub(crate) type OrdRelChannelId = u64; @@ -53,76 +53,112 @@ impl Channel { } } -// async fn ordered_reliable_channel_task( -// connection: quinn::Connection, -// to_sync_client: mpsc::Sender<InternalAsyncMessage>, -// mut close_receiver: broadcast::Receiver<()>, -// mut to_server_receiver: mpsc::Receiver<Bytes>, -// ) { -// tokio::select! { -// _ = close_receiver.recv() => { -// trace!("Ordered Reliable Send Channel received a close signal") -// } -// _ = async { -// let uni_sender = connection -// .open_uni() -// .await -// .expect("Failed to open send stream"); -// let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); +pub(crate) async fn ordered_reliable_channel_task<T: Debug>( + connection: quinn::Connection, + to_sync_client: mpsc::Sender<T>, + on_lost_connection: fn() -> T, + mut close_receiver: broadcast::Receiver<()>, + mut to_server_receiver: mpsc::Receiver<Bytes>, +) { + let uni_sender = connection + .open_uni() + .await + .expect("Failed to open send stream"); + let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); -// while let Some(msg_bytes) = to_server_receiver.recv().await { -// if let Err(err) = frame_sender.send(msg_bytes).await { -// // TODO Clean: error handling -// error!("Error while sending, {}", err); -// // error!("Client seems disconnected, closing resources"); -// // if let Err(_) = close_sender_clone.send(()) { -// // error!("Failed to close all client streams & resources") -// // } -// to_sync_client.send( -// InternalAsyncMessage::LostConnection) -// .await -// .expect("Failed to signal connection lost to sync client"); -// } -// } -// } => { -// trace!("Ordered Reliable Send Channel ended") -// } -// } -// } + tokio::select! { + _ = close_receiver.recv() => { + trace!("Ordered Reliable Channel task received a close signal") + } + _ = async { + while let Some(msg_bytes) = to_server_receiver.recv().await { + if let Err(err) = frame_sender.send(msg_bytes).await { + // TODO Clean: error handling + error!("Error while sending, {}", err); + // error!("Client seems disconnected, closing resources"); + // if let Err(_) = close_sender_clone.send(()) { + // error!("Failed to close all client streams & resources") + // } + to_sync_client.send( + on_lost_connection()) + .await + .expect("Failed to signal connection lost to sync client"); + } + } + } => { + trace!("Ordered Reliable Channel task ended") + } + } + while let Ok(msg_bytes) = to_server_receiver.try_recv() { + if let Err(err) = frame_sender.send(msg_bytes).await { + error!("Error while sending, {}", err); + } + } + if let Err(err) = frame_sender.flush().await { + error!("Error while flushing stream: {}", err); + } + if let Err(err) = frame_sender.into_inner().finish().await { + error!("Failed to shutdown stream gracefully: {}", err); + } + todo!("Do not close here, wait for all channels to be flushed"); + connection.close(VarInt::from_u32(0), "closed".as_bytes()); +} -// async fn unordered_reliable_channel_task( -// connection: quinn::Connection, -// to_sync_client: mpsc::Sender<InternalAsyncMessage>, -// mut close_receiver: broadcast::Receiver<()>, -// mut to_server_receiver: mpsc::Receiver<Bytes>, -// ) { -// tokio::select! { -// _ = close_receiver.recv() => { -// trace!("Unordered Reliable Send Channel received a close signal") -// } -// _ = async { -// while let Some(msg_bytes) = to_server_receiver.recv().await { -// let uni_sender = connection -// .open_uni() -// .await -// .expect("Failed to open send stream"); -// let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); +pub(crate) async fn unordered_reliable_channel_task<T: Debug>( + connection: quinn::Connection, + to_sync_client: mpsc::Sender<T>, + on_lost_connection: fn() -> T, + mut close_receiver: broadcast::Receiver<()>, + mut to_server_receiver: mpsc::Receiver<Bytes>, +) { + tokio::select! { + _ = close_receiver.recv() => { + trace!("Unordered Reliable Channel task received a close signal") + } + _ = async { + while let Some(msg_bytes) = to_server_receiver.recv().await { + let uni_sender = connection + .open_uni() + .await + .expect("Failed to open send stream"); + let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); -// if let Err(err) = frame_sender.send(msg_bytes).await { -// // TODO Clean: error handling -// error!("Error while sending, {}", err); -// // error!("Client seems disconnected, closing resources"); -// // if let Err(_) = close_sender_clone.send(()) { -// // error!("Failed to close all client streams & resources") -// // } -// to_sync_client.send( -// InternalAsyncMessage::LostConnection) -// .await -// .expect("Failed to signal connection lost to sync client"); -// } -// } -// } => { -// trace!("Unordered Reliable Send Channel ended") -// } + if let Err(err) = frame_sender.send(msg_bytes).await { + // TODO Clean: error handling + error!("Error while sending, {}", err); + // error!("Client seems disconnected, closing resources"); + // if let Err(_) = close_sender_clone.send(()) { + // error!("Failed to close all client streams & resources") + // } + to_sync_client.send( + on_lost_connection()) + .await + .expect("Failed to signal connection lost to sync client"); + } + } + } => { + trace!("Unordered Reliable Channel task ended") + } + } +} + +// async fn send_msg( +// // close_sender: &tokio::sync::broadcast::Sender<()>, +// to_sync_client: &mpsc::Sender<InternalAsyncMessage>, +// frame_send: &mut FramedWrite<SendStream, LengthDelimitedCodec>, +// msg_bytes: Bytes, +// ) { +// if let Err(err) = frame_send.send(msg_bytes).await { +// error!("Error while sending, {}", err); +// error!("Client seems disconnected, closing resources"); +// // Emit LostConnection to properly update the connection about its state. +// // Raise LostConnection event before emitting a close signal because we have no guarantee to continue this async execution after the close signal has been processed. +// to_sync_client +// .send(InternalAsyncMessage::LostConnection) +// .await +// .expect("Failed to signal connection lost to sync client"); +// // if let Err(_) = close_sender.send(()) { +// // error!("Failed to close all client streams & resources") +// // } // } // } |