diff options
author | Henauxg <19689618+Henauxg@users.noreply.github.com> | 2023-01-04 17:09:13 +0100 |
---|---|---|
committer | gilles henaux <gill.henaux@gmail.com> | 2023-01-11 14:29:17 +0100 |
commit | ae6c55fc44145b2fe799d461ba22973785dec889 (patch) | |
tree | cc0990372a360a20babc3d235995ddafa9b9d780 /src | |
parent | f2a5c6b85556d4c5af057b9dd2fed445416d209b (diff) |
[client] Implement ordered reliable sender task
Diffstat (limited to 'src')
-rw-r--r-- | src/client.rs | 206 |
1 files changed, 137 insertions, 69 deletions
diff --git a/src/client.rs b/src/client.rs index 1046b07..6be8c99 100644 --- a/src/client.rs +++ b/src/client.rs @@ -112,6 +112,14 @@ enum ConnectionState { } #[derive(Debug)] +pub(crate) enum InternalSyncMessage { + CreateChannel { + channel_type: ChannelType, + to_server_receiver: mpsc::Receiver<Bytes>, + }, +} + +#[derive(Debug)] pub(crate) enum InternalAsyncMessage { Connected(InternalConnectionRef), LostConnection, @@ -131,10 +139,11 @@ pub(crate) enum InternalAsyncMessage { pub(crate) struct ConnectionSpawnConfig { connection_config: ConnectionConfiguration, cert_mode: CertificateVerificationMode, + from_sync_client: mpsc::Receiver<InternalSyncMessage>, to_sync_client: mpsc::Sender<InternalAsyncMessage>, - close_sender: tokio::sync::broadcast::Sender<()>, - close_receiver: tokio::sync::broadcast::Receiver<()>, - to_server_receiver: mpsc::Receiver<Bytes>, + close_sender: broadcast::Sender<()>, + close_receiver: broadcast::Receiver<()>, + // to_server_receiver: mpsc::Receiver<Bytes>, from_server_sender: mpsc::Sender<Bytes>, } @@ -163,6 +172,7 @@ pub struct Connection { receiver: mpsc::Receiver<Bytes>, close_sender: broadcast::Sender<()>, pub(crate) internal_receiver: mpsc::Receiver<InternalAsyncMessage>, + pub(crate) internal_sender: mpsc::Sender<InternalSyncMessage>, } impl Connection { @@ -306,6 +316,7 @@ impl Connection { } } +#[derive(Debug)] pub enum ChannelType { OrderedReliable, UnorderedReliable, @@ -380,11 +391,13 @@ impl Client { ) -> ConnectionId { let (from_server_sender, from_server_receiver) = mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); - let (to_server_sender, to_server_receiver) = - mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); + // let (to_server_sender, to_server_receiver) = + // mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE); let (to_sync_client, from_async_client) = mpsc::channel::<InternalAsyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE); + let (to_async_client, from_sync_client) = + mpsc::channel::<InternalSyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE); // Create a close channel for this connection let (close_sender, close_receiver): ( @@ -398,6 +411,7 @@ impl Client { receiver: from_server_receiver, close_sender: close_sender.clone(), internal_receiver: from_async_client, + internal_sender: to_async_client, }; // Async connection @@ -405,10 +419,11 @@ impl Client { connection_task(ConnectionSpawnConfig { connection_config: config, cert_mode, + from_sync_client, to_sync_client, close_sender, close_receiver, - to_server_receiver, + // to_server_receiver, from_server_sender, }) .await @@ -530,12 +545,6 @@ async fn connection_task(mut spawn_config: ConnectionSpawnConfig) { .await .expect("Failed to signal connection to sync client"); - let send = connection - .open_uni() - .await - .expect("Failed to open send stream"); - let mut frame_send = FramedWrite::new(send, LengthDelimitedCodec::new()); - let _close_waiter = { let conn = connection.clone(); let to_sync_client = spawn_config.to_sync_client.clone(); @@ -551,70 +560,129 @@ async fn connection_task(mut spawn_config: ConnectionSpawnConfig) { }) }; - let _network_sends = { - let close_sender_clone = spawn_config.close_sender.clone(); - let conn = connection.clone(); - tokio::spawn(async move { - tokio::select! { - _ = spawn_config.close_receiver.recv() => { - trace!("Unidirectional send Stream forced to disconnected") - } - _ = async { - while let Some(msg_bytes) = spawn_config.to_server_receiver.recv().await { - send_msg(&close_sender_clone, &spawn_config.to_sync_client, &mut frame_send, msg_bytes).await; - } - } => { - trace!("Unidirectional send Stream ended") - } - } - while let Ok(msg_bytes) = spawn_config.to_server_receiver.try_recv() { - if let Err(err) = frame_send.send(msg_bytes).await { - error!("Error while sending, {}", err); - } - } - if let Err(err) = frame_send.flush().await { - error!("Error while flushing stream: {}", err); - } - if let Err(err) = frame_send.into_inner().finish().await { - error!("Failed to shutdown stream gracefully: {}", err); - } - conn.close(VarInt::from_u32(0), "closed".as_bytes()); - }) - }; + let close_receiver = spawn_config.close_sender.subscribe(); + let connection_handle = connection.clone(); + tokio::spawn(async move { + connection_receiving_task( + connection_handle, + spawn_config.from_server_sender, + close_receiver, + ) + .await + }); - let _network_reads = { - let mut uni_receivers: JoinSet<()> = JoinSet::new(); - let mut close_receiver = spawn_config.close_sender.subscribe(); - tokio::spawn(async move { - tokio::select! { - _ = close_receiver.recv() => { - trace!("New Stream listener forced to disconnected") - } - _ = async { - while let Ok(recv)= connection.accept_uni().await { - let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new()); - let from_server_sender = spawn_config.from_server_sender.clone(); - - uni_receivers.spawn(async move { - while let Some(Ok(msg_bytes)) = frame_recv.next().await { - from_server_sender.send(msg_bytes.into()).await.unwrap(); // TODO Clean: error handling - } + // Channels Sending tasks for this connection + let mut close_receiver = spawn_config.close_sender.subscribe(); + tokio::select! { + _ = close_receiver.recv() => { + trace!("Connection Channels listener received a close signal") + } + _ = async { + while let Some(sync_message) = spawn_config + .from_sync_client.recv().await { + let InternalSyncMessage::CreateChannel{ channel_type, to_server_receiver } = sync_message; + + let close_receiver = spawn_config.close_sender.subscribe(); + let connection_handle = connection.clone(); + let to_sync_client = spawn_config.to_sync_client.clone(); + match channel_type { + ChannelType::OrderedReliable => { + tokio::spawn(async move { + ordered_reliable_channel_task( + connection_handle, + to_sync_client, + close_receiver, + to_server_receiver + ) + .await + }); + }, + ChannelType::UnorderedReliable => { + tokio::spawn(async move { + todo!() }); - } - } => { - trace!("New Stream listener ended ") + }, + ChannelType::Unreliable => todo!(), + ChannelType::SequencedUnreliable => todo!(), } } - uni_receivers.shutdown().await; - trace!("All unidirectional stream receivers cleaned"); - }) + } => { + trace!("Connection Channels listener ended") + } }; } } } +async fn connection_receiving_task( + connection: quinn::Connection, + from_server_sender: mpsc::Sender<Bytes>, + mut close_receiver: broadcast::Receiver<()>, +) { + let mut uni_receivers: JoinSet<()> = JoinSet::new(); + tokio::select! { + _ = close_receiver.recv() => { + trace!("Listener for new Unidirectional Receiving Streams received a close signal") + } + _ = async { + while let Ok(recv)= connection.accept_uni().await { + let mut frame_recv = FramedRead::new(recv, LengthDelimitedCodec::new()); + let from_server_sender = from_server_sender.clone(); + uni_receivers.spawn(async move { + while let Some(Ok(msg_bytes)) = frame_recv.next().await { + // TODO Clean: error handling + from_server_sender.send(msg_bytes.into()).await.unwrap(); + } + }); + } + } => { + trace!("Listener for new Unidirectional Receiving Streams ended") + } + }; + uni_receivers.shutdown().await; + trace!("All unidirectional stream receivers cleaned"); +} + +async fn ordered_reliable_channel_task( + connection: quinn::Connection, + to_sync_client: mpsc::Sender<InternalAsyncMessage>, + mut close_receiver: broadcast::Receiver<()>, + mut to_server_receiver: mpsc::Receiver<Bytes>, +) { + let uni_sender = connection + .open_uni() + .await + .expect("Failed to open send stream"); + let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new()); + tokio::select! { + _ = close_receiver.recv() => { + trace!("Ordered Reliable Send Channel received a close signal") + } + _ = async { + while let Some(msg_bytes) = to_server_receiver.recv().await { + send_msg(&to_sync_client, &mut frame_sender, msg_bytes).await; + } + } => { + trace!("Ordered Reliable Send Channel ended") + } + } + while let Ok(msg_bytes) = to_server_receiver.try_recv() { + if let Err(err) = frame_sender.send(msg_bytes).await { + error!("Error while sending, {}", err); + } + } + if let Err(err) = frame_sender.flush().await { + error!("Error while flushing stream: {}", err); + } + if let Err(err) = frame_sender.into_inner().finish().await { + error!("Failed to shutdown stream gracefully: {}", err); + } + todo!("Do not close here, wait for all channels to be flushed"); + connection.close(VarInt::from_u32(0), "closed".as_bytes()); +} + async fn send_msg( - close_sender: &tokio::sync::broadcast::Sender<()>, + // close_sender: &tokio::sync::broadcast::Sender<()>, to_sync_client: &mpsc::Sender<InternalAsyncMessage>, frame_send: &mut FramedWrite<SendStream, LengthDelimitedCodec>, msg_bytes: Bytes, @@ -628,9 +696,9 @@ async fn send_msg( .send(InternalAsyncMessage::LostConnection) .await .expect("Failed to signal connection lost to sync client"); - if let Err(_) = close_sender.send(()) { - error!("Failed to close all client streams & resources") - } + // if let Err(_) = close_sender.send(()) { + // error!("Failed to close all client streams & resources") + // } } } |