diff options
author | Henauxg <19689618+Henauxg@users.noreply.github.com> | 2023-01-05 19:02:18 +0100 |
---|---|---|
committer | gilles henaux <gill.henaux@gmail.com> | 2023-01-11 14:47:28 +0100 |
commit | 14a26107031ddf10a86980e0f8ee588ac8b20d85 (patch) | |
tree | a95e451c18b48c8fcfa89869451a28498eb888a8 /src/client.rs | |
parent | e17bb3c4c59aca95e4eb9db24003e20509352b93 (diff) |
[client & shared] Rework channels and ChannelId
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 87 |
1 files changed, 28 insertions, 59 deletions
diff --git a/src/client.rs b/src/client.rs index f7de536..842db6b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,8 +30,8 @@ use tokio::{ use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use crate::shared::{ - AsyncRuntime, ChannelId, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, - DEFAULT_MESSAGE_QUEUE_SIZE, + channel::{Channel, ChannelId, OrdRelChannelId}, + AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE, }; use self::certificate::{ @@ -114,7 +114,7 @@ enum ConnectionState { #[derive(Debug)] pub(crate) enum InternalSyncMessage { CreateChannel { - channel_type: ChannelType, + channel_id: ChannelId, to_server_receiver: mpsc::Receiver<Bytes>, }, } @@ -146,39 +146,13 @@ pub(crate) struct ConnectionSpawnConfig { } #[derive(Debug)] -pub struct Channel { - sender: mpsc::Sender<Bytes>, - channel_type: ChannelType, -} - -impl Channel { - pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { - match self.sender.try_send(payload.into()) { - Ok(_) => Ok(()), - Err(err) => match err { - TrySendError::Full(_) => Err(QuinnetError::FullQueue), - TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed), - }, - } - } - - pub fn channel_type(&self) -> ChannelType { - self.channel_type - } - - pub(crate) fn new(sender: mpsc::Sender<Bytes>, channel_type: ChannelType) -> Self { - Self { - sender, - channel_type, - } - } -} -#[derive(Debug)] pub struct Connection { state: ConnectionState, + // channels: HashMap<ChannelId, Channel>, channels: HashMap<ChannelId, Channel>, - last_gen_id: ChannelId, - default_channel_id: Option<ChannelId>, + default_channel: Option<ChannelId>, + last_gen_id: OrdRelChannelId, + // default_channel_id: Option<ChannelId>, receiver: mpsc::Receiver<Bytes>, close_sender: broadcast::Sender<()>, pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>, @@ -210,8 +184,8 @@ impl Connection { } pub fn send_message<T: serde::Serialize>(&self, message: T) -> Result<(), QuinnetError> { - match self.default_channel_id { - Some(default_id) => self.send_message_on(default_id, message), + match self.default_channel { + Some(channel) => self.send_message_on(channel, message), None => Err(QuinnetError::NoDefaultChannel), } } @@ -242,8 +216,8 @@ impl Connection { } pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { - match self.default_channel_id { - Some(default_id) => self.send_payload_on(default_id, payload), + match self.default_channel { + Some(channel) => self.send_payload_on(channel, payload), None => Err(QuinnetError::NoDefaultChannel), } } @@ -323,23 +297,23 @@ impl Connection { } } - /// Create a new channel of the given type, if the function returns Ok, the channel can be used immediately. - pub fn create_channel(&mut self, channel_type: ChannelType) -> Result<ChannelId, QuinnetError> { + pub fn open_ordered_reliable_channel(&mut self) -> Result<ChannelId, QuinnetError> { let (to_server_sender, to_server_receiver) = mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); + + self.last_gen_id += 1; + let channel_id = ChannelId::OrderedReliable(self.last_gen_id); match self .internal_sender .try_send(InternalSyncMessage::CreateChannel { - channel_type, + channel_id, to_server_receiver, }) { Ok(_) => { - let channel = Channel::new(to_server_sender, channel_type); - self.last_gen_id += 1; - let channel_id = self.last_gen_id; + let channel = Channel::new(to_server_sender); self.channels.insert(channel_id, channel); - if self.default_channel_id.is_none() { - self.default_channel_id = Some(channel_id); + if self.default_channel.is_none() { + self.default_channel = Some(channel_id); } Ok(channel_id) @@ -352,13 +326,6 @@ impl Connection { } } -#[derive(Debug, Copy, Clone)] -pub enum ChannelType { - OrderedReliable, - UnorderedReliable, - Unreliable, -} - #[derive(Resource)] pub struct Client { runtime: runtime::Handle, @@ -439,14 +406,16 @@ impl Client { state: ConnectionState::Connecting, channels: HashMap::new(), last_gen_id: 0, - default_channel_id: None, + default_channel: None, receiver: from_server_receiver, close_sender: close_sender.clone(), internal_receiver: from_async_client, internal_sender: to_async_client, }; // Create a default channel - connection.create_channel(ChannelType::OrderedReliable)?; + if let Err(err) = connection.open_ordered_reliable_channel() { + return Err(err); + } // Async connection self.runtime.spawn(async move { @@ -657,13 +626,13 @@ async fn handle_connection_channels( _ = async { while let Some(sync_message) = from_sync_client.recv().await { - let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message; + let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver } = sync_message; let close_receiver = close_sender.subscribe(); let connection_handle = connection.clone(); let to_sync_client = to_sync_client.clone(); - match channel_type { - ChannelType::OrderedReliable => { + match channel_id { + ChannelId::OrderedReliable(_) => { tokio::spawn(async move { ordered_reliable_channel_task( connection_handle, @@ -674,7 +643,7 @@ async fn handle_connection_channels( .await }); }, - ChannelType::UnorderedReliable => { + ChannelId::UnorderedReliable => { tokio::spawn(async move { // unordered_reliable_channel_task( // connection_handle, @@ -685,7 +654,7 @@ async fn handle_connection_channels( // .await }); }, - ChannelType::Unreliable => todo!(), + ChannelId::Unreliable => todo!(), } } } => { |