aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHenauxg <19689618+Henauxg@users.noreply.github.com>2023-01-16 19:08:14 +0100
committerHenauxg <19689618+Henauxg@users.noreply.github.com>2023-01-16 19:08:14 +0100
commita41bc0a4895d40637afb889709917f3ee6a2b835 (patch)
tree08955e9b30a0882b96dbea0610b60ec96a875126
parent75348f78004dee101704240a8ce537ad312a5c69 (diff)
[channels] Basic Unreliable channel, doc strings and logs
-rw-r--r--src/shared/channel.rs72
1 files changed, 51 insertions, 21 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index d61f4de..4860b49 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -2,7 +2,7 @@ use super::QuinnetError;
use bevy::prelude::{error, trace};
use bytes::Bytes;
use futures::sink::SinkExt;
-use quinn::{SendStream, VarInt};
+use quinn::{SendDatagramError, SendStream, VarInt};
use std::fmt::Debug;
use tokio::sync::{
broadcast,
@@ -21,8 +21,13 @@ pub enum ChannelType {
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum ChannelId {
+ /// An OrderedReliable channel ensures that messages sent are delivered, and are processed by the receiving end in the same order as they were sent.
OrderedReliable(MultiChannelId),
+ /// An UnorderedReliable channel ensures that messages sent are delivered, but they may be delivered out of order.
UnorderedReliable,
+ /// Channel which transmits messages as unreliable and unordered datagrams (may be lost or delivered out of order).
+ ///
+ /// The maximum allowed size of a datagram may change over the lifetime of a connection according to variation in the path MTU estimate. This is guaranteed to be a little over a kilobyte at minimum.
Unreliable,
}
@@ -194,11 +199,11 @@ pub(crate) async fn ordered_reliable_channel_task(
_ = async {
while let Some(msg_bytes) = bytes_to_channel_recv.recv().await {
if let Err(err) = frame_sender.send(msg_bytes).await {
- error!("Error while sending, {}", err);
+ error!("Error while sending on Ordered Reliable Channel, {}", err);
from_channels_send.send(
ChannelAsyncMessage::LostConnection)
.await
- .expect("Failed to signal connection lost to sync client");
+ .expect("Failed to signal connection lost on Ordered Reliable Channel");
}
}
} => {
@@ -207,14 +212,23 @@ pub(crate) async fn ordered_reliable_channel_task(
};
while let Ok(msg_bytes) = bytes_to_channel_recv.try_recv() {
if let Err(err) = frame_sender.send(msg_bytes).await {
- error!("Error while sending, {}", err);
+ error!(
+ "Error while sending a remaining message on Ordered Reliable Channel, {}",
+ err
+ );
}
}
if let Err(err) = frame_sender.flush().await {
- error!("Error while flushing stream: {}", err);
+ error!(
+ "Error while flushing Ordered Reliable Channel stream: {}",
+ err
+ );
}
if let Err(err) = frame_sender.into_inner().finish().await {
- error!("Failed to shutdown stream gracefully: {}", err);
+ error!(
+ "Failed to shutdown Ordered Reliable Channel stream gracefully: {}",
+ err
+ );
}
}
@@ -228,7 +242,7 @@ pub(crate) async fn unordered_reliable_channel_task(
) {
tokio::select! {
_ = close_recv.recv() => {
- trace!("Ordered Reliable Channel task received a close signal")
+ trace!("Unordered Reliable Channel task received a close signal")
}
_ = channel_close_recv.recv() => {
trace!("Unordered Reliable Channel task received a channel close signal")
@@ -236,19 +250,19 @@ pub(crate) async fn unordered_reliable_channel_task(
_ = async {
while let Some(msg_bytes) = bytes_to_channel_recv.recv().await {
let conn = connection.clone();
- let to_sync_client_clone = from_channels_send.clone();
+ let from_channels_send_clone = from_channels_send.clone();
let channels_keepalive_clone = channel_tasks_keepalive.clone();
tokio::spawn(async move {
let mut frame_sender = new_uni_frame_sender(&conn).await;
if let Err(err) = frame_sender.send(msg_bytes).await {
- error!("Error while sending, {}", err);
- to_sync_client_clone.send(
+ error!("Error while sending on Unordered Reliable Channel, {}", err);
+ from_channels_send_clone.send(
ChannelAsyncMessage::LostConnection)
.await
- .expect("Failed to signal connection lost to sync client");
+ .expect("Failed to signal connection lost on Unordered Reliable Channel");
}
if let Err(err) = frame_sender.into_inner().finish().await {
- error!("Failed to shutdown stream gracefully: {}", err);
+ error!("Failed to shutdown Unordered Reliable Channel stream gracefully: {}", err);
}
drop(channels_keepalive_clone)
});
@@ -263,10 +277,16 @@ pub(crate) async fn unordered_reliable_channel_task(
tokio::spawn(async move {
let mut frame_sender = new_uni_frame_sender(&conn).await;
if let Err(err) = frame_sender.send(msg_bytes).await {
- error!("Error while sending, {}", err);
+ error!(
+ "Error while sending a remaining message on Unordered Reliable Channel, {}",
+ err
+ );
}
if let Err(err) = frame_sender.into_inner().finish().await {
- error!("Failed to shutdown stream gracefully: {}", err);
+ error!(
+ "Failed to shutdown Unordered Reliable Channel stream gracefully: {}",
+ err
+ );
}
drop(channels_keepalive_clone)
});
@@ -290,13 +310,20 @@ pub(crate) async fn unreliable_channel_task(
}
_ = async {
while let Some(msg_bytes) = bytes_to_channel_recv.recv().await {
- // TODO Test datagram_send_buffer_space + adapt to max_datagram_size
if let Err(err) = connection.send_datagram(msg_bytes) {
- error!("Error while sending, {}", err);
- from_channels_send.send(
- ChannelAsyncMessage::LostConnection)
- .await
- .expect("Failed to signal connection lost to sync client");
+ error!("Error while sending message on Unreliable Channel, {}", err);
+ match err {
+ SendDatagramError::UnsupportedByPeer => (),
+ SendDatagramError::Disabled => (),
+ SendDatagramError::TooLarge => (),
+ SendDatagramError::ConnectionLost(_) => {
+ from_channels_send.send(
+ ChannelAsyncMessage::LostConnection)
+ .await
+ .expect("Failed to signal connection lost from channels");
+ },
+ }
+
}
}
} => {
@@ -305,7 +332,10 @@ pub(crate) async fn unreliable_channel_task(
};
while let Ok(msg_bytes) = bytes_to_channel_recv.try_recv() {
if let Err(err) = connection.send_datagram(msg_bytes) {
- error!("Error while sending, {}", err);
+ error!(
+ "Error while sending a remaining message on Unreliable Channel, {}",
+ err
+ );
}
}
}