aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs83
1 files changed, 14 insertions, 69 deletions
diff --git a/src/client.rs b/src/client.rs
index 0c7ab02..948d13c 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -10,7 +10,6 @@ use std::{
use bevy::prelude::*;
use bytes::Bytes;
-use futures::sink::SinkExt;
use futures_util::StreamExt;
use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint, SendStream};
use quinn_proto::{ConnectionStats, VarInt};
@@ -27,10 +26,13 @@ use tokio::{
},
task::JoinSet,
};
-use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
+use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use crate::shared::{
- channel::{Channel, ChannelId, ChannelType, OrdRelChannelId},
+ channel::{
+ ordered_reliable_channel_task, unordered_reliable_channel_task, Channel, ChannelId,
+ ChannelType, OrdRelChannelId,
+ },
AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
};
@@ -669,6 +671,7 @@ async fn handle_connection_channels(
ordered_reliable_channel_task(
connection_handle,
to_sync_client,
+ || InternalAsyncMessage::LostConnection,
close_receiver,
to_server_receiver
)
@@ -677,13 +680,14 @@ async fn handle_connection_channels(
},
ChannelId::UnorderedReliable => {
tokio::spawn(async move {
- // unordered_reliable_channel_task(
- // connection_handle,
- // to_sync_client,
- // close_receiver,
- // to_server_receiver
- // )
- // .await
+ unordered_reliable_channel_task(
+ connection_handle,
+ to_sync_client,
+ || InternalAsyncMessage::LostConnection,
+ close_receiver,
+ to_server_receiver
+ )
+ .await
});
},
ChannelId::Unreliable => todo!(),
@@ -695,65 +699,6 @@ async fn handle_connection_channels(
};
}
-async fn ordered_reliable_channel_task(
- connection: quinn::Connection,
- to_sync_client: mpsc::Sender<InternalAsyncMessage>,
- mut close_receiver: broadcast::Receiver<()>,
- mut to_server_receiver: mpsc::Receiver<Bytes>,
-) {
- let uni_sender = connection
- .open_uni()
- .await
- .expect("Failed to open send stream");
- let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
- tokio::select! {
- _ = close_receiver.recv() => {
- trace!("Ordered Reliable Send Channel received a close signal")
- }
- _ = async {
- while let Some(msg_bytes) = to_server_receiver.recv().await {
- send_msg(&to_sync_client, &mut frame_sender, msg_bytes).await;
- }
- } => {
- trace!("Ordered Reliable Send Channel ended")
- }
- }
- while let Ok(msg_bytes) = to_server_receiver.try_recv() {
- if let Err(err) = frame_sender.send(msg_bytes).await {
- error!("Error while sending, {}", err);
- }
- }
- if let Err(err) = frame_sender.flush().await {
- error!("Error while flushing stream: {}", err);
- }
- if let Err(err) = frame_sender.into_inner().finish().await {
- error!("Failed to shutdown stream gracefully: {}", err);
- }
- todo!("Do not close here, wait for all channels to be flushed");
- connection.close(VarInt::from_u32(0), "closed".as_bytes());
-}
-
-async fn send_msg(
- // close_sender: &tokio::sync::broadcast::Sender<()>,
- to_sync_client: &mpsc::Sender<InternalAsyncMessage>,
- frame_send: &mut FramedWrite<SendStream, LengthDelimitedCodec>,
- msg_bytes: Bytes,
-) {
- if let Err(err) = frame_send.send(msg_bytes).await {
- error!("Error while sending, {}", err);
- error!("Client seems disconnected, closing resources");
- // Emit LostConnection to properly update the connection about its state.
- // Raise LostConnection event before emitting a close signal because we have no guarantee to continue this async execution after the close signal has been processed.
- to_sync_client
- .send(InternalAsyncMessage::LostConnection)
- .await
- .expect("Failed to signal connection lost to sync client");
- // if let Err(_) = close_sender.send(()) {
- // error!("Failed to close all client streams & resources")
- // }
- }
-}
-
// Receive messages from the async client tasks and update the sync client.
fn update_sync_client(
mut connection_events: EventWriter<ConnectionEvent>,