diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/client.rs | 15 | ||||
-rw-r--r-- | src/client/connection.rs | 114 |
2 files changed, 41 insertions, 88 deletions
diff --git a/src/client.rs b/src/client.rs index 51cc0ea..0efe051 100644 --- a/src/client.rs +++ b/src/client.rs @@ -148,9 +148,17 @@ impl Client { connection.open_channel(ChannelType::UnorderedReliable)?; connection.open_channel(ChannelType::Unreliable)?; + self.last_gen_id += 1; + let connection_id = self.last_gen_id; + self.connections.insert(connection_id, connection); + if self.default_connection_id.is_none() { + self.default_connection_id = Some(connection_id); + } + // Async connection self.runtime.spawn(async move { connection_task( + connection_id, config, cert_mode, to_sync_client_send, @@ -162,13 +170,6 @@ impl Client { .await }); - self.last_gen_id += 1; - let connection_id = self.last_gen_id; - self.connections.insert(connection_id, connection); - if self.default_connection_id.is_none() { - self.default_connection_id = Some(connection_id); - } - Ok((connection_id, ordered_reliable_id)) } diff --git a/src/client/connection.rs b/src/client/connection.rs index 94bcf2d..ed99517 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -1,9 +1,8 @@ use std::{collections::HashMap, error::Error, net::SocketAddr, sync::Arc}; -use bevy::prelude::{error, info, trace}; +use bevy::prelude::{error, info}; use bytes::Bytes; -use futures::StreamExt; -use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint, RecvStream}; +use quinn::{ClientConfig, Connection as QuinnConnection, Endpoint}; use quinn_proto::ConnectionStats; use serde::Deserialize; @@ -14,12 +13,11 @@ use tokio::sync::{ error::{TryRecvError, TrySendError}, }, }; -use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; use crate::shared::{ channel::{ - channels_task, get_channel_id_from_type, Channel, ChannelAsyncMessage, ChannelId, - ChannelSyncMessage, ChannelType, MultiChannelId, + channels_task, get_channel_id_from_type, reliable_receiver_task, unreliable_receiver_task, + Channel, ChannelAsyncMessage, ChannelId, ChannelSyncMessage, ChannelType, MultiChannelId, }, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE, }; @@ -358,6 +356,7 @@ impl Connection { } pub(crate) async fn connection_task( + connection_id: ConnectionId, config: ConnectionConfiguration, cert_mode: CertificateVerificationMode, to_sync_client_send: mpsc::Sender<ClientAsyncMessage>, @@ -370,7 +369,10 @@ pub(crate) async fn connection_task( let srv_host = config.server_host.clone(); let local_bind_adr = format!("{}:{}", config.local_bind_host, config.local_bind_port); - info!("Trying to connect to server on: {} ...", server_adr_str); + info!( + "Connection {} trying to connect to server on: {} ...", + connection_id, server_adr_str + ); let server_addr: SocketAddr = server_adr_str .parse() @@ -388,9 +390,16 @@ pub(crate) async fn connection_task( .expect("Failed to connect: configuration error") .await; match connection { - Err(e) => error!("Error while connecting: {}", e), + Err(e) => error!( + "Connection {}, error while connecting: {}", + connection_id, e + ), Ok(connection) => { - info!("Connected to {}", connection.remote_address()); + info!( + "Connection {} connected to {}", + connection_id, + connection.remote_address() + ); to_sync_client_send .send(ClientAsyncMessage::Connected(connection.clone())) @@ -403,7 +412,7 @@ pub(crate) async fn connection_task( let to_sync_client = to_sync_client_send.clone(); tokio::spawn(async move { let conn_err = conn.closed().await; - info!("Disconnected: {}", conn_err); + info!("Connection {} disconnected: {}", connection_id, conn_err); // If we requested the connection to close, channel may have been closed already. if !to_sync_client.is_closed() { to_sync_client @@ -420,7 +429,13 @@ pub(crate) async fn connection_task( let connection_handle = connection.clone(); let bytes_incoming_send = bytes_from_server_send.clone(); tokio::spawn(async move { - reliable_receiver_task(connection_handle, close_recv, bytes_incoming_send).await + reliable_receiver_task( + connection_id, + connection_handle, + close_recv, + bytes_incoming_send, + ) + .await }); } @@ -430,8 +445,13 @@ pub(crate) async fn connection_task( let connection_handle = connection.clone(); let bytes_incoming_send = bytes_from_server_send.clone(); tokio::spawn(async move { - unreliable_receiver_task(connection_handle, close_recv, bytes_incoming_send) - .await + unreliable_receiver_task( + connection_id, + connection_handle, + close_recv, + bytes_incoming_send, + ) + .await }); } @@ -443,74 +463,6 @@ pub(crate) async fn connection_task( } } -async fn uni_receiver_task( - recv: RecvStream, - mut close_recv: broadcast::Receiver<()>, - bytes_from_server_send: mpsc::Sender<Bytes>, -) { - tokio::select! { - _ = close_recv.recv() => { - trace!("Listener of a Unidirectional Receiving Streams received a close signal") - } - _ = async { - let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new()); - while let Some(Ok(msg_bytes)) = frame_recv.next().await { - // TODO Clean: error handling - bytes_from_server_send.send(msg_bytes.into()).await.unwrap(); - } - } => {} - }; -} - -async fn reliable_receiver_task( - connection: quinn::Connection, - mut close_recv: broadcast::Receiver<()>, - bytes_incoming_send: mpsc::Sender<Bytes>, -) { - let close_recv_clone = close_recv.resubscribe(); - tokio::select! { - _ = close_recv.recv() => { - trace!("Listener for new Unidirectional Receiving Streams received a close signal") - } - _ = async { - while let Ok(recv) = connection.accept_uni().await { - let bytes_from_server_send = bytes_incoming_send.clone(); - let close_recv_clone = close_recv_clone.resubscribe(); - tokio::spawn(async move { - uni_receiver_task( - recv, - close_recv_clone, - bytes_from_server_send - ).await; - }); - } - } => { - trace!("Listener for new Unidirectional Receiving Streams ended") - } - }; - trace!("All unidirectional stream receivers cleaned"); -} - -async fn unreliable_receiver_task( - connection: quinn::Connection, - mut close_recv: broadcast::Receiver<()>, - bytes_incoming_send: mpsc::Sender<Bytes>, -) { - tokio::select! { - _ = close_recv.recv() => { - trace!("Listener for unreliable datagrams received a close signal") - } - _ = async { - while let Ok(msg_bytes) = connection.read_datagram().await { - // TODO Clean: error handling - bytes_incoming_send.send(msg_bytes.into()).await.unwrap(); - } - } => { - trace!("Listener for unreliable datagrams ended") - } - }; -} - fn configure_client( cert_mode: CertificateVerificationMode, to_sync_client: mpsc::Sender<ClientAsyncMessage>, |