aboutsummaryrefslogtreecommitdiff
path: root/src/shared/channel.rs
blob: 73f5f6fc2d15b0464b6cc4999e3e0d37cdf3e624 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use bevy::prelude::{error, trace};
use bytes::Bytes;
use futures::sink::SinkExt;
use futures_util::StreamExt;
use tokio::sync::{
    broadcast,
    mpsc::{self, error::TrySendError},
};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

use super::QuinnetError;

/// Numeric identifier carried by the `OrderedReliable` variant of [`ChannelId`].
pub(crate) type OrdRelChannelId = u64;

/// The kinds of channel known to this module.
///
/// The enum only names the three kinds; their delivery behaviour is
/// implemented elsewhere.
///
/// Derives `PartialEq`, `Eq` and `Hash` (like [`ChannelId`]) so that values
/// can be compared with `==` and used as keys in hashed collections.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum ChannelType {
    /// Ordered, reliable channel kind.
    OrderedReliable,
    /// Unordered, reliable channel kind.
    UnorderedReliable,
    /// Unreliable channel kind.
    Unreliable,
}

/// Identifies a channel.
///
/// Only the ordered reliable kind carries a numeric id; the two other kinds
/// are represented by a single value each.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ChannelId {
    /// An ordered reliable channel, distinguished by its numeric id.
    OrderedReliable(OrdRelChannelId),
    /// The unordered reliable channel.
    UnorderedReliable,
    /// The unreliable channel.
    Unreliable,
}

impl std::fmt::Display for ChannelId {
    /// Writes the derived `Debug` representation of the id.
    ///
    /// The text is produced from a fresh `{:?}` format, so flags supplied by
    /// the caller (width, `#`, ...) are not forwarded to the `Debug` output.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_fmt(format_args!("{:?}", self))
    }
}

/// Handle through which payloads are pushed onto an mpsc queue
/// (see [`Channel::send_payload`]).
///
/// NOTE(review): the receiving half of the queue is not visible in this
/// file — presumably an async task drains it and writes to the connection;
/// confirm against the code that constructs `Channel`.
#[derive(Debug)]
pub struct Channel {
    /// Sending half of the tokio mpsc queue carrying `Bytes` payloads.
    sender: mpsc::Sender<Bytes>,
}

impl Channel {
    pub fn send_payload<T: Into<Bytes>>(&self, payload: T) -> Result<(), QuinnetError> {
        match self.sender.try_send(payload.into()) {
            Ok(_) => Ok(()),
            Err(err) => match err {
                TrySendError::Full(_) => Err(QuinnetError::FullQueue),
                TrySendError::Closed(_) => Err(QuinnetError::ChannelClosed),
            },
        }
    }

    pub(crate) fn new(sender: mpsc::Sender<Bytes>) -> Self {
        Self { sender }
    }
}

// async fn ordered_reliable_channel_task(
//     connection: quinn::Connection,
//     to_sync_client: mpsc::Sender<InternalAsyncMessage>,
//     mut close_receiver: broadcast::Receiver<()>,
//     mut to_server_receiver: mpsc::Receiver<Bytes>,
// ) {
//     tokio::select! {
//         _ = close_receiver.recv() => {
//             trace!("Ordered Reliable Send Channel received a close signal")
//         }
//         _ = async {
//             let uni_sender = connection
//                 .open_uni()
//                 .await
//                 .expect("Failed to open send stream");
//             let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());

//             while let Some(msg_bytes) = to_server_receiver.recv().await {
//                 if let Err(err) = frame_sender.send(msg_bytes).await {
//                     // TODO Clean: error handling
//                     error!("Error while sending, {}", err);
//                     // error!("Client seems disconnected, closing resources");
//                     // if let Err(_) = close_sender_clone.send(()) {
//                     //     error!("Failed to close all client streams & resources")
//                     // }
//                     to_sync_client.send(
//                         InternalAsyncMessage::LostConnection)
//                         .await
//                         .expect("Failed to signal connection lost to sync client");
//                 }
//             }
//         } => {
//             trace!("Ordered Reliable Send Channel ended")
//         }
//     }
// }

// async fn unordered_reliable_channel_task(
//     connection: quinn::Connection,
//     to_sync_client: mpsc::Sender<InternalAsyncMessage>,
//     mut close_receiver: broadcast::Receiver<()>,
//     mut to_server_receiver: mpsc::Receiver<Bytes>,
// ) {
//     tokio::select! {
//         _ = close_receiver.recv() => {
//             trace!("Unordered Reliable Send Channel received a close signal")
//         }
//         _ = async {
//             while let Some(msg_bytes) = to_server_receiver.recv().await {
//                 let uni_sender = connection
//                     .open_uni()
//                     .await
//                     .expect("Failed to open send stream");
//                 let mut frame_sender = FramedWrite::new(uni_sender, LengthDelimitedCodec::new());

//                 if let Err(err) = frame_sender.send(msg_bytes).await {
//                     // TODO Clean: error handling
//                     error!("Error while sending, {}", err);
//                     // error!("Client seems disconnected, closing resources");
//                     // if let Err(_) = close_sender_clone.send(()) {
//                     //     error!("Failed to close all client streams & resources")
//                     // }
//                     to_sync_client.send(
//                         InternalAsyncMessage::LostConnection)
//                         .await
//                         .expect("Failed to signal connection lost to sync client");
//                 }
//             }
//         } => {
//             trace!("Unordered Reliable Send Channel ended")
//         }
//     }
// }