aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/shared/channel.rs75
1 files changed, 71 insertions, 4 deletions
diff --git a/src/shared/channel.rs b/src/shared/channel.rs
index 7b62d17..192e495 100644
--- a/src/shared/channel.rs
+++ b/src/shared/channel.rs
@@ -1,14 +1,14 @@
use super::QuinnetError;
use bevy::prelude::{error, trace};
use bytes::Bytes;
-use futures::sink::SinkExt;
-use quinn::{SendDatagramError, SendStream, VarInt};
-use std::fmt::Debug;
+use futures::{sink::SinkExt, StreamExt};
+use quinn::{RecvStream, SendDatagramError, SendStream, VarInt};
+use std::fmt::{Debug, Display};
use tokio::sync::{
broadcast,
mpsc::{self, error::TrySendError},
};
-use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
+use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
pub(crate) type MultiChannelId = u64;
@@ -358,3 +358,70 @@ async fn new_uni_frame_sender(
.expect("Failed to open send stream");
FramedWrite::new(uni_sender, LengthDelimitedCodec::new())
}
+
+async fn uni_receiver_task(
+ recv: RecvStream,
+ mut close_recv: broadcast::Receiver<()>,
+ bytes_from_server_send: mpsc::Sender<Bytes>,
+) {
+ tokio::select! {
+ _ = close_recv.recv() => {}
+ _ = async {
+ let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new());
+ while let Some(Ok(msg_bytes)) = frame_recv.next().await {
+ // TODO Clean: error handling
+ bytes_from_server_send.send(msg_bytes.into()).await.unwrap();
+ }
+ } => {}
+ };
+}
+
+pub(crate) async fn reliable_receiver_task<T: Display>(
+ id: T,
+ connection: quinn::Connection,
+ mut close_recv: broadcast::Receiver<()>,
+ bytes_incoming_send: mpsc::Sender<Bytes>,
+) {
+ let close_recv_clone = close_recv.resubscribe();
+ tokio::select! {
+ _ = close_recv.recv() => {
+ trace!("Listener for new Unidirectional Receiving Streams with id {} received a close signal", id)
+ }
+ _ = async {
+ while let Ok(recv) = connection.accept_uni().await {
+ let bytes_from_server_send = bytes_incoming_send.clone();
+ let close_recv_clone = close_recv_clone.resubscribe();
+ tokio::spawn(async move {
+ uni_receiver_task(
+ recv,
+ close_recv_clone,
+ bytes_from_server_send
+ ).await;
+ });
+ }
+ } => {
+ trace!("Listener for new Unidirectional Receiving Streams with id {} ended", id)
+ }
+ };
+}
+
+pub(crate) async fn unreliable_receiver_task<T: Display>(
+ id: T,
+ connection: quinn::Connection,
+ mut close_recv: broadcast::Receiver<()>,
+ bytes_incoming_send: mpsc::Sender<Bytes>,
+) {
+ tokio::select! {
+ _ = close_recv.recv() => {
+ trace!("Listener for unreliable datagrams with id {} received a close signal", id)
+ }
+ _ = async {
+ while let Ok(msg_bytes) = connection.read_datagram().await {
+ // TODO Clean: error handling
+ bytes_incoming_send.send(msg_bytes.into()).await.unwrap();
+ }
+ } => {
+ trace!("Listener for unreliable datagrams with id {} ended", id)
+ }
+ };
+}