aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/client.rs66
1 files changed, 37 insertions, 29 deletions
diff --git a/src/client.rs b/src/client.rs
index fff1209..f7de536 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -178,8 +178,7 @@ pub struct Connection {
state: ConnectionState,
channels: HashMap<ChannelId, Channel>,
last_gen_id: ChannelId,
-
- // sender: mpsc::Sender<Bytes>,
+ default_channel_id: Option<ChannelId>,
receiver: mpsc::Receiver<Bytes>,
close_sender: broadcast::Sender<()>,
pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>,
@@ -211,14 +210,10 @@ impl Connection {
}
pub fn send_message<T: serde::Serialize>(&self, message: T) -> Result<(), QuinnetError> {
- // match &self.state {
- // ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed),
- // _ => match bincode::serialize(&message) {
- // Ok(payload) => self.send_payload(payload),
- // Err(_) => Err(QuinnetError::Serialization),
- // },
- // }
- todo!()
+ match self.default_channel_id {
+ Some(default_id) => self.send_message_on(default_id, message),
+ None => Err(QuinnetError::NoDefaultChannel),
+ }
}
pub fn send_message_on<T: serde::Serialize>(
@@ -247,26 +242,32 @@ impl Connection {
}
pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
- // match &self.state {
- // ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed),
- // _ => match self.sender.try_send(payload.into()) {
- // Ok(_) => Ok(()),
- // Err(err) => match err {
- // TrySendError::Full(_) => Err(QuinnetError::FullQueue),
- // TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed),
- // },
- // },
- // }
- todo!()
+ match self.default_channel_id {
+ Some(default_id) => self.send_payload_on(default_id, payload),
+ None => Err(QuinnetError::NoDefaultChannel),
+ }
+ }
+
+ pub fn send_payload_on<T: Into<Bytes>>(
+ &self,
+ channel_id: ChannelId,
+ payload: T,
+ ) -> Result<(), QuinnetError> {
+ match &self.state {
+ ConnectionState::Disconnected => Err(QuinnetError::ConnectionClosed),
+ _ => match self.channels.get(&channel_id) {
+ Some(channel) => channel.send_payload(payload),
+ None => Err(QuinnetError::UnknownChannel(channel_id)),
+ },
+ }
}
/// Same as [Connection::send_payload] but will log the error instead of returning it
pub fn try_send_payload<T: Into<Bytes>>(&self, payload: T) {
- // match self.send_payload(payload) {
- // Ok(_) => {}
- // Err(err) => error!("try_send_payload: {}", err),
- // }
- todo!("Default channel")
+ match self.send_payload(payload) {
+ Ok(_) => {}
+ Err(err) => error!("try_send_payload: {}", err),
+ }
}
pub fn receive_payload(&mut self) -> Result<Option<Bytes>, QuinnetError> {
@@ -337,6 +338,10 @@ impl Connection {
self.last_gen_id += 1;
let channel_id = self.last_gen_id;
self.channels.insert(channel_id, channel);
+ if self.default_channel_id.is_none() {
+ self.default_channel_id = Some(channel_id);
+ }
+
Ok(channel_id)
}
Err(err) => match err {
@@ -418,7 +423,7 @@ impl Client {
&mut self,
config: ConnectionConfiguration,
cert_mode: CertificateVerificationMode,
- ) -> ConnectionId {
+ ) -> Result<ConnectionId, QuinnetError> {
let (from_server_sender, from_server_receiver) =
mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
@@ -430,15 +435,18 @@ impl Client {
// Create a close channel for this connection
let (close_sender, _) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE);
- let connection = Connection {
+ let mut connection = Connection {
state: ConnectionState::Connecting,
channels: HashMap::new(),
last_gen_id: 0,
+ default_channel_id: None,
receiver: from_server_receiver,
close_sender: close_sender.clone(),
internal_receiver: from_async_client,
internal_sender: to_async_client,
};
+ // Create a default channel
+ connection.create_channel(ChannelType::OrderedReliable)?;
// Async connection
self.runtime.spawn(async move {
@@ -460,7 +468,7 @@ impl Client {
self.default_connection_id = Some(connection_id);
}
- connection_id
+ Ok(connection_id)
}
/// Set the default connection