diff options
-rw-r--r-- | src/client.rs | 91 | ||||
-rw-r--r-- | src/shared.rs | 3 |
2 files changed, 73 insertions, 21 deletions
diff --git a/src/client.rs b/src/client.rs index 48255db..1046b07 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,7 +30,8 @@ use tokio::{ use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use crate::shared::{ - AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE, + AsyncRuntime, ChannelId, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, + DEFAULT_MESSAGE_QUEUE_SIZE, }; use self::certificate::{ @@ -138,10 +139,27 @@ pub(crate) struct ConnectionSpawnConfig { } #[derive(Debug)] +pub struct Channel { + sender: mpsc::Sender<Bytes>, +} + +impl Channel { + pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { + match self.sender.try_send(payload.into()) { + Ok(_) => Ok(()), + Err(err) => match err { + TrySendError::Full(_) => Err(QuinnetError::FullQueue), + TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed), + }, + } + } +} +#[derive(Debug)] pub struct Connection { state: ConnectionState, - // TODO Perf: multiple channels - sender: mpsc::Sender<Bytes>, + channels: HashMap<ChannelId, Channel>, + + // sender: mpsc::Sender<Bytes>, receiver: mpsc::Receiver<Bytes>, close_sender: broadcast::Sender<()>, pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>, @@ -172,11 +190,29 @@ impl Connection { } pub fn send_message<T: serde::Serialize>(&self, message: T) -> Result<(), QuinnetError> { + // match &self.state { + // ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed), + // _ => match bincode::serialize(&message) { + // Ok(payload) => self.send_payload(payload), + // Err(_) => Err(QuinnetError::Serialization), + // }, + // } + todo!() + } + + pub fn send_message_on<T: serde::Serialize>( + &self, + channel_id: ChannelId, + message: T, + ) -> Result<(), QuinnetError> { match &self.state { ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed), - _ => match bincode::serialize(&message) { - Ok(payload) => self.send_payload(payload), - Err(_) => Err(QuinnetError::Serialization), + _ => match self.channels.get(&channel_id) { + Some(channel) => match bincode::serialize(&message) { + Ok(payload) => channel.send_payload(payload), + Err(_) => Err(QuinnetError::Serialization), + }, + None => Err(QuinnetError::UnknownChannel(channel_id)), }, } } @@ -190,24 +226,26 @@ impl Connection { } pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { - match &self.state { - ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed), - _ => match self.sender.try_send(payload.into()) { - Ok(_) => Ok(()), - Err(err) => match err { - TrySendError::Full(_) => Err(QuinnetError::FullQueue), - TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed), - }, - }, - } + // match &self.state { + // ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed), + // _ => match self.sender.try_send(payload.into()) { + // Ok(_) => Ok(()), + // Err(err) => match err { + // TrySendError::Full(_) => Err(QuinnetError::FullQueue), + // TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed), + // }, + // }, + // } + todo!() } /// Same as [Connection::send_payload] but will log the error instead of returning it pub fn try_send_payload<T: Into<Bytes>>(&self, payload: T) { - match self.send_payload(payload) { - Ok(_) => {} - Err(err) => error!("try_send_payload: {}", err), - } + // match self.send_payload(payload) { + // Ok(_) => {} + // Err(err) => error!("try_send_payload: {}", err), + // } + todo!("Default channel") } pub fn receive_payload(&mut self) -> Result<Option<Bytes>, QuinnetError> { @@ -262,6 +300,17 @@ impl Connection { _ => None, } } + + pub fn create_channel(&self, channel_type: ChannelType) -> ChannelId { + todo!("Gen id and channel") + } +} + +pub enum ChannelType { + OrderedReliable, + UnorderedReliable, + Unreliable, + SequencedUnreliable, } #[derive(Resource)] @@ -345,7 +394,7 @@ impl Client { let connection = Connection { state: ConnectionState::Connecting, - sender: to_server_sender, + channels: HashMap::new(), receiver: from_server_receiver, close_sender: close_sender.clone(), internal_receiver: from_async_client, diff --git a/src/shared.rs b/src/shared.rs index 608bb0c..4c5ae65 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -10,6 +10,7 @@ pub const DEFAULT_KILL_MESSAGE_QUEUE_SIZE: usize = 10; pub const DEFAULT_KEEP_ALIVE_INTERVAL_S: u64 = 4; pub type ClientId = u64; +pub type ChannelId = u64; #[derive(Resource, Deref, DerefMut)] pub(crate) struct AsyncRuntime(pub(crate) Runtime); @@ -27,6 +28,8 @@ pub enum QuinnetError { UnknownConnection(ConnectionId), #[error("Connection is closed")] ConnectionClosed, + #[error("Channel with id `{0}` is unknown")] + UnknownChannel(ChannelId), #[error("Endpoint is already closed")] EndpointAlreadyClosed, #[error("Failed serialization")] |