diff options
Diffstat (limited to 'src/shared/channel.rs')
-rw-r--r-- | src/shared/channel.rs | 17 |
1 files changed, 12 insertions, 5 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs index 96d8afc..72dca41 100644 --- a/src/shared/channel.rs +++ b/src/shared/channel.rs @@ -2,7 +2,7 @@ use super::QuinnetError; use bevy::prelude::{error, trace}; use bytes::Bytes; use futures::sink::SinkExt; -use quinn::{SendStream, VarInt}; +use quinn::SendStream; use std::fmt::Debug; use tokio::sync::{ broadcast, @@ -69,6 +69,7 @@ impl Channel { pub(crate) async fn ordered_reliable_channel_task<T: Debug>( connection: quinn::Connection, + _: mpsc::Sender<()>, to_sync_client: mpsc::Sender<T>, on_lost_connection: fn() -> T, mut close_receiver: broadcast::Receiver<()>, @@ -97,7 +98,7 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>( } => { trace!("Ordered Reliable Channel task ended") } - } + }; while let Ok(msg_bytes) = to_server_receiver.try_recv() { if let Err(err) = frame_sender.send(msg_bytes).await { error!("Error while sending, {}", err); @@ -109,12 +110,11 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>( if let Err(err) = frame_sender.into_inner().finish().await { error!("Failed to shutdown stream gracefully: {}", err); } - todo!("Do not close here, wait for all channels to be flushed"); - connection.close(VarInt::from_u32(0), "closed".as_bytes()); } pub(crate) async fn unordered_reliable_channel_task<T: Debug>( connection: quinn::Connection, + _: mpsc::Sender<()>, to_sync_client: mpsc::Sender<T>, on_lost_connection: fn() -> T, mut close_receiver: broadcast::Receiver<()>, @@ -138,12 +138,19 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>( .await .expect("Failed to signal connection lost to sync client"); } + todo!("finish the stream") } } => { trace!("Unordered Reliable Channel task ended") } + }; + while let Ok(msg_bytes) = to_server_receiver.try_recv() { + let mut frame_sender = new_uni_frame_sender(&connection).await; + if let Err(err) = frame_sender.send(msg_bytes).await { + error!("Error while sending, {}", err); + } + todo!("finish the stream") } - todo!("Flush and signal finished") } async fn new_uni_frame_sender( |