diff options
author | gilles henaux <gill.henaux@gmail.com> | 2023-01-14 16:04:41 +0100 |
---|---|---|
committer | gilles henaux <gill.henaux@gmail.com> | 2023-01-14 16:04:41 +0100 |
commit | 4e64c1812b90d6cce07abb202439154fe9aa31a3 (patch) | |
tree | 53dc0cb099fdafa027ac38adce7cadd9c9be864c | |
parent | 30eecaebdd05ea1fa12b0807b87ce3dd8824a041 (diff) |
[channels] Create a shared channels_task
-rw-r--r-- | src/shared/channel.rs | 69 |
1 files changed, 68 insertions, 1 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs index 211f5a5..ae4b517 100644 --- a/src/shared/channel.rs +++ b/src/shared/channel.rs @@ -2,7 +2,7 @@ use super::QuinnetError; use bevy::prelude::{error, trace}; use bytes::Bytes; use futures::sink::SinkExt; -use quinn::SendStream; +use quinn::{SendStream, VarInt}; use std::fmt::Debug; use tokio::sync::{ broadcast, @@ -95,6 +95,73 @@ where } } +pub(crate) async fn channels_task( + connection: quinn::Connection, + mut close_recv: broadcast::Receiver<()>, + mut to_channels_recv: mpsc::Receiver<ChannelSyncMessage>, + from_channels_send: mpsc::Sender<ChannelAsyncMessage>, +) { + // Use an mpsc channel where, instead of sending messages, we wait for the channel to be closed, which happens when every sender has been dropped. We can't use a JoinSet as simply here since we would also need to drain closed channels from it. + let (channel_tasks_keepalive, mut channel_tasks_waiter) = mpsc::channel::<()>(1); + + let close_receiver_clone = close_recv.resubscribe(); + tokio::select! { + _ = close_recv.recv() => { + trace!("Connection Channels listener received a close signal") + } + _ = async { + while let Some(sync_message) = to_channels_recv.recv().await { + let ChannelSyncMessage::CreateChannel{ channel_id, bytes_to_channel_recv, channel_close_recv } = sync_message; + + let close_receiver = close_receiver_clone.resubscribe(); + let connection_handle = connection.clone(); + let from_channels_send = from_channels_send.clone(); + let channels_keepalive_clone = channel_tasks_keepalive.clone(); + + match channel_id { + ChannelId::OrderedReliable(_) => { + tokio::spawn(async move { + ordered_reliable_channel_task( + connection_handle, + channels_keepalive_clone, + from_channels_send, + close_receiver, + channel_close_recv, + bytes_to_channel_recv + ) + .await + }); + }, + ChannelId::UnorderedReliable => { + tokio::spawn(async move { + unordered_reliable_channel_task( + connection_handle, + channels_keepalive_clone, + from_channels_send, + close_receiver, + channel_close_recv, + bytes_to_channel_recv + ) + .await + }); + }, + ChannelId::Unreliable => todo!(), + } + } + } => { + trace!("Connection Channels listener ended") + } + }; + + // Wait for all the channels to have flushed/finished: + // We drop our sender first because the recv() call otherwise sleeps forever. + // When every sender has gone out of scope, the recv call will return with an error. We ignore the error. + drop(channel_tasks_keepalive); + let _ = channel_tasks_waiter.recv().await; + + connection.close(VarInt::from_u32(0), "closed".as_bytes()); +} + pub(crate) async fn ordered_reliable_channel_task( connection: quinn::Connection, _: mpsc::Sender<()>, |