diff options
-rw-r--r-- | src/shared/channel.rs | 75 |
1 files changed, 71 insertions, 4 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs index 7b62d17..192e495 100644 --- a/src/shared/channel.rs +++ b/src/shared/channel.rs @@ -1,14 +1,14 @@ use super::QuinnetError; use bevy::prelude::{error, trace}; use bytes::Bytes; -use futures::sink::SinkExt; -use quinn::{SendDatagramError, SendStream, VarInt}; -use std::fmt::Debug; +use futures::{sink::SinkExt, StreamExt}; +use quinn::{RecvStream, SendDatagramError, SendStream, VarInt}; +use std::fmt::{Debug, Display}; use tokio::sync::{ broadcast, mpsc::{self, error::TrySendError}, }; -use tokio_util::codec::{FramedWrite, LengthDelimitedCodec}; +use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; pub(crate) type MultiChannelId = u64; @@ -358,3 +358,70 @@ async fn new_uni_frame_sender( .expect("Failed to open send stream"); FramedWrite::new(uni_sender, LengthDelimitedCodec::new()) } + +async fn uni_receiver_task( + recv: RecvStream, + mut close_recv: broadcast::Receiver<()>, + bytes_from_server_send: mpsc::Sender<Bytes>, +) { + tokio::select! { + _ = close_recv.recv() => {} + _ = async { + let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new()); + while let Some(Ok(msg_bytes)) = frame_recv.next().await { + // TODO Clean: error handling + bytes_from_server_send.send(msg_bytes.into()).await.unwrap(); + } + } => {} + }; +} + +pub(crate) async fn reliable_receiver_task<T: Display>( + id: T, + connection: quinn::Connection, + mut close_recv: broadcast::Receiver<()>, + bytes_incoming_send: mpsc::Sender<Bytes>, +) { + let close_recv_clone = close_recv.resubscribe(); + tokio::select! { + _ = close_recv.recv() => { + trace!("Listener for new Unidirectional Receiving Streams with id {} received a close signal", id) + } + _ = async { + while let Ok(recv) = connection.accept_uni().await { + let bytes_from_server_send = bytes_incoming_send.clone(); + let close_recv_clone = close_recv_clone.resubscribe(); + tokio::spawn(async move { + uni_receiver_task( + recv, + close_recv_clone, + bytes_from_server_send + ).await; + }); + } + } => { + trace!("Listener for new Unidirectional Receiving Streams with id {} ended", id) + } + }; +} + +pub(crate) async fn unreliable_receiver_task<T: Display>( + id: T, + connection: quinn::Connection, + mut close_recv: broadcast::Receiver<()>, + bytes_incoming_send: mpsc::Sender<Bytes>, +) { + tokio::select! { + _ = close_recv.recv() => { + trace!("Listener for unreliable datagrams with id {} received a close signal", id) + } + _ = async { + while let Ok(msg_bytes) = connection.read_datagram().await { + // TODO Clean: error handling + bytes_incoming_send.send(msg_bytes.into()).await.unwrap(); + } + } => { + trace!("Listener for unreliable datagrams with id {} ended", id) + } + }; +} |