diff options
author | Henauxg <19689618+Henauxg@users.noreply.github.com> | 2023-01-16 19:08:14 +0100 |
---|---|---|
committer | Henauxg <19689618+Henauxg@users.noreply.github.com> | 2023-01-16 19:08:14 +0100 |
commit | a41bc0a4895d40637afb889709917f3ee6a2b835 (patch) | |
tree | 08955e9b30a0882b96dbea0610b60ec96a875126 | |
parent | 75348f78004dee101704240a8ce537ad312a5c69 (diff) |
[channels] Basic Unreliable channel, doc strings and logs
-rw-r--r-- | src/shared/channel.rs | 72 |
1 files changed, 51 insertions, 21 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs index d61f4de..4860b49 100644 --- a/src/shared/channel.rs +++ b/src/shared/channel.rs @@ -2,7 +2,7 @@ use super::QuinnetError; use bevy::prelude::{error, trace}; use bytes::Bytes; use futures::sink::SinkExt; -use quinn::{SendStream, VarInt}; +use quinn::{SendDatagramError, SendStream, VarInt}; use std::fmt::Debug; use tokio::sync::{ broadcast, @@ -21,8 +21,13 @@ pub enum ChannelType { #[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] pub enum ChannelId { + /// An OrderedReliable channel ensures that messages sent are delivered, and are processed by the receiving end in the same order as they were sent. OrderedReliable(MultiChannelId), + /// An UnorderedReliable channel ensures that messages sent are delivered, but they may be delivered out of order. UnorderedReliable, + /// Channel which transmits messages as unreliable and unordered datagrams (may be lost or delivered out of order). + /// + /// The maximum allowed size of a datagram may change over the lifetime of a connection according to variation in the path MTU estimate. This is guaranteed to be a little over a kilobyte at minimum. Unreliable, } @@ -194,11 +199,11 @@ pub(crate) async fn ordered_reliable_channel_task( _ = async { while let Some(msg_bytes) = bytes_to_channel_recv.recv().await { if let Err(err) = frame_sender.send(msg_bytes).await { - error!("Error while sending, {}", err); + error!("Error while sending on Ordered Reliable Channel, {}", err); from_channels_send.send( ChannelAsyncMessage::LostConnection) .await - .expect("Failed to signal connection lost to sync client"); + .expect("Failed to signal connection lost on Ordered Reliable Channel"); } } } => { @@ -207,14 +212,23 @@ pub(crate) async fn ordered_reliable_channel_task( }; while let Ok(msg_bytes) = bytes_to_channel_recv.try_recv() { if let Err(err) = frame_sender.send(msg_bytes).await { - error!("Error while sending, {}", err); + error!( + "Error while sending a remaining message on Ordered Reliable Channel, {}", + err + ); } } if let Err(err) = frame_sender.flush().await { - error!("Error while flushing stream: {}", err); + error!( + "Error while flushing Ordered Reliable Channel stream: {}", + err + ); } if let Err(err) = frame_sender.into_inner().finish().await { - error!("Failed to shutdown stream gracefully: {}", err); + error!( + "Failed to shutdown Ordered Reliable Channel stream gracefully: {}", + err + ); } } @@ -228,7 +242,7 @@ pub(crate) async fn unordered_reliable_channel_task( ) { tokio::select! { _ = close_recv.recv() => { - trace!("Ordered Reliable Channel task received a close signal") + trace!("Unordered Reliable Channel task received a close signal") } _ = channel_close_recv.recv() => { trace!("Unordered Reliable Channel task received a channel close signal") @@ -236,19 +250,19 @@ pub(crate) async fn unordered_reliable_channel_task( _ = async { while let Some(msg_bytes) = bytes_to_channel_recv.recv().await { let conn = connection.clone(); - let to_sync_client_clone = from_channels_send.clone(); + let from_channels_send_clone = from_channels_send.clone(); let channels_keepalive_clone = channel_tasks_keepalive.clone(); tokio::spawn(async move { let mut frame_sender = new_uni_frame_sender(&conn).await; if let Err(err) = frame_sender.send(msg_bytes).await { - error!("Error while sending, {}", err); - to_sync_client_clone.send( + error!("Error while sending on Unordered Reliable Channel, {}", err); + from_channels_send_clone.send( ChannelAsyncMessage::LostConnection) .await - .expect("Failed to signal connection lost to sync client"); + .expect("Failed to signal connection lost on Unordered Reliable Channel"); } if let Err(err) = frame_sender.into_inner().finish().await { - error!("Failed to shutdown stream gracefully: {}", err); + error!("Failed to shutdown Unordered Reliable Channel stream gracefully: {}", err); } drop(channels_keepalive_clone) }); @@ -263,10 +277,16 @@ pub(crate) async fn unordered_reliable_channel_task( tokio::spawn(async move { let mut frame_sender = new_uni_frame_sender(&conn).await; if let Err(err) = frame_sender.send(msg_bytes).await { - error!("Error while sending, {}", err); + error!( + "Error while sending a remaining message on Unordered Reliable Channel, {}", + err + ); } if let Err(err) = frame_sender.into_inner().finish().await { - error!("Failed to shutdown stream gracefully: {}", err); + error!( + "Failed to shutdown Unordered Reliable Channel stream gracefully: {}", + err + ); } drop(channels_keepalive_clone) }); @@ -290,13 +310,20 @@ pub(crate) async fn unreliable_channel_task( } _ = async { while let Some(msg_bytes) = bytes_to_channel_recv.recv().await { - // TODO Test datagram_send_buffer_space + adapt to max_datagram_size if let Err(err) = connection.send_datagram(msg_bytes) { - error!("Error while sending, {}", err); - from_channels_send.send( - ChannelAsyncMessage::LostConnection) - .await - .expect("Failed to signal connection lost to sync client"); + error!("Error while sending message on Unreliable Channel, {}", err); + match err { + SendDatagramError::UnsupportedByPeer => (), + SendDatagramError::Disabled => (), + SendDatagramError::TooLarge => (), + SendDatagramError::ConnectionLost(_) => { + from_channels_send.send( + ChannelAsyncMessage::LostConnection) + .await + .expect("Failed to signal connection lost from channels"); + }, + } + } } } => { @@ -305,7 +332,10 @@ pub(crate) async fn unreliable_channel_task( }; while let Ok(msg_bytes) = bytes_to_channel_recv.try_recv() { if let Err(err) = connection.send_datagram(msg_bytes) { - error!("Error while sending, {}", err); + error!( + "Error while sending a remaining message on Unreliable Channel, {}", + err + ); } } } |