aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client.rs15
-rw-r--r--src/client/connection.rs114
2 files changed, 41 insertions, 88 deletions
diff --git a/src/client.rs b/src/client.rs
index 51cc0ea..0efe051 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -148,9 +148,17 @@ impl Client {
connection.open_channel(ChannelType::UnorderedReliable)?;
connection.open_channel(ChannelType::Unreliable)?;
+ self.last_gen_id += 1;
+ let connection_id = self.last_gen_id;
+ self.connections.insert(connection_id, connection);
+ if self.default_connection_id.is_none() {
+ self.default_connection_id = Some(connection_id);
+ }
+
// Async connection
self.runtime.spawn(async move {
connection_task(
+ connection_id,
config,
cert_mode,
to_sync_client_send,
@@ -162,13 +170,6 @@ impl Client {
.await
});
- self.last_gen_id += 1;
- let connection_id = self.last_gen_id;
- self.connections.insert(connection_id, connection);
- if self.default_connection_id.is_none() {
- self.default_connection_id = Some(connection_id);
- }
-
Ok((connection_id, ordered_reliable_id))
}
diff --git a/src/client/connection.rs b/src/client/connection.rs
index 94bcf2d..ed99517 100644
--- a/src/client/connection.rs
+++ b/src/client/connection.rs
@@ -1,9 +1,8 @@
use std::{collections::HashMap, error::Error, net::SocketAddr, sync::Arc};
-use bevy::prelude::{error, info, trace};
+use bevy::prelude::{error, info};
use bytes::Bytes;
-use futures::StreamExt;
-use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint, RecvStream};
+use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint};
use quinn_proto::ConnectionStats;
use serde::Deserialize;
@@ -14,12 +13,11 @@ use tokio::sync::{
error::{TryRecvError, TrySendError},
},
};
-use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use crate::shared::{
channel::{
- channels_task, get_channel_id_from_type, Channel, ChannelAsyncMessage, ChannelId,
- ChannelSyncMessage, ChannelType, MultiChannelId,
+ channels_task, get_channel_id_from_type, reliable_receiver_task, unreliable_receiver_task,
+ Channel, ChannelAsyncMessage, ChannelId, ChannelSyncMessage, ChannelType, MultiChannelId,
},
QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
};
@@ -358,6 +356,7 @@ impl Connection {
}
pub(crate) async fn connection_task(
+ connection_id: ConnectionId,
config: ConnectionConfiguration,
cert_mode: CertificateVerificationMode,
to_sync_client_send: mpsc::Sender<ClientAsyncMessage>,
@@ -370,7 +369,10 @@ pub(crate) async fn connection_task(
let srv_host = config.server_host.clone();
let local_bind_adr = format!("{}:{}", config.local_bind_host, config.local_bind_port);
- info!("Trying to connect to server on: {} ...", server_adr_str);
+ info!(
+ "Connection {} trying to connect to server on: {} ...",
+ connection_id, server_adr_str
+ );
let server_addr: SocketAddr = server_adr_str
.parse()
@@ -388,9 +390,16 @@ pub(crate) async fn connection_task(
.expect("Failed to connect: configuration error")
.await;
match connection {
- Err(e) => error!("Error while connecting: {}", e),
+ Err(e) => error!(
+ "Connection {}, error while connecting: {}",
+ connection_id, e
+ ),
Ok(connection) => {
- info!("Connected to {}", connection.remote_address());
+ info!(
+ "Connection {} connected to {}",
+ connection_id,
+ connection.remote_address()
+ );
to_sync_client_send
.send(ClientAsyncMessage::Connected(connection.clone()))
@@ -403,7 +412,7 @@ pub(crate) async fn connection_task(
let to_sync_client = to_sync_client_send.clone();
tokio::spawn(async move {
let conn_err = conn.closed().await;
- info!("Disconnected: {}", conn_err);
+ info!("Connection {} disconnected: {}", connection_id, conn_err);
// If we requested the connection to close, channel may have been closed already.
if !to_sync_client.is_closed() {
to_sync_client
@@ -420,7 +429,13 @@ pub(crate) async fn connection_task(
let connection_handle = connection.clone();
let bytes_incoming_send = bytes_from_server_send.clone();
tokio::spawn(async move {
- reliable_receiver_task(connection_handle, close_recv, bytes_incoming_send).await
+ reliable_receiver_task(
+ connection_id,
+ connection_handle,
+ close_recv,
+ bytes_incoming_send,
+ )
+ .await
});
}
@@ -430,8 +445,13 @@ pub(crate) async fn connection_task(
let connection_handle = connection.clone();
let bytes_incoming_send = bytes_from_server_send.clone();
tokio::spawn(async move {
- unreliable_receiver_task(connection_handle, close_recv, bytes_incoming_send)
- .await
+ unreliable_receiver_task(
+ connection_id,
+ connection_handle,
+ close_recv,
+ bytes_incoming_send,
+ )
+ .await
});
}
@@ -443,74 +463,6 @@ pub(crate) async fn connection_task(
}
}
-async fn uni_receiver_task(
- recv: RecvStream,
- mut close_recv: broadcast::Receiver<()>,
- bytes_from_server_send: mpsc::Sender<Bytes>,
-) {
- tokio::select! {
- _ = close_recv.recv() => {
- trace!("Listener of a Unidirectional Receiving Streams received a close signal")
- }
- _ = async {
- let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new());
- while let Some(Ok(msg_bytes)) = frame_recv.next().await {
- // TODO Clean: error handling
- bytes_from_server_send.send(msg_bytes.into()).await.unwrap();
- }
- } => {}
- };
-}
-
-async fn reliable_receiver_task(
- connection: quinn::Connection,
- mut close_recv: broadcast::Receiver<()>,
- bytes_incoming_send: mpsc::Sender<Bytes>,
-) {
- let close_recv_clone = close_recv.resubscribe();
- tokio::select! {
- _ = close_recv.recv() => {
- trace!("Listener for new Unidirectional Receiving Streams received a close signal")
- }
- _ = async {
- while let Ok(recv) = connection.accept_uni().await {
- let bytes_from_server_send = bytes_incoming_send.clone();
- let close_recv_clone = close_recv_clone.resubscribe();
- tokio::spawn(async move {
- uni_receiver_task(
- recv,
- close_recv_clone,
- bytes_from_server_send
- ).await;
- });
- }
- } => {
- trace!("Listener for new Unidirectional Receiving Streams ended")
- }
- };
- trace!("All unidirectional stream receivers cleaned");
-}
-
-async fn unreliable_receiver_task(
- connection: quinn::Connection,
- mut close_recv: broadcast::Receiver<()>,
- bytes_incoming_send: mpsc::Sender<Bytes>,
-) {
- tokio::select! {
- _ = close_recv.recv() => {
- trace!("Listener for unreliable datagrams received a close signal")
- }
- _ = async {
- while let Ok(msg_bytes) = connection.read_datagram().await {
- // TODO Clean: error handling
- bytes_incoming_send.send(msg_bytes.into()).await.unwrap();
- }
- } => {
- trace!("Listener for unreliable datagrams ended")
- }
- };
-}
-
fn configure_client(
cert_mode: CertificateVerificationMode,
to_sync_client: mpsc::Sender<ClientAsyncMessage>,