diff options
author | Henauxg <19689618+Henauxg@users.noreply.github.com> | 2023-01-05 19:02:18 +0100 |
---|---|---|
committer | gilles henaux <gill.henaux@gmail.com> | 2023-01-11 14:47:28 +0100 |
commit | 14a26107031ddf10a86980e0f8ee588ac8b20d85 (patch) | |
tree | a95e451c18b48c8fcfa89869451a28498eb888a8 /src | |
parent | e17bb3c4c59aca95e4eb9db24003e20509352b93 (diff) |
[client & shared] Rework channels and ChannelId
Diffstat (limited to 'src')
-rw-r--r-- | src/client.rs | 87 | ||||
-rw-r--r-- | src/shared.rs | 5 | ||||
-rw-r--r-- | src/shared/channel.rs | 128 |
3 files changed, 160 insertions, 60 deletions
diff --git a/src/client.rs b/src/client.rs index f7de536..842db6b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,8 +30,8 @@ use tokio::{ use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use crate::shared::{ - AsyncRuntime, ChannelId, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, - DEFAULT_MESSAGE_QUEUE_SIZE, + channel::{Channel, ChannelId, OrdRelChannelId}, + AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE, }; use self::certificate::{ @@ -114,7 +114,7 @@ enum ConnectionState { #[derive(Debug)] pub(crate) enum InternalSyncMessage { CreateChannel { - channel_type: ChannelType, + channel_id: ChannelId, to_server_receiver: mpsc::Receiver<Bytes>, }, } @@ -146,39 +146,13 @@ pub(crate) struct ConnectionSpawnConfig { } #[derive(Debug)] -pub struct Channel { - sender: mpsc::Sender<Bytes>, - channel_type: ChannelType, -} - -impl Channel { - pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { - match self.sender.try_send(payload.into()) { - Ok(_) => Ok(()), - Err(err) => match err { - TrySendError::Full(_) => Err(QuinnetError::FullQueue), - TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed), - }, - } - } - - pub fn channel_type(&self) -> ChannelType { - self.channel_type - } - - pub(crate) fn new(sender: mpsc::Sender<Bytes>, channel_type: ChannelType) -> Self { - Self { - sender, - channel_type, - } - } -} -#[derive(Debug)] pub struct Connection { state: ConnectionState, + // channels: HashMap<ChannelId, Channel>, channels: HashMap<ChannelId, Channel>, - last_gen_id: ChannelId, - default_channel_id: Option<ChannelId>, + default_channel: Option<ChannelId>, + last_gen_id: OrdRelChannelId, + // default_channel_id: Option<ChannelId>, receiver: mpsc::Receiver<Bytes>, close_sender: broadcast::Sender<()>, pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>, @@ -210,8 +184,8 @@ impl Connection { } pub fn send_message<T: serde::Serialize>(&self, message: T) -> Result<(), QuinnetError> { - match self.default_channel_id { - Some(default_id) => self.send_message_on(default_id, message), + match self.default_channel { + Some(channel) => self.send_message_on(channel, message), None => Err(QuinnetError::NoDefaultChannel), } } @@ -242,8 +216,8 @@ impl Connection { } pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { - match self.default_channel_id { - Some(default_id) => self.send_payload_on(default_id, payload), + match self.default_channel { + Some(channel) => self.send_payload_on(channel, payload), None => Err(QuinnetError::NoDefaultChannel), } } @@ -323,23 +297,23 @@ impl Connection { } } - /// Create a new channel of the given type, if the function returns Ok, the channel can be used immediately. - pub fn create_channel(&mut self, channel_type: ChannelType) -> Result<ChannelId, QuinnetError> { + pub fn open_ordered_reliable_channel(&mut self) -> Result<ChannelId, QuinnetError> { let (to_server_sender, to_server_receiver) = mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); + + self.last_gen_id += 1; + let channel_id = ChannelId::OrderedReliable(self.last_gen_id); match self .internal_sender .try_send(InternalSyncMessage::CreateChannel { - channel_type, + channel_id, to_server_receiver, }) { Ok(_) => { - let channel = Channel::new(to_server_sender, channel_type); - self.last_gen_id += 1; - let channel_id = self.last_gen_id; + let channel = Channel::new(to_server_sender); self.channels.insert(channel_id, channel); - if self.default_channel_id.is_none() { - self.default_channel_id = Some(channel_id); + if self.default_channel.is_none() { + self.default_channel = Some(channel_id); } Ok(channel_id) @@ -352,13 +326,6 @@ impl Connection { } } -#[derive(Debug, Copy, Clone)] -pub enum ChannelType { - OrderedReliable, - UnorderedReliable, - Unreliable, -} - #[derive(Resource)] pub struct Client { runtime: runtime::Handle, @@ -439,14 +406,16 @@ impl Client { state: ConnectionState::Connecting, channels: HashMap::new(), last_gen_id: 0, - default_channel_id: None, + default_channel: None, receiver: from_server_receiver, close_sender: close_sender.clone(), internal_receiver: from_async_client, internal_sender: to_async_client, }; // Create a default channel - connection.create_channel(ChannelType::OrderedReliable)?; + if let Err(err) = connection.open_ordered_reliable_channel() { + return Err(err); + } // Async connection self.runtime.spawn(async move { @@ -657,13 +626,13 @@ async fn handle_connection_channels( _ = async { while let Some(sync_message) = from_sync_client.recv().await { - let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message; + let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver } = sync_message; let close_receiver = close_sender.subscribe(); let connection_handle = connection.clone(); let to_sync_client = to_sync_client.clone(); - match channel_type { - ChannelType::OrderedReliable => { + match channel_id { + ChannelId::OrderedReliable(_) => { tokio::spawn(async move { ordered_reliable_channel_task( connection_handle, @@ -674,7 +643,7 @@ async fn handle_connection_channels( .await }); }, - ChannelType::UnorderedReliable => { + ChannelId::UnorderedReliable => { tokio::spawn(async move { // unordered_reliable_channel_task( // connection_handle, @@ -685,7 +654,7 @@ async fn handle_connection_channels( // .await }); }, - ChannelType::Unreliable => todo!(), + ChannelId::Unreliable => todo!(), } } } => { diff --git a/src/shared.rs b/src/shared.rs index c4803a8..34215f5 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -5,12 +5,15 @@ use bevy::prelude::{Deref, DerefMut, Resource}; use rcgen::RcgenError; use tokio::runtime::Runtime; +use self::channel::ChannelId; + pub const DEFAULT_MESSAGE_QUEUE_SIZE: usize = 150; pub const DEFAULT_KILL_MESSAGE_QUEUE_SIZE: usize = 10; pub const DEFAULT_KEEP_ALIVE_INTERVAL_S: u64 = 4; pub type ClientId = u64; -pub type ChannelId = u64; + +pub mod channel; #[derive(Resource, Deref, DerefMut)] pub(crate) struct AsyncRuntime(pub(crate) Runtime); diff --git a/src/shared/channel.rs b/src/shared/channel.rs new file mode 100644 index 0000000..0746c21 --- /dev/null +++ b/src/shared/channel.rs @@ -0,0 +1,128 @@ +use bevy::prelude::{error, trace}; +use bytes::Bytes; +use futures::sink::SinkExt; +use futures_util::StreamExt; +use tokio::sync::{ + broadcast, + mpsc::{self, error::TrySendError}, +}; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; + +use super::QuinnetError; + +pub(crate) type OrdRelChannelId = u64; + +// #[derive(Debug, Copy, Clone)] +// pub enum ChannelType { +// OrderedReliable, +// UnorderedReliable, +// Unreliable, +// } + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum ChannelId { + OrderedReliable(OrdRelChannelId), + UnorderedReliable, + Unreliable, +} + +impl std::fmt::Display for ChannelId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +#[derive(Debug)] +pub struct Channel { + sender: mpsc::Sender<Bytes>, +} + +impl Channel { + pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { + match self.sender.try_send(payload.into()) { + Ok(_) => Ok(()), + Err(err) => match err { + TrySendError::Full(_) => Err(QuinnetError::FullQueue), + TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed), + }, + } + } + + pub(crate) fn new(sender: mpsc::Sender<Bytes>) -> Self { + Self { sender } + } +} + +// async fn ordered_reliable_channel_task( +// connection: quinn::Connection, +// to_sync_client: mpsc::Sender<InternalAsyncMessage>, +// mut close_receiver: broadcast::Receiver<()>, +// mut to_server_receiver: mpsc::Receiver<Bytes>, +// ) { +// tokio::select! { +// _ = close_receiver.recv() => { +// trace!("Ordered Reliable Send Channel received a close signal") +// } +// _ = async { +// let uni_sender = connection +// .open_uni() +// .await +// .expect("Failed to open send stream"); +// let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); + +// while let Some(msg_bytes) = to_server_receiver.recv().await { +// if let Err(err) = frame_sender.send(msg_bytes).await { +// // TODO Clean: error handling +// error!("Error while sending, {}", err); +// // error!("Client seems disconnected, closing resources"); +// // if let Err(_) = close_sender_clone.send(()) { +// // error!("Failed to close all client streams & resources") +// // } +// to_sync_client.send( +// InternalAsyncMessage::LostConnection) +// .await +// .expect("Failed to signal connection lost to sync client"); +// } +// } +// } => { +// trace!("Ordered Reliable Send Channel ended") +// } +// } +// } + +// async fn unordered_reliable_channel_task( +// connection: quinn::Connection, +// to_sync_client: mpsc::Sender<InternalAsyncMessage>, +// mut close_receiver: broadcast::Receiver<()>, +// mut to_server_receiver: mpsc::Receiver<Bytes>, +// ) { +// tokio::select! { +// _ = close_receiver.recv() => { +// trace!("Unordered Reliable Send Channel received a close signal") +// } +// _ = async { +// while let Some(msg_bytes) = to_server_receiver.recv().await { +// let uni_sender = connection +// .open_uni() +// .await +// .expect("Failed to open send stream"); +// let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); + +// if let Err(err) = frame_sender.send(msg_bytes).await { +// // TODO Clean: error handling +// error!("Error while sending, {}", err); +// // error!("Client seems disconnected, closing resources"); +// // if let Err(_) = close_sender_clone.send(()) { +// // error!("Failed to close all client streams & resources") +// // } +// to_sync_client.send( +// InternalAsyncMessage::LostConnection) +// .await +// .expect("Failed to signal connection lost to sync client"); +// } +// } +// } => { +// trace!("Unordered Reliable Send Channel ended") +// } +// } +// } |