aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/client.rs91
-rw-r--r--src/shared.rs3
2 files changed, 73 insertions, 21 deletions
diff --git a/src/client.rs b/src/client.rs
index 48255db..1046b07 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -30,7 +30,8 @@ use tokio::{
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use crate::shared::{
- AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
+ AsyncRuntime, ChannelId, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE,
+ DEFAULT_MESSAGE_QUEUE_SIZE,
};
use self::certificate::{
@@ -138,10 +139,27 @@ pub(crate) struct ConnectionSpawnConfig {
}
#[derive(Debug)]
+pub struct Channel {
+ sender: mpsc::Sender<Bytes>,
+}
+
+impl Channel {
+ pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
+ match self.sender.try_send(payload.into()) {
+ Ok(_) => Ok(()),
+ Err(err) => match err {
+ TrySendError::Full(_) => Err(QuinnetError::FullQueue),
+ TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed),
+ },
+ }
+ }
+}
+#[derive(Debug)]
pub struct Connection {
state: ConnectionState,
- // TODO Perf: multiple channels
- sender: mpsc::Sender<Bytes>,
+ channels: HashMap<ChannelId, Channel>,
+
+ // sender: mpsc::Sender<Bytes>,
receiver: mpsc::Receiver<Bytes>,
close_sender: broadcast::Sender<()>,
pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>,
@@ -172,11 +190,29 @@ impl Connection {
}
pub fn send_message<T: serde::Serialize>(&self, message: T) -> Result<(), QuinnetError> {
+ // match &self.state {
+ // ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed),
+ // _ => match bincode::serialize(&message) {
+ // Ok(payload) => self.send_payload(payload),
+ // Err(_) => Err(QuinnetError::Serialization),
+ // },
+ // }
+ todo!()
+ }
+
+ pub fn send_message_on<T: serde::Serialize>(
+ &self,
+ channel_id: ChannelId,
+ message: T,
+ ) -> Result<(), QuinnetError> {
match &self.state {
ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed),
- _ => match bincode::serialize(&message) {
- Ok(payload) => self.send_payload(payload),
- Err(_) => Err(QuinnetError::Serialization),
+ _ => match self.channels.get(&channel_id) {
+ Some(channel) => match bincode::serialize(&message) {
+ Ok(payload) => channel.send_payload(payload),
+ Err(_) => Err(QuinnetError::Serialization),
+ },
+ None => Err(QuinnetError::UnknownChannel(channel_id)),
},
}
}
@@ -190,24 +226,26 @@ impl Connection {
}
pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
- match &self.state {
- ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed),
- _ => match self.sender.try_send(payload.into()) {
- Ok(_) => Ok(()),
- Err(err) => match err {
- TrySendError::Full(_) => Err(QuinnetError::FullQueue),
- TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed),
- },
- },
- }
+ // match &self.state {
+ // ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed),
+ // _ => match self.sender.try_send(payload.into()) {
+ // Ok(_) => Ok(()),
+ // Err(err) => match err {
+ // TrySendError::Full(_) => Err(QuinnetError::FullQueue),
+ // TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed),
+ // },
+ // },
+ // }
+ todo!()
}
/// Same as [Connection::send_payload] but will log the error instead of returning it
pub fn try_send_payload<T: Into<Bytes>>(&self, payload: T) {
- match self.send_payload(payload) {
- Ok(_) => {}
- Err(err) => error!("try_send_payload: {}", err),
- }
+ // match self.send_payload(payload) {
+ // Ok(_) => {}
+ // Err(err) => error!("try_send_payload: {}", err),
+ // }
+ todo!("Default channel")
}
pub fn receive_payload(&mut self) -> Result<Option<Bytes>, QuinnetError> {
@@ -262,6 +300,17 @@ impl Connection {
_ => None,
}
}
+
+ pub fn create_channel(&self, channel_type: ChannelType) -> ChannelId {
+ todo!("Gen id and channel")
+ }
+}
+
+pub enum ChannelType {
+ OrderedReliable,
+ UnorderedReliable,
+ Unreliable,
+ SequencedUnreliable,
}
#[derive(Resource)]
@@ -345,7 +394,7 @@ impl Client {
let connection = Connection {
state: ConnectionState::Connecting,
- sender: to_server_sender,
+ channels: HashMap::new(),
receiver: from_server_receiver,
close_sender: close_sender.clone(),
internal_receiver: from_async_client,
diff --git a/src/shared.rs b/src/shared.rs
index 608bb0c..4c5ae65 100644
--- a/src/shared.rs
+++ b/src/shared.rs
@@ -10,6 +10,7 @@ pub const DEFAULT_KILL_MESSAGE_QUEUE_SIZE: usize = 10;
pub const DEFAULT_KEEP_ALIVE_INTERVAL_S: u64 = 4;
pub type ClientId = u64;
+pub type ChannelId = u64;
#[derive(Resource, Deref, DerefMut)]
pub(crate) struct AsyncRuntime(pub(crate) Runtime);
@@ -27,6 +28,8 @@ pub enum QuinnetError {
UnknownConnection(ConnectionId),
#[error("Connection is closed")]
ConnectionClosed,
+ #[error("Channel with id `{0}` is unknown")]
+ UnknownChannel(ChannelId),
#[error("Endpoint is already closed")]
EndpointAlreadyClosed,
#[error("Failed serialization")]