aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client.rs14
-rw-r--r--src/lib.rs6
-rw-r--r--src/server.rs14
3 files changed, 21 insertions, 13 deletions
diff --git a/src/client.rs b/src/client.rs
index 201a795..fe53398 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -12,7 +12,6 @@ use quinn::{ClientConfig, Endpoint};
use rustls::Certificate;
use serde::Deserialize;
use tokio::{
- runtime::Runtime,
sync::{
broadcast,
mpsc::{
@@ -25,7 +24,9 @@ use tokio::{
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
-use crate::{QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE};
+use crate::{
+ AsyncRuntime, QuinnetError, DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
+};
use self::certificate::{
load_known_hosts_store_from_config, CertVerificationStatus, CertVerifierAction,
@@ -116,6 +117,7 @@ pub(crate) enum InternalSyncMessage {
},
}
+#[derive(Resource)]
pub struct Client {
state: ClientState,
// TODO Perf: multiple channels
@@ -335,7 +337,7 @@ async fn connection_task(
}
}
-fn start_async_client(mut commands: Commands, runtime: Res<Runtime>) {
+fn start_async_client(mut commands: Commands, runtime: Res<AsyncRuntime>) {
let (from_server_sender, from_server_receiver) =
mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
let (to_server_sender, to_server_receiver) = mpsc::channel::<Bytes>(DEFAULT_MESSAGE_QUEUE_SIZE);
@@ -440,13 +442,13 @@ impl Plugin for QuinnetClientPlugin {
.add_startup_system_to_stage(StartupStage::PreStartup, start_async_client)
.add_system(update_sync_client);
- if app.world.get_resource_mut::<Runtime>().is_none() {
- app.insert_resource(
+ if app.world.get_resource_mut::<AsyncRuntime>().is_none() {
+ app.insert_resource(AsyncRuntime(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
- );
+ ));
}
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 180cae6..bfde4a6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,6 @@
+use bevy::prelude::{Deref, DerefMut, Resource};
+use tokio::runtime::Runtime;
+
pub const DEFAULT_MESSAGE_QUEUE_SIZE: usize = 150;
pub const DEFAULT_KILL_MESSAGE_QUEUE_SIZE: usize = 10;
pub const DEFAULT_KEEP_ALIVE_INTERVAL_S: u64 = 4;
@@ -7,6 +10,9 @@ pub mod server;
pub type ClientId = u64;
+#[derive(Resource, Deref, DerefMut)]
+pub(crate) struct AsyncRuntime(pub(crate) Runtime);
+
/// Enum with possibles errors that can occur in Bevy Quinnet
#[derive(thiserror::Error, Debug)]
pub enum QuinnetError {
diff --git a/src/server.rs b/src/server.rs
index ab2f9e9..ae7ae83 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -16,7 +16,6 @@ use futures_util::StreamExt;
use quinn::{Endpoint, NewConnection, ServerConfig};
use serde::Deserialize;
use tokio::{
- runtime::Runtime,
sync::{
broadcast::{self},
mpsc::{self, error::TryRecvError},
@@ -26,8 +25,8 @@ use tokio::{
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use crate::{
- ClientId, QuinnetError, DEFAULT_KEEP_ALIVE_INTERVAL_S, DEFAULT_KILL_MESSAGE_QUEUE_SIZE,
- DEFAULT_MESSAGE_QUEUE_SIZE,
+ AsyncRuntime, ClientId, QuinnetError, DEFAULT_KEEP_ALIVE_INTERVAL_S,
+ DEFAULT_KILL_MESSAGE_QUEUE_SIZE, DEFAULT_MESSAGE_QUEUE_SIZE,
};
pub const DEFAULT_INTERNAL_MESSAGE_CHANNEL_SIZE: usize = 100;
@@ -133,6 +132,7 @@ pub(crate) struct ClientConnection {
close_sender: broadcast::Sender<()>,
}
+#[derive(Resource)]
pub struct Server {
clients: HashMap<ClientId, ClientConnection>,
receiver: mpsc::Receiver<ClientPayload>,
@@ -536,7 +536,7 @@ async fn connections_listening_task(
}
}
-fn start_async_server(mut commands: Commands, runtime: Res<Runtime>) {
+fn start_async_server(mut commands: Commands, runtime: Res<AsyncRuntime>) {
// TODO Clean: Configure size
let (from_clients_sender, from_clients_receiver) =
mpsc::channel::<ClientPayload>(DEFAULT_MESSAGE_QUEUE_SIZE);
@@ -615,13 +615,13 @@ impl Plugin for QuinnetServerPlugin {
.add_startup_system_to_stage(StartupStage::PreStartup, start_async_server)
.add_system_to_stage(CoreStage::PreUpdate, update_sync_server);
- if app.world.get_resource_mut::<Runtime>().is_none() {
- app.insert_resource(
+ if app.world.get_resource_mut::<AsyncRuntime>().is_none() {
+ app.insert_resource(AsyncRuntime(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
- );
+ ));
}
}
}