diff options
Diffstat (limited to 'src/shared/channel.rs')
-rw-r--r-- | src/shared/channel.rs | 35 |
1 files changed, 13 insertions, 22 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs index 2091a9a..54ad3d7 100644 --- a/src/shared/channel.rs +++ b/src/shared/channel.rs @@ -2,7 +2,7 @@ use super::QuinnetError; use bevy::prelude::{error, trace}; use bytes::Bytes; use futures::sink::SinkExt; -use quinn::VarInt; +use quinn::{SendStream, VarInt}; use std::fmt::Debug; use tokio::sync::{ broadcast, @@ -60,11 +60,7 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>( mut close_receiver: broadcast::Receiver<()>, mut to_server_receiver: mpsc::Receiver<Bytes>, ) { - let uni_sender = connection - .open_uni() - .await - .expect("Failed to open send stream"); - let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); + let mut frame_sender = new_uni_frame_sender(&connection).await; tokio::select! { _ = close_receiver.recv() => { @@ -73,12 +69,7 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>( _ = async { while let Some(msg_bytes) = to_server_receiver.recv().await { if let Err(err) = frame_sender.send(msg_bytes).await { - // TODO Clean: error handling error!("Error while sending, {}", err); - // error!("Client seems disconnected, closing resources"); - // if let Err(_) = close_sender_clone.send(()) { - // error!("Failed to close all client streams & resources") - // } to_sync_client.send( on_lost_connection()) .await @@ -117,19 +108,9 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>( } _ = async { while let Some(msg_bytes) = to_server_receiver.recv().await { - let uni_sender = connection - .open_uni() - .await - .expect("Failed to open send stream"); - let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); - + let mut frame_sender = new_uni_frame_sender(&connection).await; if let Err(err) = frame_sender.send(msg_bytes).await { - // TODO Clean: error handling error!("Error while sending, {}", err); - // error!("Client seems disconnected, closing resources"); - // if let Err(_) = close_sender_clone.send(()) { - // error!("Failed to close all client streams & resources") - // } to_sync_client.send( on_lost_connection()) .await @@ -142,6 +123,16 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>( } } +async fn new_uni_frame_sender( + connection: &quinn::Connection, +) -> FramedWrite<SendStream, LengthDelimitedCodec> { + let uni_sender = connection + .open_uni() + .await + .expect("Failed to open send stream"); + FramedWrite::new(uni_sender, LengthDelimitedCodec::new()) +} + // async fn send_msg( // // close_sender: &tokio::sync::broadcast::Sender<()>, // to_sync_client: &mpsc::Sender<InternalAsyncMessage>, |