aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHenauxg <19689618+Henauxg@users.noreply.github.com>2023-01-05 19:02:18 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-11 14:47:28 +0100
commit14a26107031ddf10a86980e0f8ee588ac8b20d85 (patch)
treea95e451c18b48c8fcfa89869451a28498eb888a8 /src
parente17bb3c4c59aca95e4eb9db24003e20509352b93 (diff)
[client & shared] Rework channels and ChannelId
Diffstat (limited to 'src')
-rw-r--r--src/client.rs87
-rw-r--r--src/shared.rs5
-rw-r--r--src/shared/channel.rs128
3 files changed, 160 insertions, 60 deletions
diff --git a/src/client.rs b/src/client.rs
index f7de536..842db6b 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -30,8 +30,8 @@ use tokio::{
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use crate::shared::{
- AsyncRuntime, ChannelId, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE,
- DEFAULT_MESSAGE_QUEUE_SIZE,
+ channel::{Channel, ChannelId, OrdRelChannelId},
+ AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
};
use self::certificate::{
@@ -114,7 +114,7 @@ enum ConnectionState {
#[derive(Debug)]
pub(crate) enum InternalSyncMessage {
CreateChannel {
- channel_type: ChannelType,
+ channel_id: ChannelId,
to_server_receiver: mpsc::Receiver<Bytes>,
},
}
@@ -146,39 +146,13 @@ pub(crate) struct ConnectionSpawnConfig {
}
#[derive(Debug)]
-pub struct Channel {
- sender: mpsc::Sender<Bytes>,
- channel_type: ChannelType,
-}
-
-impl Channel {
- pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
- match self.sender.try_send(payload.into()) {
- Ok(_) => Ok(()),
- Err(err) => match err {
- TrySendError::Full(_) => Err(QuinnetError::FullQueue),
- TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed),
- },
- }
- }
-
- pub fn channel_type(&self) -> ChannelType {
- self.channel_type
- }
-
- pub(crate) fn new(sender: mpsc::Sender<Bytes>, channel_type: ChannelType) -> Self {
- Self {
- sender,
- channel_type,
- }
- }
-}
-#[derive(Debug)]
pub struct Connection {
state: ConnectionState,
+ // channels: HashMap<ChannelId, Channel>,
channels: HashMap<ChannelId, Channel>,
- last_gen_id: ChannelId,
- default_channel_id: Option<ChannelId>,
+ default_channel: Option<ChannelId>,
+ last_gen_id: OrdRelChannelId,
+ // default_channel_id: Option<ChannelId>,
receiver: mpsc::Receiver<Bytes>,
close_sender: broadcast::Sender<()>,
pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>,
@@ -210,8 +184,8 @@ impl Connection {
}
pub fn send_message<T: serde::Serialize>(&self, message: T) -> Result<(), QuinnetError> {
- match self.default_channel_id {
- Some(default_id) => self.send_message_on(default_id, message),
+ match self.default_channel {
+ Some(channel) => self.send_message_on(channel, message),
None => Err(QuinnetError::NoDefaultChannel),
}
}
@@ -242,8 +216,8 @@ impl Connection {
}
pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
- match self.default_channel_id {
- Some(default_id) => self.send_payload_on(default_id, payload),
+ match self.default_channel {
+ Some(channel) => self.send_payload_on(channel, payload),
None => Err(QuinnetError::NoDefaultChannel),
}
}
@@ -323,23 +297,23 @@ impl Connection {
}
}
- /// Create a new channel of the given type, if the function returns Ok, the channel can be used immediately.
- pub fn create_channel(&mut self, channel_type: ChannelType) -> Result<ChannelId, QuinnetError> {
+ pub fn open_ordered_reliable_channel(&mut self) -> Result<ChannelId, QuinnetError> {
let (to_server_sender, to_server_receiver) =
mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
+
+ self.last_gen_id += 1;
+ let channel_id = ChannelId::OrderedReliable(self.last_gen_id);
match self
.internal_sender
.try_send(InternalSyncMessage::CreateChannel {
- channel_type,
+ channel_id,
to_server_receiver,
}) {
Ok(_) => {
- let channel = Channel::new(to_server_sender, channel_type);
- self.last_gen_id += 1;
- let channel_id = self.last_gen_id;
+ let channel = Channel::new(to_server_sender);
self.channels.insert(channel_id, channel);
- if self.default_channel_id.is_none() {
- self.default_channel_id = Some(channel_id);
+ if self.default_channel.is_none() {
+ self.default_channel = Some(channel_id);
}
Ok(channel_id)
@@ -352,13 +326,6 @@ impl Connection {
}
}
-#[derive(Debug, Copy, Clone)]
-pub enum ChannelType {
- OrderedReliable,
- UnorderedReliable,
- Unreliable,
-}
-
#[derive(Resource)]
pub struct Client {
runtime: runtime::Handle,
@@ -439,14 +406,16 @@ impl Client {
state: ConnectionState::Connecting,
channels: HashMap::new(),
last_gen_id: 0,
- default_channel_id: None,
+ default_channel: None,
receiver: from_server_receiver,
close_sender: close_sender.clone(),
internal_receiver: from_async_client,
internal_sender: to_async_client,
};
// Create a default channel
- connection.create_channel(ChannelType::OrderedReliable)?;
+ if let Err(err) = connection.open_ordered_reliable_channel() {
+ return Err(err);
+ }
// Async connection
self.runtime.spawn(async move {
@@ -657,13 +626,13 @@ async fn handle_connection_channels(
_ = async {
while let Some(sync_message) = from_sync_client.recv().await {
- let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message;
+ let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver } = sync_message;
let close_receiver = close_sender.subscribe();
let connection_handle = connection.clone();
let to_sync_client = to_sync_client.clone();
- match channel_type {
- ChannelType::OrderedReliable => {
+ match channel_id {
+ ChannelId::OrderedReliable(_) => {
tokio::spawn(async move {
ordered_reliable_channel_task(
connection_handle,
@@ -674,7 +643,7 @@ async fn handle_connection_channels(
.await
});
},
- ChannelType::UnorderedReliable => {
+ ChannelId::UnorderedReliable => {
tokio::spawn(async move {
// unordered_reliable_channel_task(
// connection_handle,
@@ -685,7 +654,7 @@ async fn handle_connection_channels(
// .await
});
},
- ChannelType::Unreliable => todo!(),
+ ChannelId::Unreliable => todo!(),
}
}
} => {
diff --git a/src/shared.rs b/src/shared.rs
index c4803a8..34215f5 100644
--- a/src/shared.rs
+++ b/src/shared.rs
@@ -5,12 +5,15 @@ use bevy::prelude::{Deref, DerefMut, Resource};
use rcgen::RcgenError;
use tokio::runtime::Runtime;
+use self::channel::ChannelId;
+
pub const DEFAULT_MESSAGE_QUEUE_SIZE: usize = 150;
pub const DEFAULT_KILL_MESSAGE_QUEUE_SIZE: usize = 10;
pub const DEFAULT_KEEP_ALIVE_INTERVAL_S: u64 = 4;
pub type ClientId = u64;
-pub type ChannelId = u64;
+
+pub mod channel;
#[derive(Resource, Deref, DerefMut)]
pub(crate) struct AsyncRuntime(pub(crate) Runtime);
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
new file mode 100644
index 0000000..0746c21
--- /dev/null
+++ b/src/shared/channel.rs
@@ -0,0 +1,128 @@
+use bevy::prelude::{error, trace};
+use bytes::Bytes;
+use futures::sink::SinkExt;
+use futures_util::StreamExt;
+use tokio::sync::{
+ broadcast,
+ mpsc::{self, error::TrySendError},
+};
+use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
+
+use super::QuinnetError;
+
+pub(crate) type OrdRelChannelId = u64;
+
+// #[derive(Debug, Copy, Clone)]
+// pub enum ChannelType {
+// OrderedReliable,
+// UnorderedReliable,
+// Unreliable,
+// }
+
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
+pub enum ChannelId {
+ OrderedReliable(OrdRelChannelId),
+ UnorderedReliable,
+ Unreliable,
+}
+
+impl std::fmt::Display for ChannelId {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{:?}", self)
+ }
+}
+
+#[derive(Debug)]
+pub struct Channel {
+ sender: mpsc::Sender<Bytes>,
+}
+
+impl Channel {
+ pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
+ match self.sender.try_send(payload.into()) {
+ Ok(_) => Ok(()),
+ Err(err) => match err {
+ TrySendError::Full(_) => Err(QuinnetError::FullQueue),
+ TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed),
+ },
+ }
+ }
+
+ pub(crate) fn new(sender: mpsc::Sender<Bytes>) -> Self {
+ Self { sender }
+ }
+}
+
+// async fn ordered_reliable_channel_task(
+// connection: quinn::Connection,
+// to_sync_client: mpsc::Sender<InternalAsyncMessage>,
+// mut close_receiver: broadcast::Receiver<()>,
+// mut to_server_receiver: mpsc::Receiver<Bytes>,
+// ) {
+// tokio::select! {
+// _ = close_receiver.recv() => {
+// trace!("Ordered Reliable Send Channel received a close signal")
+// }
+// _ = async {
+// let uni_sender = connection
+// .open_uni()
+// .await
+// .expect("Failed to open send stream");
+// let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
+
+// while let Some(msg_bytes) = to_server_receiver.recv().await {
+// if let Err(err) = frame_sender.send(msg_bytes).await {
+// // TODO Clean: error handling
+// error!("Error while sending, {}", err);
+// // error!("Client seems disconnected, closing resources");
+// // if let Err(_) = close_sender_clone.send(()) {
+// // error!("Failed to close all client streams & resources")
+// // }
+// to_sync_client.send(
+// InternalAsyncMessage::LostConnection)
+// .await
+// .expect("Failed to signal connection lost to sync client");
+// }
+// }
+// } => {
+// trace!("Ordered Reliable Send Channel ended")
+// }
+// }
+// }
+
+// async fn unordered_reliable_channel_task(
+// connection: quinn::Connection,
+// to_sync_client: mpsc::Sender<InternalAsyncMessage>,
+// mut close_receiver: broadcast::Receiver<()>,
+// mut to_server_receiver: mpsc::Receiver<Bytes>,
+// ) {
+// tokio::select! {
+// _ = close_receiver.recv() => {
+// trace!("Unordered Reliable Send Channel received a close signal")
+// }
+// _ = async {
+// while let Some(msg_bytes) = to_server_receiver.recv().await {
+// let uni_sender = connection
+// .open_uni()
+// .await
+// .expect("Failed to open send stream");
+// let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
+
+// if let Err(err) = frame_sender.send(msg_bytes).await {
+// // TODO Clean: error handling
+// error!("Error while sending, {}", err);
+// // error!("Client seems disconnected, closing resources");
+// // if let Err(_) = close_sender_clone.send(()) {
+// // error!("Failed to close all client streams & resources")
+// // }
+// to_sync_client.send(
+// InternalAsyncMessage::LostConnection)
+// .await
+// .expect("Failed to signal connection lost to sync client");
+// }
+// }
+// } => {
+// trace!("Unordered Reliable Send Channel ended")
+// }
+// }
+// }