aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
authorHenauxg <19689618+Henauxg@users.noreply.github.com>2023-01-05 19:02:18 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-11 14:47:28 +0100
commit14a26107031ddf10a86980e0f8ee588ac8b20d85 (patch)
treea95e451c18b48c8fcfa89869451a28498eb888a8 /src/client.rs
parente17bb3c4c59aca95e4eb9db24003e20509352b93 (diff)
[client & shared] Rework channels and ChannelId
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs87
1 files changed, 28 insertions, 59 deletions
diff --git a/src/client.rs b/src/client.rs
index f7de536..842db6b 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -30,8 +30,8 @@ use tokio::{
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use crate::shared::{
- AsyncRuntime, ChannelId, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE,
- DEFAULT_MESSAGE_QUEUE_SIZE,
+ channel::{Channel, ChannelId, OrdRelChannelId},
+ AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
};
use self::certificate::{
@@ -114,7 +114,7 @@ enum ConnectionState {
#[derive(Debug)]
pub(crate) enum InternalSyncMessage {
CreateChannel {
- channel_type: ChannelType,
+ channel_id: ChannelId,
to_server_receiver: mpsc::Receiver<Bytes>,
},
}
@@ -146,39 +146,13 @@ pub(crate) struct ConnectionSpawnConfig {
}
#[derive(Debug)]
-pub struct Channel {
- sender: mpsc::Sender<Bytes>,
- channel_type: ChannelType,
-}
-
-impl Channel {
- pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
- match self.sender.try_send(payload.into()) {
- Ok(_) => Ok(()),
- Err(err) => match err {
- TrySendError::Full(_) => Err(QuinnetError::FullQueue),
- TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed),
- },
- }
- }
-
- pub fn channel_type(&self) -> ChannelType {
- self.channel_type
- }
-
- pub(crate) fn new(sender: mpsc::Sender<Bytes>, channel_type: ChannelType) -> Self {
- Self {
- sender,
- channel_type,
- }
- }
-}
-#[derive(Debug)]
pub struct Connection {
state: ConnectionState,
+ // channels: HashMap<ChannelId, Channel>,
channels: HashMap<ChannelId, Channel>,
- last_gen_id: ChannelId,
- default_channel_id: Option<ChannelId>,
+ default_channel: Option<ChannelId>,
+ last_gen_id: OrdRelChannelId,
+ // default_channel_id: Option<ChannelId>,
receiver: mpsc::Receiver<Bytes>,
close_sender: broadcast::Sender<()>,
pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>,
@@ -210,8 +184,8 @@ impl Connection {
}
pub fn send_message<T: serde::Serialize>(&self, message: T) -> Result<(), QuinnetError> {
- match self.default_channel_id {
- Some(default_id) => self.send_message_on(default_id, message),
+ match self.default_channel {
+ Some(channel) => self.send_message_on(channel, message),
None => Err(QuinnetError::NoDefaultChannel),
}
}
@@ -242,8 +216,8 @@ impl Connection {
}
pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
- match self.default_channel_id {
- Some(default_id) => self.send_payload_on(default_id, payload),
+ match self.default_channel {
+ Some(channel) => self.send_payload_on(channel, payload),
None => Err(QuinnetError::NoDefaultChannel),
}
}
@@ -323,23 +297,23 @@ impl Connection {
}
}
- /// Create a new channel of the given type, if the function returns Ok, the channel can be used immediately.
- pub fn create_channel(&mut self, channel_type: ChannelType) -> Result<ChannelId, QuinnetError> {
+ pub fn open_ordered_reliable_channel(&mut self) -> Result<ChannelId, QuinnetError> {
let (to_server_sender, to_server_receiver) =
mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
+
+ self.last_gen_id += 1;
+ let channel_id = ChannelId::OrderedReliable(self.last_gen_id);
match self
.internal_sender
.try_send(InternalSyncMessage::CreateChannel {
- channel_type,
+ channel_id,
to_server_receiver,
}) {
Ok(_) => {
- let channel = Channel::new(to_server_sender, channel_type);
- self.last_gen_id += 1;
- let channel_id = self.last_gen_id;
+ let channel = Channel::new(to_server_sender);
self.channels.insert(channel_id, channel);
- if self.default_channel_id.is_none() {
- self.default_channel_id = Some(channel_id);
+ if self.default_channel.is_none() {
+ self.default_channel = Some(channel_id);
}
Ok(channel_id)
@@ -352,13 +326,6 @@ impl Connection {
}
}
-#[derive(Debug, Copy, Clone)]
-pub enum ChannelType {
- OrderedReliable,
- UnorderedReliable,
- Unreliable,
-}
-
#[derive(Resource)]
pub struct Client {
runtime: runtime::Handle,
@@ -439,14 +406,16 @@ impl Client {
state: ConnectionState::Connecting,
channels: HashMap::new(),
last_gen_id: 0,
- default_channel_id: None,
+ default_channel: None,
receiver: from_server_receiver,
close_sender: close_sender.clone(),
internal_receiver: from_async_client,
internal_sender: to_async_client,
};
// Create a default channel
- connection.create_channel(ChannelType::OrderedReliable)?;
+ if let Err(err) = connection.open_ordered_reliable_channel() {
+ return Err(err);
+ }
// Async connection
self.runtime.spawn(async move {
@@ -657,13 +626,13 @@ async fn handle_connection_channels(
_ = async {
while let Some(sync_message) = from_sync_client.recv().await {
- let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message;
+ let InternalSyncMessage::CreateChannel{ channel_id, to_server_receiver } = sync_message;
let close_receiver = close_sender.subscribe();
let connection_handle = connection.clone();
let to_sync_client = to_sync_client.clone();
- match channel_type {
- ChannelType::OrderedReliable => {
+ match channel_id {
+ ChannelId::OrderedReliable(_) => {
tokio::spawn(async move {
ordered_reliable_channel_task(
connection_handle,
@@ -674,7 +643,7 @@ async fn handle_connection_channels(
.await
});
},
- ChannelType::UnorderedReliable => {
+ ChannelId::UnorderedReliable => {
tokio::spawn(async move {
// unordered_reliable_channel_task(
// connection_handle,
@@ -685,7 +654,7 @@ async fn handle_connection_channels(
// .await
});
},
- ChannelType::Unreliable => todo!(),
+ ChannelId::Unreliable => todo!(),
}
}
} => {