aboutsummaryrefslogtreecommitdiff
path: root/src/shared/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/shared/channel.rs')
-rw-r--r--src/shared/channel.rs17
1 files changed, 12 insertions, 5 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index 96d8afc..72dca41 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -2,7 +2,7 @@ use super::QuinnetError;
use bevy::prelude::{error, trace};
use bytes::Bytes;
use futures::sink::SinkExt;
-use quinn::{SendStream, VarInt};
+use quinn::SendStream;
use std::fmt::Debug;
use tokio::sync::{
broadcast,
@@ -69,6 +69,7 @@ impl Channel {
pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
+ _: mpsc::Sender<()>,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
mut close_receiver: broadcast::Receiver<()>,
@@ -97,7 +98,7 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
} => {
trace!("Ordered Reliable Channel task ended")
}
- }
+ };
while let Ok(msg_bytes) = to_server_receiver.try_recv() {
if let Err(err) = frame_sender.send(msg_bytes).await {
error!("Error while sending, {}", err);
@@ -109,12 +110,11 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
if let Err(err) = frame_sender.into_inner().finish().await {
error!("Failed to shutdown stream gracefully: {}", err);
}
- todo!("Do not close here, wait for all channels to be flushed");
- connection.close(VarInt::from_u32(0), "closed".as_bytes());
}
pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
+ _: mpsc::Sender<()>,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
mut close_receiver: broadcast::Receiver<()>,
@@ -138,12 +138,19 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
.await
.expect("Failed to signal connection lost to sync client");
}
+ todo!("finish the stream")
}
} => {
trace!("Unordered Reliable Channel task ended")
}
+ };
+ while let Ok(msg_bytes) = to_server_receiver.try_recv() {
+ let mut frame_sender = new_uni_frame_sender(&connection).await;
+ if let Err(err) = frame_sender.send(msg_bytes).await {
+ error!("Error while sending, {}", err);
+ }
+ todo!("finish the stream")
}
- todo!("Flush and signal finished")
}
async fn new_uni_frame_sender(