aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
authorHenauxg <19689618+Henauxg@users.noreply.github.com>2023-01-04 17:51:20 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-11 14:38:55 +0100
commit6fed432b5ba751c4bd1630c9289d951a26fe403f (patch)
treed7441ef3046cfbf1a31dcf4faa285548a9ea1c5b /src/client.rs
parent0308782c5775baf0908b73e3e1a6b17c0a6a00a0 (diff)
[client] Handle channel creation and remove Unreliable Sequenced channel type
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs53
1 files changed, 38 insertions, 15 deletions
diff --git a/src/client.rs b/src/client.rs
index e852334..fff1209 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -142,14 +142,13 @@ pub(crate) struct ConnectionSpawnConfig {
from_sync_client: mpsc::Receiver<InternalSyncMessage>,
to_sync_client: mpsc::Sender<InternalAsyncMessage>,
close_sender: broadcast::Sender<()>,
- close_receiver: broadcast::Receiver<()>,
- // to_server_receiver: mpsc::Receiver<Bytes>,
from_server_sender: mpsc::Sender<Bytes>,
}
#[derive(Debug)]
pub struct Channel {
sender: mpsc::Sender<Bytes>,
+ channel_type: ChannelType,
}
impl Channel {
@@ -162,11 +161,23 @@ impl Channel {
},
}
}
+
+ pub fn channel_type(&self) -> ChannelType {
+ self.channel_type
+ }
+
+ pub(crate) fn new(sender: mpsc::Sender<Bytes>, channel_type: ChannelType) -> Self {
+ Self {
+ sender,
+ channel_type,
+ }
+ }
}
#[derive(Debug)]
pub struct Connection {
state: ConnectionState,
channels: HashMap<ChannelId, Channel>,
+ last_gen_id: ChannelId,
// sender: mpsc::Sender<Bytes>,
receiver: mpsc::Receiver<Bytes>,
@@ -311,17 +322,36 @@ impl Connection {
}
}
- pub fn create_channel(&self, channel_type: ChannelType) -> ChannelId {
- todo!("Gen id and channel")
+ /// Create a new channel of the given type, if the function returns Ok, the channel can be used immediately.
+ pub fn create_channel(&mut self, channel_type: ChannelType) -> Result<ChannelId, QuinnetError> {
+ let (to_server_sender, to_server_receiver) =
+ mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
+ match self
+ .internal_sender
+ .try_send(InternalSyncMessage::CreateChannel {
+ channel_type,
+ to_server_receiver,
+ }) {
+ Ok(_) => {
+ let channel = Channel::new(to_server_sender, channel_type);
+ self.last_gen_id += 1;
+ let channel_id = self.last_gen_id;
+ self.channels.insert(channel_id, channel);
+ Ok(channel_id)
+ }
+ Err(err) => match err {
+ TrySendError::Full(_) => Err(QuinnetError::FullQueue),
+ TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed),
+ },
+ }
}
}
-#[derive(Debug)]
+#[derive(Debug, Copy, Clone)]
pub enum ChannelType {
OrderedReliable,
UnorderedReliable,
Unreliable,
- SequencedUnreliable,
}
#[derive(Resource)]
@@ -391,8 +421,6 @@ impl Client {
) -> ConnectionId {
let (from_server_sender, from_server_receiver) =
mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
- // let (to_server_sender, to_server_receiver) =
- // mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
let (to_sync_client, from_async_client) =
mpsc::channel::<InternalAsyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE);
@@ -400,14 +428,12 @@ impl Client {
mpsc::channel::<InternalSyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE);
// Create a close channel for this connection
- let (close_sender, close_receiver): (
- tokio::sync::broadcast::Sender<()>,
- tokio::sync::broadcast::Receiver<()>,
- ) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE);
+ let (close_sender, _) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE);
let connection = Connection {
state: ConnectionState::Connecting,
channels: HashMap::new(),
+ last_gen_id: 0,
receiver: from_server_receiver,
close_sender: close_sender.clone(),
internal_receiver: from_async_client,
@@ -422,8 +448,6 @@ impl Client {
from_sync_client,
to_sync_client,
close_sender,
- close_receiver,
- // to_server_receiver,
from_server_sender,
})
.await
@@ -654,7 +678,6 @@ async fn handle_connection_channels(
});
},
ChannelType::Unreliable => todo!(),
- ChannelType::SequencedUnreliable => todo!(),
}
}
} => {