aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client.rs206
1 files changed, 137 insertions, 69 deletions
diff --git a/src/client.rs b/src/client.rs
index 1046b07..6be8c99 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -112,6 +112,14 @@ enum ConnectionState {
}
#[derive(Debug)]
+pub(crate) enum InternalSyncMessage {
+ CreateChannel {
+ channel_type: ChannelType,
+ to_server_receiver: mpsc::Receiver<Bytes>,
+ },
+}
+
+#[derive(Debug)]
pub(crate) enum InternalAsyncMessage {
Connected(InternalConnectionRef),
LostConnection,
@@ -131,10 +139,11 @@ pub(crate) enum InternalAsyncMessage {
pub(crate) struct ConnectionSpawnConfig {
connection_config: ConnectionConfiguration,
cert_mode: CertificateVerificationMode,
+ from_sync_client: mpsc::Receiver<InternalSyncMessage>,
to_sync_client: mpsc::Sender<InternalAsyncMessage>,
- close_sender: tokio::sync::broadcast::Sender<()>,
- close_receiver: tokio::sync::broadcast::Receiver<()>,
- to_server_receiver: mpsc::Receiver<Bytes>,
+ close_sender: broadcast::Sender<()>,
+ close_receiver: broadcast::Receiver<()>,
+ // to_server_receiver: mpsc::Receiver<Bytes>,
from_server_sender: mpsc::Sender<Bytes>,
}
@@ -163,6 +172,7 @@ pub struct Connection {
receiver: mpsc::Receiver<Bytes>,
close_sender: broadcast::Sender<()>,
pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>,
+ pub(crate) internal_sender: mpsc::Sender<InternalSyncMessage>,
}
impl Connection {
@@ -306,6 +316,7 @@ impl Connection {
}
}
+#[derive(Debug)]
pub enum ChannelType {
OrderedReliable,
UnorderedReliable,
@@ -380,11 +391,13 @@ impl Client {
) -> ConnectionId {
let (from_server_sender, from_server_receiver) =
mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
- let (to_server_sender, to_server_receiver) =
- mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
+ // let (to_server_sender, to_server_receiver) =
+ // mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
let (to_sync_client, from_async_client) =
mpsc::channel::<InternalAsyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE);
+ let (to_async_client, from_sync_client) =
+ mpsc::channel::<InternalSyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE);
// Create a close channel for this connection
let (close_sender, close_receiver): (
@@ -398,6 +411,7 @@ impl Client {
receiver: from_server_receiver,
close_sender: close_sender.clone(),
internal_receiver: from_async_client,
+ internal_sender: to_async_client,
};
// Async connection
@@ -405,10 +419,11 @@ impl Client {
connection_task(ConnectionSpawnConfig {
connection_config: config,
cert_mode,
+ from_sync_client,
to_sync_client,
close_sender,
close_receiver,
- to_server_receiver,
+ // to_server_receiver,
from_server_sender,
})
.await
@@ -530,12 +545,6 @@ async fn connection_task(mut spawn_config: ConnectionSpawnConfig) {
.await
.expect("Failed to signal connection to sync client");
- let send = connection
- .open_uni()
- .await
- .expect("Failed to open send stream");
- let mut frame_send = FramedWrite::new(send, LengthDelimitedCodec::new());
-
let _close_waiter = {
let conn = connection.clone();
let to_sync_client = spawn_config.to_sync_client.clone();
@@ -551,70 +560,129 @@ async fn connection_task(mut spawn_config: ConnectionSpawnConfig) {
})
};
- let _network_sends = {
- let close_sender_clone = spawn_config.close_sender.clone();
- let conn = connection.clone();
- tokio::spawn(async move {
- tokio::select! {
- _ = spawn_config.close_receiver.recv() => {
- trace!("Unidirectional send Stream forced to disconnected")
- }
- _ = async {
- while let Some(msg_bytes) = spawn_config.to_server_receiver.recv().await {
- send_msg(&close_sender_clone, &spawn_config.to_sync_client, &mut frame_send, msg_bytes).await;
- }
- } => {
- trace!("Unidirectional send Stream ended")
- }
- }
- while let Ok(msg_bytes) = spawn_config.to_server_receiver.try_recv() {
- if let Err(err) = frame_send.send(msg_bytes).await {
- error!("Error while sending, {}", err);
- }
- }
- if let Err(err) = frame_send.flush().await {
- error!("Error while flushing stream: {}", err);
- }
- if let Err(err) = frame_send.into_inner().finish().await {
- error!("Failed to shutdown stream gracefully: {}", err);
- }
- conn.close(VarInt::from_u32(0), "closed".as_bytes());
- })
- };
+ let close_receiver = spawn_config.close_sender.subscribe();
+ let connection_handle = connection.clone();
+ tokio::spawn(async move {
+ connection_receiving_task(
+ connection_handle,
+ spawn_config.from_server_sender,
+ close_receiver,
+ )
+ .await
+ });
- let _network_reads = {
- let mut uni_receivers: JoinSet<()> = JoinSet::new();
- let mut close_receiver = spawn_config.close_sender.subscribe();
- tokio::spawn(async move {
- tokio::select! {
- _ = close_receiver.recv() => {
- trace!("New Stream listener forced to disconnected")
- }
- _ = async {
- while let Ok(recv)= connection.accept_uni().await {
- let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new());
- let from_server_sender = spawn_config.from_server_sender.clone();
-
- uni_receivers.spawn(async move {
- while let Some(Ok(msg_bytes)) = frame_recv.next().await {
- from_server_sender.send(msg_bytes.into()).await.unwrap(); // TODO Clean: error handling
- }
+ // Channels Sending tasks for this connection
+ let mut close_receiver = spawn_config.close_sender.subscribe();
+ tokio::select! {
+ _ = close_receiver.recv() => {
+ trace!("Connection Channels listener received a close signal")
+ }
+ _ = async {
+ while let Some(sync_message) = spawn_config
+ .from_sync_client.recv().await {
+ let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message;
+
+ let close_receiver = spawn_config.close_sender.subscribe();
+ let connection_handle = connection.clone();
+ let to_sync_client = spawn_config.to_sync_client.clone();
+ match channel_type {
+ ChannelType::OrderedReliable => {
+ tokio::spawn(async move {
+ ordered_reliable_channel_task(
+ connection_handle,
+ to_sync_client,
+ close_receiver,
+ to_server_receiver
+ )
+ .await
+ });
+ },
+ ChannelType::UnorderedReliable => {
+ tokio::spawn(async move {
+ todo!()
});
- }
- } => {
- trace!("New Stream listener ended ")
+ },
+ ChannelType::Unreliable => todo!(),
+ ChannelType::SequencedUnreliable => todo!(),
}
}
- uni_receivers.shutdown().await;
- trace!("All unidirectional stream receivers cleaned");
- })
+ } => {
+ trace!("Connection Channels listener ended")
+ }
};
}
}
}
+async fn connection_receiving_task(
+ connection: quinn::Connection,
+ from_server_sender: mpsc::Sender<Bytes>,
+ mut close_receiver: broadcast::Receiver<()>,
+) {
+ let mut uni_receivers: JoinSet<()> = JoinSet::new();
+ tokio::select! {
+ _ = close_receiver.recv() => {
+ trace!("Listener for new Unidirectional Receiving Streams received a close signal")
+ }
+ _ = async {
+ while let Ok(recv)= connection.accept_uni().await {
+ let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new());
+ let from_server_sender = from_server_sender.clone();
+ uni_receivers.spawn(async move {
+ while let Some(Ok(msg_bytes)) = frame_recv.next().await {
+ // TODO Clean: error handling
+ from_server_sender.send(msg_bytes.into()).await.unwrap();
+ }
+ });
+ }
+ } => {
+ trace!("Listener for new Unidirectional Receiving Streams ended")
+ }
+ };
+ uni_receivers.shutdown().await;
+ trace!("All unidirectional stream receivers cleaned");
+}
+
+async fn ordered_reliable_channel_task(
+ connection: quinn::Connection,
+ to_sync_client: mpsc::Sender<InternalAsyncMessage>,
+ mut close_receiver: broadcast::Receiver<()>,
+ mut to_server_receiver: mpsc::Receiver<Bytes>,
+) {
+ let uni_sender = connection
+ .open_uni()
+ .await
+ .expect("Failed to open send stream");
+ let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());
+ tokio::select! {
+ _ = close_receiver.recv() => {
+ trace!("Ordered Reliable Send Channel received a close signal")
+ }
+ _ = async {
+ while let Some(msg_bytes) = to_server_receiver.recv().await {
+ send_msg(&to_sync_client, &mut frame_sender, msg_bytes).await;
+ }
+ } => {
+ trace!("Ordered Reliable Send Channel ended")
+ }
+ }
+ while let Ok(msg_bytes) = to_server_receiver.try_recv() {
+ if let Err(err) = frame_sender.send(msg_bytes).await {
+ error!("Error while sending, {}", err);
+ }
+ }
+ if let Err(err) = frame_sender.flush().await {
+ error!("Error while flushing stream: {}", err);
+ }
+ if let Err(err) = frame_sender.into_inner().finish().await {
+ error!("Failed to shutdown stream gracefully: {}", err);
+ }
+ todo!("Do not close here, wait for all channels to be flushed");
+ connection.close(VarInt::from_u32(0), "closed".as_bytes());
+}
+
async fn send_msg(
- close_sender: &tokio::sync::broadcast::Sender<()>,
+ // close_sender: &tokio::sync::broadcast::Sender<()>,
to_sync_client: &mpsc::Sender<InternalAsyncMessage>,
frame_send: &mut FramedWrite<SendStream, LengthDelimitedCodec>,
msg_bytes: Bytes,
@@ -628,9 +696,9 @@ async fn send_msg(
.send(InternalAsyncMessage::LostConnection)
.await
.expect("Failed to signal connection lost to sync client");
- if let Err(_) = close_sender.send(()) {
- error!("Failed to close all client streams & resources")
- }
+ // if let Err(_) = close_sender.send(()) {
+ // error!("Failed to close all client streams & resources")
+ // }
}
}