1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
use super::QuinnetError;
use bevy::prelude::{error, trace};
use bytes::Bytes;
use futures::sink::SinkExt;
use quinn::{SendStream, VarInt};
use std::fmt::Debug;
use tokio::sync::{
broadcast,
mpsc::{self, error::TrySendError},
};
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
/// Numeric identifier carried by [`ChannelId::OrderedReliable`].
pub(crate) type MultiChannelId = u64;
/// The kinds of channel known to this module.
///
/// The variants mirror those of [`ChannelId`], except that `OrderedReliable` carries no
/// identifier here.
#[derive(Debug, Copy, Clone)]
pub enum ChannelType {
/// Presumably: payloads are delivered reliably and in the order they were sent.
/// NOTE(review): the code consuming this variant is not visible here — confirm.
OrderedReliable,
/// Presumably: payloads are delivered reliably, with no ordering guarantee between them.
/// NOTE(review): the code consuming this variant is not visible here — confirm.
UnorderedReliable,
/// Presumably: payloads may be lost and may arrive in any order.
/// NOTE(review): no implementation for this kind is visible in this file — confirm.
Unreliable,
}
/// Identifies a channel.
///
/// The type derives `Eq` and `Hash`, so it can be used as a key in hashed collections, and
/// it is `Copy`, so it can be passed around by value.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum ChannelId {
/// An ordered reliable channel, tagged with a [`MultiChannelId`].
/// Presumably several such channels can coexist and the id tells them apart — confirm
/// against the code that allocates these ids.
OrderedReliable(MultiChannelId),
/// The unordered reliable channel; this variant carries no identifier.
UnorderedReliable,
/// The unreliable channel; this variant carries no identifier.
Unreliable,
}
/// Displays a [`ChannelId`] using the same text as its derived `Debug` representation.
impl std::fmt::Display for ChannelId {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // The `Debug` output is rendered through fresh format arguments, so formatting
        // flags given to the outer `Display` call are not forwarded to it.
        f.write_fmt(format_args!("{:?}", self))
    }
}
/// Handle used to queue payloads for sending.
///
/// It wraps the sending half of a tokio `mpsc` queue of [`Bytes`]; payloads are pushed into
/// that queue by `send_payload`.
#[derive(Debug)]
pub struct Channel {
// Sending half of the payload queue. The receiving half is presumably owned by one of the
// channel tasks of this module — confirm at the call site of `Channel::new`.
sender: mpsc::Sender<Bytes>,
}
impl Channel {
pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
match self.sender.try_send(payload.into()) {
Ok(_) => Ok(()),
Err(err) => match err {
TrySendError::Full(_) => Err(QuinnetError::FullQueue),
TrySendError::Closed(_) => Err(QuinnetError::InternalChannelClosed),
},
}
}
pub(crate) fn new(sender: mpsc::Sender<Bytes>) -> Self {
Self { sender }
}
}
/// Forwards the payloads of an ordered reliable channel over a single unidirectional stream.
///
/// One stream is opened on `connection` up front. Every payload received from
/// `to_server_receiver` is then written to that same stream as a length-delimited frame, in
/// the order it was taken from the queue.
///
/// The forwarding loop stops as soon as `close_receiver.recv()` completes (with a value or
/// with an error), or when `to_server_receiver` yields `None`. After that, the task:
///
/// 1. drains the payloads still buffered in `to_server_receiver` and sends them,
/// 2. flushes and finishes the stream,
/// 3. closes `connection` with code `0` and reason `"closed"`.
///
/// Errors in steps 1 and 2 are only logged.
///
/// When a send fails inside the forwarding loop, the error is logged and the value returned
/// by `on_lost_connection` is sent through `to_sync_client`; the loop itself keeps running.
///
/// # Panics
///
/// * If the unidirectional stream cannot be opened.
/// * If the lost-connection value cannot be delivered through `to_sync_client`.
pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
    connection: quinn::Connection,
    to_sync_client: mpsc::Sender<T>,
    on_lost_connection: fn() -> T,
    mut close_receiver: broadcast::Receiver<()>,
    mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
    let mut frame_sender = new_uni_frame_sender(&connection).await;

    tokio::select! {
        _ = close_receiver.recv() => {
            trace!("Ordered Reliable Channel task received a close signal")
        }
        _ = async {
            while let Some(msg_bytes) = to_server_receiver.recv().await {
                if let Err(err) = frame_sender.send(msg_bytes).await {
                    error!("Error while sending, {}", err);
                    to_sync_client
                        .send(on_lost_connection())
                        .await
                        .expect("Failed to signal connection lost to sync client");
                }
            }
        } => {
            trace!("Ordered Reliable Channel task ended")
        }
    }

    // Payloads queued before the loop stopped are still sent, best effort.
    while let Ok(msg_bytes) = to_server_receiver.try_recv() {
        if let Err(err) = frame_sender.send(msg_bytes).await {
            error!("Error while sending, {}", err);
        }
    }
    if let Err(err) = frame_sender.flush().await {
        error!("Error while flushing stream: {}", err);
    }
    if let Err(err) = frame_sender.into_inner().finish().await {
        error!("Failed to shutdown stream gracefully: {}", err);
    }

    // TODO: Do not close here, wait for all channels to be flushed.
    // This used to be a `todo!()`, which panicked on every shutdown of this task and left
    // the close below unreachable. Note that closing acts on the whole connection, not only
    // on the stream owned by this task.
    connection.close(VarInt::from_u32(0), "closed".as_bytes());
}
pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
connection: quinn::Connection,
to_sync_client: mpsc::Sender<T>,
on_lost_connection: fn() -> T,
mut close_receiver: broadcast::Receiver<()>,
mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
tokio::select! {
_ = close_receiver.recv() => {
trace!("Unordered Reliable Channel task received a close signal")
}
_ = async {
while let Some(msg_bytes) = to_server_receiver.recv().await {
let mut frame_sender = new_uni_frame_sender(&connection).await;
if let Err(err) = frame_sender.send(msg_bytes).await {
error!("Error while sending, {}", err);
to_sync_client.send(
on_lost_connection())
.await
.expect("Failed to signal connection lost to sync client");
}
}
} => {
trace!("Unordered Reliable Channel task ended")
}
}
}
/// Opens a unidirectional stream on `connection` and wraps it in a length-delimited writer.
///
/// # Panics
///
/// Panics if the stream cannot be opened.
async fn new_uni_frame_sender(
    connection: &quinn::Connection,
) -> FramedWrite<SendStream, LengthDelimitedCodec> {
    let opened = connection.open_uni().await;
    let send_stream = opened.expect("Failed to open send stream");
    let codec = LengthDelimitedCodec::new();
    FramedWrite::new(send_stream, codec)
}
|