aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHimbeerserverDE <himbeerserverde@gmail.com>2023-11-13 13:48:47 +0100
committerHimbeerserverDE <himbeerserverde@gmail.com>2023-11-13 13:48:47 +0100
commitd4b4411eef644b4a4697b7d127e68c94904b6f99 (patch)
treec9dd2d9408fceba5870b976f9f9c81aa156d8bc2
parent18e4fee20bc027178eb72db48345081d5e96661e (diff)
fix abnormally high cpu load resulting from bad async file read implementation
-rw-r--r--Cargo.lock15
-rw-r--r--Cargo.toml2
-rw-r--r--src/supervisor.rs114
3 files changed, 80 insertions, 51 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 998bd41..b129522 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -352,6 +352,17 @@ dependencies = [
]
[[package]]
+name = "mio"
+version = "0.8.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0"
+dependencies = [
+ "libc",
+ "wasi",
+ "windows-sys",
+]
+
+[[package]]
name = "nix"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -688,9 +699,13 @@ checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
dependencies = [
"backtrace",
"bytes",
+ "libc",
+ "mio",
"num_cpus",
"pin-project-lite",
+ "socket2",
"tokio-macros",
+ "windows-sys",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 6b04c7a..1f713c7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,4 +19,4 @@ serde_json = "1.0"
socket2 = "0.5.5"
sysinfo = "0.29.10"
thiserror = "1.0"
-tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time", "fs", "io-util"] }
+tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time", "fs", "io-util", "net"] }
diff --git a/src/supervisor.rs b/src/supervisor.rs
index c44867d..d35bbcf 100644
--- a/src/supervisor.rs
+++ b/src/supervisor.rs
@@ -1,14 +1,14 @@
use crate::{Error, Result, *};
use std::ffi::CString;
+use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::os::fd::AsRawFd;
use std::time::Duration;
use std::{io, mem};
-use tokio::fs::{File, OpenOptions};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::unix::AsyncFd;
use tokio::sync::mpsc;
use tokio::time::Interval;
@@ -42,24 +42,24 @@ struct sockaddr_pppox {
/// A set of file descriptors describing a PPP session
/// and its virtual network interface.
#[derive(Debug)]
-struct SessionFds(Socket, File, File);
+struct SessionFds(Socket, AsyncFd<File>, AsyncFd<File>);
impl SessionFds {
- /// Returns a mutable reference to the file descriptor
+ /// Returns an immutable reference to the file descriptor
/// that handles LCP, authentication and other link related traffic.
- pub fn link_mut(&mut self) -> &mut File {
- &mut self.1
+ pub fn link(&self) -> &AsyncFd<File> {
+ &self.1
}
- /// Returns a mutable reference to the file descriptor
+ /// Returns an immutable reference to the file descriptor
/// that handles NCP traffic.
- pub fn network_mut(&mut self) -> &mut File {
- &mut self.2
+ pub fn network(&self) -> &AsyncFd<File> {
+ &self.2
}
- /// Returns mutable references to both traffic related file descriptors.
- pub fn both_mut(&mut self) -> (&mut File, &mut File) {
- (&mut self.1, &mut self.2)
+ /// Returns immutable references to both traffic related file descriptors.
+ pub fn both(&self) -> (&AsyncFd<File>, &AsyncFd<File>) {
+ (&self.1, &self.2)
}
}
@@ -294,30 +294,30 @@ impl Client {
self.ipv6cp.open();
loop {
- let (ctl, ppp_dev) = session_fds.as_mut().map(|fds| fds.both_mut()).unzip();
+ let (ctl, ppp_dev) = session_fds.as_mut().map(|fds| fds.both()).unzip();
tokio::select! {
biased;
packet = self.pppoe.to_send() => self.send_pppoe(&sock_disc, packet).await?,
packet = self.lcp.to_send() => self.send_lcp(
- session_fds.as_mut().map(|fds| fds.link_mut()),
+ session_fds.as_mut().map(|fds| fds.link()),
packet
).await?,
packet = self.pap.to_send() => self.send_pap(
- session_fds.as_mut().map(|fds| fds.link_mut()),
+ session_fds.as_mut().map(|fds| fds.link()),
packet
).await?,
packet = self.chap.to_send() => self.send_chap(
- session_fds.as_mut().map(|fds| fds.link_mut()),
+ session_fds.as_mut().map(|fds| fds.link()),
packet
).await?,
packet = self.ipcp.to_send() => self.send_ipcp(
- session_fds.as_mut().map(|fds| fds.network_mut()),
+ session_fds.as_mut().map(|fds| fds.network()),
packet
).await?,
packet = self.ipv6cp.to_send() => self.send_ipv6cp(
- session_fds.as_mut().map(|fds| fds.network_mut()),
+ session_fds.as_mut().map(|fds| fds.network()),
packet
).await?,
@@ -517,7 +517,7 @@ impl Client {
if *lcp_rx.borrow() {
// Send an LCP Echo-Request every 12 seconds.
self.send_lcp(
- session_fds.as_mut().map(|fds| fds.link_mut()),
+ session_fds.as_mut().map(|fds| fds.link()),
Packet {
ty: PacketType::EchoRequest,
options: Vec::default(),
@@ -624,7 +624,11 @@ impl Client {
/// Transforms a [`Packet`] into an [`LcpPkt`] and sends it
/// over the PPP session if available.
- async fn send_lcp(&mut self, file: Option<&mut File>, packet: Packet<LcpOpt>) -> Result<()> {
+ async fn send_lcp(
+ &mut self,
+ file: Option<&AsyncFd<File>>,
+ packet: Packet<LcpOpt>,
+ ) -> Result<()> {
let pkt = PppPkt::new_lcp(match packet.ty {
PacketType::ConfigureRequest => {
self.lcp_id_cfg = rand::random();
@@ -678,8 +682,7 @@ impl Client {
let mut buf = Vec::new();
pkt.serialize(&mut buf)?;
- file.write_all(&buf).await?;
- file.flush().await?;
+ async_write_all(file, &buf).await?;
}
Ok(())
@@ -687,7 +690,7 @@ impl Client {
/// Transforms a [`PapPacket`] into a [`PapPkt`] and sends it
/// over the PPP session if available.
- async fn send_pap(&mut self, file: Option<&mut File>, packet: PapPacket) -> Result<()> {
+ async fn send_pap(&mut self, file: Option<&AsyncFd<File>>, packet: PapPacket) -> Result<()> {
let pkt = PppPkt::new_pap(match packet {
PapPacket::AuthenticateRequest => {
self.pap_id = rand::random();
@@ -708,8 +711,7 @@ impl Client {
let mut buf = Vec::new();
pkt.serialize(&mut buf)?;
- file.write_all(&buf).await?;
- file.flush().await?;
+ async_write_all(file, &buf).await?;
}
Ok(())
@@ -717,7 +719,7 @@ impl Client {
/// Transforms a [`ChapPacket`] into a [`ChapPkt`] and sends it
/// over the PPP session if available.
- async fn send_chap(&mut self, file: Option<&mut File>, packet: ChapPacket) -> Result<()> {
+ async fn send_chap(&mut self, file: Option<&AsyncFd<File>>, packet: ChapPacket) -> Result<()> {
let pkt = PppPkt::new_chap(match packet.ty {
ChapType::Challenge | ChapType::Success | ChapType::Failure => return Ok(()), // illegal
ChapType::Response => {
@@ -741,8 +743,7 @@ impl Client {
let mut buf = Vec::new();
pkt.serialize(&mut buf)?;
- file.write_all(&buf).await?;
- file.flush().await?;
+ async_write_all(file, &buf).await?;
}
Ok(())
@@ -750,7 +751,11 @@ impl Client {
/// Transforms a [`Packet`] into an [`IpcpPkt`] and sends it
/// over the PPP session if available.
- async fn send_ipcp(&mut self, file: Option<&mut File>, packet: Packet<IpcpOpt>) -> Result<()> {
+ async fn send_ipcp(
+ &mut self,
+ file: Option<&AsyncFd<File>>,
+ packet: Packet<IpcpOpt>,
+ ) -> Result<()> {
let pkt = PppPkt::new_ipcp(match packet.ty {
PacketType::ConfigureRequest => {
self.ipcp_id_cfg = rand::random();
@@ -789,8 +794,7 @@ impl Client {
let mut buf = Vec::new();
pkt.serialize(&mut buf)?;
- file.write_all(&buf).await?;
- file.flush().await?;
+ async_write_all(file, &buf).await?;
}
Ok(())
@@ -800,7 +804,7 @@ impl Client {
/// over the PPP session if available.
async fn send_ipv6cp(
&mut self,
- file: Option<&mut File>,
+ file: Option<&AsyncFd<File>>,
packet: Packet<Ipv6cpOpt>,
) -> Result<()> {
let pkt = PppPkt::new_ipv6cp(match packet.ty {
@@ -841,8 +845,7 @@ impl Client {
let mut buf = Vec::new();
pkt.serialize(&mut buf)?;
- file.write_all(&buf).await?;
- file.flush().await?;
+ async_write_all(file, &buf).await?;
}
Ok(())
@@ -1383,8 +1386,7 @@ impl Client {
.write(true)
.create(false)
.truncate(false)
- .open("/dev/ppp")
- .await?;
+ .open("/dev/ppp")?;
if unsafe { ioctls::pppiocattchan(ctl.as_raw_fd(), &chindex) } < 0 {
os_err!();
@@ -1400,8 +1402,7 @@ impl Client {
.write(true)
.create(false)
.truncate(false)
- .open("/dev/ppp")
- .await?;
+ .open("/dev/ppp")?;
let mut ifunit = -1;
if unsafe { ioctls::pppiocnewunit(ppp_dev.as_raw_fd(), &mut ifunit) } < 0 {
@@ -1417,26 +1418,39 @@ impl Client {
os_err!();
}
- Ok(SessionFds(sock, ctl, ppp_dev))
+ Ok(SessionFds(sock, AsyncFd::new(ctl)?, AsyncFd::new(ppp_dev)?))
}
}
-async fn option_read(file: Option<&mut File>, buf: &mut [u8]) -> Option<io::Result<usize>> {
+async fn option_read(file: Option<&AsyncFd<File>>, buf: &mut [u8]) -> Option<io::Result<usize>> {
match file {
- Some(file) => loop {
- match file.read(buf).await {
- Ok(n) => return Some(Ok(n)),
- Err(e) => {
- if e.kind() != io::ErrorKind::WouldBlock {
- return Some(Err(e));
- }
- }
- }
- },
+ Some(file) => Some(option_read_inner(file, buf).await),
None => None,
}
}
+async fn option_read_inner(file: &AsyncFd<File>, buf: &mut [u8]) -> io::Result<usize> {
+ loop {
+ let mut guard = file.readable().await?;
+
+ match guard.try_io(|inner| inner.get_ref().read(buf)) {
+ Ok(result) => return result,
+ Err(_would_block) => continue,
+ }
+ }
+}
+
+async fn async_write_all(file: &AsyncFd<File>, buf: &[u8]) -> io::Result<()> {
+ loop {
+ let mut guard = file.writable().await?;
+
+ match guard.try_io(|inner| inner.get_ref().write_all(buf)) {
+ Ok(result) => return result,
+ Err(_would_block) => continue,
+ }
+ }
+}
+
/// Creates a link-local IPv6 address from a 64-bit interface identifier.
fn ll(ifid: u64) -> Ipv6Addr {
((0xfe80 << 112) | ifid as u128).into()