diff options
-rw-r--r-- | src/client.rs | 66 |
1 files changed, 37 insertions, 29 deletions
diff --git a/src/client.rs b/src/client.rs index fff1209..f7de536 100644 --- a/src/client.rs +++ b/src/client.rs @@ -178,8 +178,7 @@ pub struct Connection { state: ConnectionState, channels: HashMap<ChannelId, Channel>, last_gen_id: ChannelId, - - // sender: mpsc::Sender<Bytes>, + default_channel_id: Option<ChannelId>, receiver: mpsc::Receiver<Bytes>, close_sender: broadcast::Sender<()>, pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>, @@ -211,14 +210,10 @@ impl Connection { } pub fn send_message<T: serde::Serialize>(&self, message: T) -> Result<(), QuinnetError> { - // match &self.state { - // ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed), - // _ => match bincode::serialize(&message) { - // Ok(payload) => self.send_payload(payload), - // Err(_) => Err(QuinnetError::Serialization), - // }, - // } - todo!() + match self.default_channel_id { + Some(default_id) => self.send_message_on(default_id, message), + None => Err(QuinnetError::NoDefaultChannel), + } } pub fn send_message_on<T: serde::Serialize>( @@ -247,26 +242,32 @@ impl Connection { } pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> { - // match &self.state { - // ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed), - // _ => match self.sender.try_send(payload.into()) { - // Ok(_) => Ok(()), - // Err(err) => match err { - // TrySendError::Full(_) => Err(QuinnetError::FullQueue), - // TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed), - // }, - // }, - // } - todo!() + match self.default_channel_id { + Some(default_id) => self.send_payload_on(default_id, payload), + None => Err(QuinnetError::NoDefaultChannel), + } + } + + pub fn send_payload_on<T: Into<Bytes>>( + &self, + channel_id: ChannelId, + payload: T, + ) -> Result<(), QuinnetError> { + match &self.state { + ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed), + _ => match self.channels.get(&channel_id) { + Some(channel) => channel.send_payload(payload), + None => Err(QuinnetError::UnknownChannel(channel_id)), + }, + } } /// Same as [Connection::send_payload] but will log the error instead of returning it pub fn try_send_payload<T: Into<Bytes>>(&self, payload: T) { - // match self.send_payload(payload) { - // Ok(_) => {} - // Err(err) => error!("try_send_payload: {}", err), - // } - todo!("Default channel") + match self.send_payload(payload) { + Ok(_) => {} + Err(err) => error!("try_send_payload: {}", err), + } } pub fn receive_payload(&mut self) -> Result<Option<Bytes>, QuinnetError> { @@ -337,6 +338,10 @@ impl Connection { self.last_gen_id += 1; let channel_id = self.last_gen_id; self.channels.insert(channel_id, channel); + if self.default_channel_id.is_none() { + self.default_channel_id = Some(channel_id); + } + Ok(channel_id) } Err(err) => match err { @@ -418,7 +423,7 @@ impl Client { &mut self, config: ConnectionConfiguration, cert_mode: CertificateVerificationMode, - ) -> ConnectionId { + ) -> Result<ConnectionId, QuinnetError> { let (from_server_sender, from_server_receiver) = mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); @@ -430,15 +435,18 @@ impl Client { // Create a close channel for this connection let (close_sender, _) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE); - let connection = Connection { + let mut connection = Connection { state: ConnectionState::Connecting, channels: HashMap::new(), last_gen_id: 0, + default_channel_id: None, receiver: from_server_receiver, close_sender: close_sender.clone(), internal_receiver: from_async_client, internal_sender: to_async_client, }; + // Create a default channel + connection.create_channel(ChannelType::OrderedReliable)?; // Async connection self.runtime.spawn(async move { @@ -460,7 +468,7 @@ impl Client { self.default_connection_id = Some(connection_id); } - connection_id + Ok(connection_id) } /// Set the default connection |