diff options
author | Henauxg <19689618+Henauxg@users.noreply.github.com> | 2023-01-04 17:51:20 +0100 |
---|---|---|
committer | gilles henaux <gill.henaux@gmail.com> | 2023-01-11 14:38:55 +0100 |
commit | 6fed432b5ba751c4bd1630c9289d951a26fe403f (patch) | |
tree | d7441ef3046cfbf1a31dcf4faa285548a9ea1c5b /src/client.rs | |
parent | 0308782c5775baf0908b73e3e1a6b17c0a6a00a0 (diff) |
[client] Handle channel creation and remove Unreliable Sequenced channel type
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 53 |
1 files changed, 38 insertions, 15 deletions
diff --git a/src/client.rs b/src/client.rs index e852334..fff1209 100644 --- a/src/client.rs +++ b/src/client.rs @@ -142,14 +142,13 @@ pub(crate) struct ConnectionSpawnConfig { from_sync_client: mpsc::Receiver<InternalSyncMessage>, to_sync_client: mpsc::Sender<InternalAsyncMessage>, close_sender: broadcast::Sender<()>, - close_receiver: broadcast::Receiver<()>, - // to_server_receiver: mpsc::Receiver<Bytes>, from_server_sender: mpsc::Sender<Bytes>, } #[derive(Debug)] pub struct Channel { sender: mpsc::Sender<Bytes>, + channel_type: ChannelType, } impl Channel { @@ -162,11 +161,23 @@ impl Channel { }, } } + + pub fn channel_type(&self) -> ChannelType { + self.channel_type + } + + pub(crate) fn new(sender: mpsc::Sender<Bytes>, channel_type: ChannelType) -> Self { + Self { + sender, + channel_type, + } + } } #[derive(Debug)] pub struct Connection { state: ConnectionState, channels: HashMap<ChannelId, Channel>, + last_gen_id: ChannelId, // sender: mpsc::Sender<Bytes>, receiver: mpsc::Receiver<Bytes>, @@ -311,17 +322,36 @@ impl Connection { } } - pub fn create_channel(&self, channel_type: ChannelType) -> ChannelId { - todo!("Gen id and channel") + /// Create a new channel of the given type, if the function returns Ok, the channel can be used immediately. + pub fn create_channel(&mut self, channel_type: ChannelType) -> Result<ChannelId, QuinnetError> { + let (to_server_sender, to_server_receiver) = + mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); + match self + .internal_sender + .try_send(InternalSyncMessage::CreateChannel { + channel_type, + to_server_receiver, + }) { + Ok(_) => { + let channel = Channel::new(to_server_sender, channel_type); + self.last_gen_id += 1; + let channel_id = self.last_gen_id; + self.channels.insert(channel_id, channel); + Ok(channel_id) + } + Err(err) => match err { + TrySendError::Full(_) => Err(QuinnetError::FullQueue), + TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed), + }, + } } } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub enum ChannelType { OrderedReliable, UnorderedReliable, Unreliable, - SequencedUnreliable, } #[derive(Resource)] @@ -391,8 +421,6 @@ impl Client { ) -> ConnectionId { let (from_server_sender, from_server_receiver) = mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); - // let (to_server_sender, to_server_receiver) = - // mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); let (to_sync_client, from_async_client) = mpsc::channel::<InternalAsyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE); @@ -400,14 +428,12 @@ impl Client { mpsc::channel::<InternalSyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE); // Create a close channel for this connection - let (close_sender, close_receiver): ( - tokio::sync::broadcast::Sender<()>, - tokio::sync::broadcast::Receiver<()>, - ) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE); + let (close_sender, _) = broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE); let connection = Connection { state: ConnectionState::Connecting, channels: HashMap::new(), + last_gen_id: 0, receiver: from_server_receiver, close_sender: close_sender.clone(), internal_receiver: from_async_client, @@ -422,8 +448,6 @@ impl Client { from_sync_client, to_sync_client, close_sender, - close_receiver, - // to_server_receiver, from_server_sender, }) .await @@ -654,7 +678,6 @@ async fn handle_connection_channels( }); }, ChannelType::Unreliable => todo!(), - ChannelType::SequencedUnreliable => todo!(), } } } => { |