aboutsummaryrefslogtreecommitdiff
path: root/src/shared/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/shared/channel.rs')
-rw-r--r--src/shared/channel.rs35
1 files changed, 13 insertions, 22 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index 2091a9a..54ad3d7 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -2,7 +2,7 @@ use super::QuinnetError;
use bevy::prelude::{error, trace};
use bytes::Bytes;
use futures::sink::SinkExt;
-use quinn::VarInt;
+use quinn::{SendStream, VarInt};
use std::fmt::Debug;
use tokio::sync::{
broadcast,
@@ -60,11 +60,7 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
mut close_receiver: broadcast::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
- let uni_sender = connection
- .open_uni()
- .await
- .expect("Failed to open send stream");
- let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
+ let mut frame_sender = new_uni_frame_sender(&connection).await;
tokio::select! {
_ = close_receiver.recv() => {
@@ -73,12 +69,7 @@ pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
_ = async {
while let Some(msg_bytes) = to_server_receiver.recv().await {
if let Err(err) = frame_sender.send(msg_bytes).await {
- // TODO Clean: error handling
error!("Error while sending, {}", err);
- // error!("Client seems disconnected, closing resources");
- // if let Err(_) = close_sender_clone.send(()) {
- // error!("Failed to close all client streams & resources")
- // }
to_sync_client.send(
on_lost_connection())
.await
@@ -117,19 +108,9 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
}
_ = async {
while let Some(msg_bytes) = to_server_receiver.recv().await {
- let uni_sender = connection
- .open_uni()
- .await
- .expect("Failed to open send stream");
- let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
-
+ let mut frame_sender = new_uni_frame_sender(&connection).await;
if let Err(err) = frame_sender.send(msg_bytes).await {
- // TODO Clean: error handling
error!("Error while sending, {}", err);
- // error!("Client seems disconnected, closing resources");
- // if let Err(_) = close_sender_clone.send(()) {
- // error!("Failed to close all client streams & resources")
- // }
to_sync_client.send(
on_lost_connection())
.await
@@ -142,6 +123,16 @@ pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
}
}
+async fn new_uni_frame_sender(
+ connection: &quinn::Connection,
+) -> FramedWrite<SendStream, LengthDelimitedCodec> {
+ let uni_sender = connection
+ .open_uni()
+ .await
+ .expect("Failed to open send stream");
+ FramedWrite::new(uni_sender, LengthDelimitedCodec::new())
+}
+
// async fn send_msg(
// // close_sender: &tokio::sync::broadcast::Sender<()>,
// to_sync_client: &mpsc::Sender<InternalAsyncMessage>,