aboutsummaryrefslogtreecommitdiff
path: root/src/shared/channel.rs
blob: 72dca41cef270e56836142f0315921f94cc5be8e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
use super::QuinnetError;
use bevy::prelude::{error, trace};
use bytes::Bytes;
use futures::sink::SinkExt;
use quinn::SendStream;
use std::fmt::Debug;
use tokio::sync::{
    broadcast,
    mpsc::{self, error::TrySendError},
};
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};

pub(crate) type MultiChannelId = u64;

#[derive(Debug, Copy, Clone)]
pub enum ChannelType {
    OrderedReliable,
    UnorderedReliable,
    Unreliable,
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum ChannelId {
    OrderedReliable(MultiChannelId),
    UnorderedReliable,
    Unreliable,
}

impl std::fmt::Display for ChannelId {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

#[derive(Debug)]
pub struct Channel {
    sender: mpsc::Sender<Bytes>,
    close_sender: mpsc::Sender<()>,
}

impl Channel {
    pub(crate) fn new(sender: mpsc::Sender<Bytes>, close_sender: mpsc::Sender<()>) -> Self {
        Self {
            sender,
            close_sender,
        }
    }

    pub(crate) fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
        match self.sender.try_send(payload.into()) {
            Ok(_) => Ok(()),
            Err(err) => match err {
                TrySendError::Full(_) => Err(QuinnetError::FullQueue),
                TrySendError::Closed(_) => Err(QuinnetError::InternalChannelClosed),
            },
        }
    }

    pub(crate) fn close(&self) -> Result<(), QuinnetError> {
        match self.close_sender.blocking_send(()) {
            Ok(_) => Ok(()),
            Err(_) => {
                // The only possible error for a send is that there is no active receivers, meaning that the tasks are already terminated.
                Err(QuinnetError::ChannelAlreadyClosed)
            }
        }
    }
}

pub(crate) async fn ordered_reliable_channel_task<T: Debug>(
    connection: quinn::Connection,
    _: mpsc::Sender<()>,
    to_sync_client: mpsc::Sender<T>,
    on_lost_connection: fn() -> T,
    mut close_receiver: broadcast::Receiver<()>,
    mut channel_close_receiver: mpsc::Receiver<()>,
    mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
    let mut frame_sender = new_uni_frame_sender(&connection).await;

    tokio::select! {
        _ = close_receiver.recv() => {
            trace!("Ordered Reliable Channel task received a close signal")
        }
        _ = channel_close_receiver.recv() => {
            trace!("Ordered Reliable Channel task received a channel close signal")
        }
        _ = async {
            while let Some(msg_bytes) = to_server_receiver.recv().await {
                if let Err(err) = frame_sender.send(msg_bytes).await {
                    error!("Error while sending, {}", err);
                    to_sync_client.send(
                        on_lost_connection())
                        .await
                        .expect("Failed to signal connection lost to sync client");
                }
            }
        } => {
            trace!("Ordered Reliable Channel task ended")
        }
    };
    while let Ok(msg_bytes) = to_server_receiver.try_recv() {
        if let Err(err) = frame_sender.send(msg_bytes).await {
            error!("Error while sending, {}", err);
        }
    }
    if let Err(err) = frame_sender.flush().await {
        error!("Error while flushing stream: {}", err);
    }
    if let Err(err) = frame_sender.into_inner().finish().await {
        error!("Failed to shutdown stream gracefully: {}", err);
    }
}

pub(crate) async fn unordered_reliable_channel_task<T: Debug>(
    connection: quinn::Connection,
    _: mpsc::Sender<()>,
    to_sync_client: mpsc::Sender<T>,
    on_lost_connection: fn() -> T,
    mut close_receiver: broadcast::Receiver<()>,
    mut channel_close_receiver: mpsc::Receiver<()>,
    mut to_server_receiver: mpsc::Receiver<Bytes>,
) {
    tokio::select! {
        _ = close_receiver.recv() => {
            trace!("Ordered Reliable Channel task received a close signal")
        }
        _ = channel_close_receiver.recv() => {
            trace!("Unordered Reliable Channel task received a channel close signal")
        }
        _ = async {
            while let Some(msg_bytes) = to_server_receiver.recv().await {
                let mut frame_sender = new_uni_frame_sender(&connection).await;
                if let Err(err) = frame_sender.send(msg_bytes).await {
                    error!("Error while sending, {}", err);
                    to_sync_client.send(
                        on_lost_connection())
                        .await
                        .expect("Failed to signal connection lost to sync client");
                }
                todo!("finish the stream")
            }
        } => {
            trace!("Unordered Reliable Channel task ended")
        }
    };
    while let Ok(msg_bytes) = to_server_receiver.try_recv() {
        let mut frame_sender = new_uni_frame_sender(&connection).await;
        if let Err(err) = frame_sender.send(msg_bytes).await {
            error!("Error while sending, {}", err);
        }
        todo!("finish the stream")
    }
}

async fn new_uni_frame_sender(
    connection: &quinn::Connection,
) -> FramedWrite<SendStream, LengthDelimitedCodec> {
    let uni_sender = connection
        .open_uni()
        .await
        .expect("Failed to open send stream");
    FramedWrite::new(uni_sender, LengthDelimitedCodec::new())
}