aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgilles henaux <gill.henaux@gmail.com>2023-01-14 16:04:41 +0100
committergilles henaux <gill.henaux@gmail.com>2023-01-14 16:04:41 +0100
commit4e64c1812b90d6cce07abb202439154fe9aa31a3 (patch)
tree53dc0cb099fdafa027ac38adce7cadd9c9be864c
parent30eecaebdd05ea1fa12b0807b87ce3dd8824a041 (diff)
[channels] Create a shared channels_task
-rw-r--r--src/shared/channel.rs69
1 files changed, 68 insertions, 1 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index 211f5a5..ae4b517 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -2,7 +2,7 @@ use super::QuinnetError;
use bevy::prelude::{error, trace};
use bytes::Bytes;
use futures::sink::SinkExt;
-use quinn::SendStream;
+use quinn::{SendStream, VarInt};
use std::fmt::Debug;
use tokio::sync::{
broadcast,
@@ -95,6 +95,73 @@ where
}
}
+pub(crate) async fn channels_task(
+ connection: quinn::Connection,
+ mut close_recv: broadcast::Receiver<()>,
+ mut to_channels_recv: mpsc::Receiver<ChannelSyncMessage>,
+ from_channels_send: mpsc::Sender<ChannelAsyncMessage>,
+) {
+ // Use an mpsc channel where, instead of sending messages, we wait for the channel to be closed, which happens when every sender has been dropped. We can't use a JoinSet as simply here since we would also need to drain closed channels from it.
+ let (channel_tasks_keepalive, mut channel_tasks_waiter) = mpsc::channel::<()>(1);
+
+ let close_receiver_clone = close_recv.resubscribe();
+ tokio::select! {
+ _ = close_recv.recv() => {
+ trace!("Connection Channels listener received a close signal")
+ }
+ _ = async {
+ while let Some(sync_message) = to_channels_recv.recv().await {
+ let ChannelSyncMessage::CreateChannel{ channel_id, bytes_to_channel_recv, channel_close_recv } = sync_message;
+
+ let close_receiver = close_receiver_clone.resubscribe();
+ let connection_handle = connection.clone();
+ let from_channels_send = from_channels_send.clone();
+ let channels_keepalive_clone = channel_tasks_keepalive.clone();
+
+ match channel_id {
+ ChannelId::OrderedReliable(_) => {
+ tokio::spawn(async move {
+ ordered_reliable_channel_task(
+ connection_handle,
+ channels_keepalive_clone,
+ from_channels_send,
+ close_receiver,
+ channel_close_recv,
+ bytes_to_channel_recv
+ )
+ .await
+ });
+ },
+ ChannelId::UnorderedReliable => {
+ tokio::spawn(async move {
+ unordered_reliable_channel_task(
+ connection_handle,
+ channels_keepalive_clone,
+ from_channels_send,
+ close_receiver,
+ channel_close_recv,
+ bytes_to_channel_recv
+ )
+ .await
+ });
+ },
+ ChannelId::Unreliable => todo!(),
+ }
+ }
+ } => {
+ trace!("Connection Channels listener ended")
+ }
+ };
+
+ // Wait for all the channels to have flushed/finished:
+ // We drop our sender first because the recv() call otherwise sleeps forever.
+ // When every sender has gone out of scope, the recv call will return with an error. We ignore the error.
+ drop(channel_tasks_keepalive);
+ let _ = channel_tasks_waiter.recv().await;
+
+ connection.close(VarInt::from_u32(0), "closed".as_bytes());
+}
+
pub(crate) async fn ordered_reliable_channel_task(
connection: quinn::Connection,
_: mpsc::Sender<()>,