aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
authorHenauxg <19689618+Henauxg@users.noreply.github.com>2023-01-18 17:52:40 +0100
committerHenauxg <19689618+Henauxg@users.noreply.github.com>2023-01-18 17:52:40 +0100
commitb17b07d212637b2a858113d421097369c25a44c5 (patch)
tree79fbbd9d4dfc3e7982f62ae85d929b73c7be5b6e /src/server.rs
parent98ac9e3e6467df01aeb32fc0ef439689ef850e54 (diff)
[server] Generate client ids on the sync server instead of async
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs234
1 files changed, 137 insertions, 97 deletions
diff --git a/src/server.rs b/src/server.rs
index 6729728..7586d25 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -28,7 +28,7 @@ use crate::{
unreliable_receiver_task, Channel, ChannelAsyncMessage, ChannelId, ChannelSyncMessage,
ChannelType, MultiChannelId,
},
- AsyncRuntime, ClientId, QuinnetError, DEFAULT_KEEP_ALIVE_INTERVAL_S,
+ AsyncRuntime, ClientId, InternalConnectionRef, QuinnetError, DEFAULT_KEEP_ALIVE_INTERVAL_S,
DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
},
};
@@ -103,13 +103,19 @@ pub(crate) enum ServerAsyncMessage {
ClientConnectionClosed(ClientId, ConnectionError),
}
+#[derive(Debug, Clone)]
+pub(crate) enum ServerSyncMessage {
+ ClientConnectedAck(Option<ClientId>),
+}
+
#[derive(Debug)]
pub struct ClientConnection {
- client_id: ClientId,
+ connection_handle: InternalConnectionRef,
channels: HashMap<ChannelId, Channel>,
bytes_from_client_recv: mpsc::Receiver<Bytes>,
close_sender: broadcast::Sender<()>,
+ pub(crate) to_connection_send: mpsc::Sender<ServerSyncMessage>,
pub(crate) to_channels_send: mpsc::Sender<ChannelSyncMessage>,
pub(crate) from_channels_recv: mpsc::Receiver<ChannelAsyncMessage>,
}
@@ -152,13 +158,34 @@ impl ClientConnection {
},
}
}
+
+ /// Signal the connection to closes all its background tasks. Before trully closing, the connection will wait for all buffered messages in all its opened channels to be properly sent according to their respective channel type.
+ pub(crate) fn close(&mut self) -> Result<(), QuinnetError> {
+ match self.close_sender.send(()) {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ // The only possible error for a send is that there is no active receivers, meaning that the tasks are already terminated.
+ Err(QuinnetError::ConnectionAlreadyClosed)
+ }
+ }
+ }
+
+ pub(crate) fn try_close(&mut self) {
+ match &self.close() {
+ Ok(_) => (),
+ Err(err) => error!("Failed to properly close clonnection: {}", err),
+ }
+ }
}
pub struct Endpoint {
clients: HashMap<ClientId, ClientConnection>,
+ last_gen_client_id: ClientId,
+
channels: HashSet<ChannelId>,
default_channel: Option<ChannelId>,
last_gen_id: MultiChannelId,
+
close_sender: broadcast::Sender<()>,
pub(crate) from_async_server_recv: mpsc::Receiver<ServerAsyncMessage>,
@@ -539,16 +566,31 @@ impl Endpoint {
}
}
- fn handle_connection(&mut self, mut connection: ClientConnection) -> Result<(), QuinnetError> {
- match self.clients.contains_key(&connection.client_id) {
- true => todo!(),
- false => {
- for channel_id in self.channels.iter() {
- connection.create_channel(*channel_id)?;
- }
+ fn handle_connection(
+ &mut self,
+ mut connection: ClientConnection,
+ ) -> Result<ClientId, QuinnetError> {
+ for channel_id in self.channels.iter() {
+ if let Err(err) = connection.create_channel(*channel_id) {
+ connection.try_close();
+ return Err(err);
+ };
+ }
- self.clients.insert(connection.client_id, connection);
- Ok(())
+ self.last_gen_client_id += 1;
+ let client_id = self.last_gen_client_id;
+
+ match connection
+ .to_connection_send
+ .try_send(ServerSyncMessage::ClientConnectedAck(Some(client_id)))
+ {
+ Ok(_) => {
+ self.clients.insert(client_id, connection);
+ Ok(client_id)
+ }
+ Err(_) => {
+ connection.try_close();
+ Err(QuinnetError::InternalChannelClosed)
}
}
}
@@ -617,6 +659,7 @@ impl Server {
let mut endpoint = Endpoint {
clients: HashMap::new(),
+ last_gen_client_id: 0,
channels: HashSet::new(),
default_channel: None,
last_gen_id: 0,
@@ -657,9 +700,6 @@ async fn endpoint_task(
to_sync_server_send: mpsc::Sender<ServerAsyncMessage>,
mut endpoint_close_recv: broadcast::Receiver<()>,
) {
- let mut client_gen_id: ClientId = 0;
- let mut client_id_mappings = HashMap::new();
-
let endpoint = QuinnEndpoint::server(endpoint_config, endpoint_adr)
.expect("Failed to create the endpoint");
// Handle incoming connections/clients.
@@ -672,21 +712,14 @@ async fn endpoint_task(
match connecting.await {
Err(err) => error!("An incoming connection failed: {}", err),
Ok(connection) => {
- client_gen_id += 1; // TODO Fix: Better id generation/check
- let client_id = client_gen_id;
- client_id_mappings.insert(connection.stable_id(), client_id);
-
- {
- let to_sync_server_send = to_sync_server_send.clone();
- tokio::spawn(async move {
- client_connection_task(
- connection,
- client_id,
- to_sync_server_send
- )
- .await
- });
- }
+ let to_sync_server_send = to_sync_server_send.clone();
+ tokio::spawn(async move {
+ client_connection_task(
+ connection,
+ to_sync_server_send
+ )
+ .await
+ });
},
}
}
@@ -696,19 +729,14 @@ async fn endpoint_task(
async fn client_connection_task(
connection: quinn::Connection,
- client_id: ClientId,
to_sync_server_send: mpsc::Sender<ServerAsyncMessage>,
) {
- info!(
- "New connection from {}, client_id: {}",
- connection.remote_address(),
- client_id
- );
-
let (client_close_send, client_close_recv) =
broadcast::channel(DEFAULT_KILL_MESSAGE_QUEUE_SIZE);
let (bytes_from_client_send, bytes_from_client_recv) =
mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
+ let (to_connection_send, mut from_sync_server_recv) =
+ mpsc::channel::<ServerSyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE);
let (from_channels_send, from_channels_recv) =
mpsc::channel::<ChannelAsyncMessage>(DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE);
let (to_channels_send, to_channels_recv) =
@@ -717,77 +745,90 @@ async fn client_connection_task(
// Signal the sync server of this new connection
to_sync_server_send
.send(ServerAsyncMessage::ClientConnected(ClientConnection {
- client_id: client_id,
+ connection_handle: connection.clone(),
channels: HashMap::new(),
bytes_from_client_recv,
close_sender: client_close_send.clone(),
+ to_connection_send,
from_channels_recv,
to_channels_send,
}))
.await
.expect("Failed to signal connection to sync client");
- // Spawn a task to listen for the underlying connection being closed
- {
- let conn = connection.clone();
- let to_sync_server = to_sync_server_send.clone();
- tokio::spawn(async move {
- let conn_err = conn.closed().await;
- info!("Client {} connection closed: {}", client_id, conn_err);
- // If we requested the connection to close, channel may have been closed already.
- if !to_sync_server.is_closed() {
- to_sync_server
- .send(ServerAsyncMessage::ClientConnectionClosed(
- client_id, conn_err,
- ))
+ // Wait for the sync server response before spawning connection tasks.
+ match from_sync_server_recv.recv().await {
+ Some(ServerSyncMessage::ClientConnectedAck(Some(client_id))) => {
+ info!(
+ "New connection from {}, client_id: {}",
+ connection.remote_address(),
+ client_id
+ );
+
+ // Spawn a task to listen for the underlying connection being closed
+ {
+ let conn = connection.clone();
+ let to_sync_server = to_sync_server_send.clone();
+ tokio::spawn(async move {
+ let conn_err = conn.closed().await;
+ info!("Client {} connection closed: {}", client_id, conn_err);
+ // If we requested the connection to close, channel may have been closed already.
+ if !to_sync_server.is_closed() {
+ to_sync_server
+ .send(ServerAsyncMessage::ClientConnectionClosed(
+ client_id, conn_err,
+ ))
+ .await
+ .expect("Failed to signal connection lost to sync server");
+ }
+ });
+ };
+
+ // Spawn a task to listen for streams opened by this client
+ {
+ let connection_handle = connection.clone();
+ let client_close_recv = client_close_recv.resubscribe();
+ let bytes_incoming_send = bytes_from_client_send.clone();
+ tokio::spawn(async move {
+ reliable_receiver_task(
+ client_id,
+ connection_handle,
+ client_close_recv,
+ bytes_incoming_send,
+ )
.await
- .expect("Failed to signal connection lost to sync server");
+ });
}
- });
- };
-
- // Spawn a task to listen for streams opened by this client
- {
- let connection_handle = connection.clone();
- let client_close_recv = client_close_recv.resubscribe();
- let bytes_incoming_send = bytes_from_client_send.clone();
- tokio::spawn(async move {
- reliable_receiver_task(
- client_id,
- connection_handle,
- client_close_recv,
- bytes_incoming_send,
- )
- .await
- });
- }
- // Spawn a task to listen for datagrams sent by this client
- {
- let connection_handle = connection.clone();
- let client_close_recv = client_close_recv.resubscribe();
- let bytes_incoming_send = bytes_from_client_send.clone();
- tokio::spawn(async move {
- unreliable_receiver_task(
- client_id,
- connection_handle,
- client_close_recv,
- bytes_incoming_send,
- )
- .await
- });
- }
+ // Spawn a task to listen for datagrams sent by this client
+ {
+ let connection_handle = connection.clone();
+ let client_close_recv = client_close_recv.resubscribe();
+ let bytes_incoming_send = bytes_from_client_send.clone();
+ tokio::spawn(async move {
+ unreliable_receiver_task(
+ client_id,
+ connection_handle,
+ client_close_recv,
+ bytes_incoming_send,
+ )
+ .await
+ });
+ }
- // Spawn a task to handle send channels for this client
- tokio::spawn(async move {
- channels_task(
- connection,
- client_close_recv,
- to_channels_recv,
- from_channels_send,
- )
- .await
- });
+ // Spawn a task to handle send channels for this client
+ tokio::spawn(async move {
+ channels_task(
+ connection,
+ client_close_recv,
+ to_channels_recv,
+ from_channels_send,
+ )
+ .await
+ });
+ }
+ _ => info!("Connection from {} refused", connection.remote_address()),
+ }
}
fn create_server(mut commands: Commands, runtime: Res<AsyncRuntime>) {
@@ -807,10 +848,9 @@ fn update_sync_server(
while let Ok(message) = endpoint.from_async_server_recv.try_recv() {
match message {
ServerAsyncMessage::ClientConnected(connection) => {
- let id = connection.client_id;
match endpoint.handle_connection(connection) {
- Ok(_) => connection_events.send(ConnectionEvent { id }),
- Err(_) => error!("Failed to handle connection of client {}", id),
+ Ok(client_id) => connection_events.send(ConnectionEvent { id: client_id }),
+ Err(_) => error!("Failed to handle connection of a client"),
};
}
ServerAsyncMessage::ClientConnectionClosed(client_id, _) => {