diff options
author | HimbeerserverDE <himbeerserverde@gmail.com> | 2023-11-13 13:48:47 +0100 |
---|---|---|
committer | HimbeerserverDE <himbeerserverde@gmail.com> | 2023-11-13 13:48:47 +0100 |
commit | d4b4411eef644b4a4697b7d127e68c94904b6f99 (patch) | |
tree | c9dd2d9408fceba5870b976f9f9c81aa156d8bc2 | |
parent | 18e4fee20bc027178eb72db48345081d5e96661e (diff) |
fix abnormally high cpu load resulting from bad async file read implementation
-rw-r--r-- | Cargo.lock | 15 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/supervisor.rs | 114 |
3 files changed, 80 insertions, 51 deletions
@@ -352,6 +352,17 @@ dependencies = [ ] [[package]] +name = "mio" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +dependencies = [ + "libc", + "wasi", + "windows-sys", +] + +[[package]] name = "nix" version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -688,9 +699,13 @@ checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", + "libc", + "mio", "num_cpus", "pin-project-lite", + "socket2", "tokio-macros", + "windows-sys", ] [[package]] @@ -19,4 +19,4 @@ serde_json = "1.0" socket2 = "0.5.5" sysinfo = "0.29.10" thiserror = "1.0" -tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time", "fs", "io-util"] } +tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "sync", "time", "fs", "io-util", "net"] } diff --git a/src/supervisor.rs b/src/supervisor.rs index c44867d..d35bbcf 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -1,14 +1,14 @@ use crate::{Error, Result, *}; use std::ffi::CString; +use std::fs::{File, OpenOptions}; use std::io::{Read, Write}; use std::net::{Ipv4Addr, Ipv6Addr}; use std::os::fd::AsRawFd; use std::time::Duration; use std::{io, mem}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::unix::AsyncFd; use tokio::sync::mpsc; use tokio::time::Interval; @@ -42,24 +42,24 @@ struct sockaddr_pppox { /// A set of file descriptors describing a PPP session /// and its virtual network interface. #[derive(Debug)] -struct SessionFds(Socket, File, File); +struct SessionFds(Socket, AsyncFd<File>, AsyncFd<File>); impl SessionFds { - /// Returns a mutable reference to the file descriptor + /// Returns an immutable reference to the file descriptor /// that handles LCP, authentication and other link related traffic. - pub fn link_mut(&mut self) -> &mut File { - &mut self.1 + pub fn link(&self) -> &AsyncFd<File> { + &self.1 } - /// Returns a mutable reference to the file descriptor + /// Returns an immutable reference to the file descriptor /// that handles NCP traffic. - pub fn network_mut(&mut self) -> &mut File { - &mut self.2 + pub fn network(&self) -> &AsyncFd<File> { + &self.2 } - /// Returns mutable references to both traffic related file descriptors. - pub fn both_mut(&mut self) -> (&mut File, &mut File) { - (&mut self.1, &mut self.2) + /// Returns immutable references to both traffic related file descriptors. + pub fn both(&self) -> (&AsyncFd<File>, &AsyncFd<File>) { + (&self.1, &self.2) } } @@ -294,30 +294,30 @@ impl Client { self.ipv6cp.open(); loop { - let (ctl, ppp_dev) = session_fds.as_mut().map(|fds| fds.both_mut()).unzip(); + let (ctl, ppp_dev) = session_fds.as_mut().map(|fds| fds.both()).unzip(); tokio::select! { biased; packet = self.pppoe.to_send() => self.send_pppoe(&sock_disc, packet).await?, packet = self.lcp.to_send() => self.send_lcp( - session_fds.as_mut().map(|fds| fds.link_mut()), + session_fds.as_mut().map(|fds| fds.link()), packet ).await?, packet = self.pap.to_send() => self.send_pap( - session_fds.as_mut().map(|fds| fds.link_mut()), + session_fds.as_mut().map(|fds| fds.link()), packet ).await?, packet = self.chap.to_send() => self.send_chap( - session_fds.as_mut().map(|fds| fds.link_mut()), + session_fds.as_mut().map(|fds| fds.link()), packet ).await?, packet = self.ipcp.to_send() => self.send_ipcp( - session_fds.as_mut().map(|fds| fds.network_mut()), + session_fds.as_mut().map(|fds| fds.network()), packet ).await?, packet = self.ipv6cp.to_send() => self.send_ipv6cp( - session_fds.as_mut().map(|fds| fds.network_mut()), + session_fds.as_mut().map(|fds| fds.network()), packet ).await?, @@ -517,7 +517,7 @@ impl Client { if *lcp_rx.borrow() { // Send an LCP Echo-Request every 12 seconds. self.send_lcp( - session_fds.as_mut().map(|fds| fds.link_mut()), + session_fds.as_mut().map(|fds| fds.link()), Packet { ty: PacketType::EchoRequest, options: Vec::default(), @@ -624,7 +624,11 @@ impl Client { /// Transforms a [`Packet`] into an [`LcpPkt`] and sends it /// over the PPP session if available. - async fn send_lcp(&mut self, file: Option<&mut File>, packet: Packet<LcpOpt>) -> Result<()> { + async fn send_lcp( + &mut self, + file: Option<&AsyncFd<File>>, + packet: Packet<LcpOpt>, + ) -> Result<()> { let pkt = PppPkt::new_lcp(match packet.ty { PacketType::ConfigureRequest => { self.lcp_id_cfg = rand::random(); @@ -678,8 +682,7 @@ impl Client { let mut buf = Vec::new(); pkt.serialize(&mut buf)?; - file.write_all(&buf).await?; - file.flush().await?; + async_write_all(file, &buf).await?; } Ok(()) @@ -687,7 +690,7 @@ impl Client { /// Transforms a [`PapPacket`] into a [`PapPkt`] and sends it /// over the PPP session if available. - async fn send_pap(&mut self, file: Option<&mut File>, packet: PapPacket) -> Result<()> { + async fn send_pap(&mut self, file: Option<&AsyncFd<File>>, packet: PapPacket) -> Result<()> { let pkt = PppPkt::new_pap(match packet { PapPacket::AuthenticateRequest => { self.pap_id = rand::random(); @@ -708,8 +711,7 @@ impl Client { let mut buf = Vec::new(); pkt.serialize(&mut buf)?; - file.write_all(&buf).await?; - file.flush().await?; + async_write_all(file, &buf).await?; } Ok(()) @@ -717,7 +719,7 @@ impl Client { /// Transforms a [`ChapPacket`] into a [`ChapPkt`] and sends it /// over the PPP session if available. - async fn send_chap(&mut self, file: Option<&mut File>, packet: ChapPacket) -> Result<()> { + async fn send_chap(&mut self, file: Option<&AsyncFd<File>>, packet: ChapPacket) -> Result<()> { let pkt = PppPkt::new_chap(match packet.ty { ChapType::Challenge | ChapType::Success | ChapType::Failure => return Ok(()), // illegal ChapType::Response => { @@ -741,8 +743,7 @@ impl Client { let mut buf = Vec::new(); pkt.serialize(&mut buf)?; - file.write_all(&buf).await?; - file.flush().await?; + async_write_all(file, &buf).await?; } Ok(()) @@ -750,7 +751,11 @@ impl Client { /// Transforms a [`Packet`] into an [`IpcpPkt`] and sends it /// over the PPP session if available. - async fn send_ipcp(&mut self, file: Option<&mut File>, packet: Packet<IpcpOpt>) -> Result<()> { + async fn send_ipcp( + &mut self, + file: Option<&AsyncFd<File>>, + packet: Packet<IpcpOpt>, + ) -> Result<()> { let pkt = PppPkt::new_ipcp(match packet.ty { PacketType::ConfigureRequest => { self.ipcp_id_cfg = rand::random(); @@ -789,8 +794,7 @@ impl Client { let mut buf = Vec::new(); pkt.serialize(&mut buf)?; - file.write_all(&buf).await?; - file.flush().await?; + async_write_all(file, &buf).await?; } Ok(()) @@ -800,7 +804,7 @@ impl Client { /// over the PPP session if available. async fn send_ipv6cp( &mut self, - file: Option<&mut File>, + file: Option<&AsyncFd<File>>, packet: Packet<Ipv6cpOpt>, ) -> Result<()> { let pkt = PppPkt::new_ipv6cp(match packet.ty { @@ -841,8 +845,7 @@ impl Client { let mut buf = Vec::new(); pkt.serialize(&mut buf)?; - file.write_all(&buf).await?; - file.flush().await?; + async_write_all(file, &buf).await?; } Ok(()) @@ -1383,8 +1386,7 @@ impl Client { .write(true) .create(false) .truncate(false) - .open("/dev/ppp") - .await?; + .open("/dev/ppp")?; if unsafe { ioctls::pppiocattchan(ctl.as_raw_fd(), &chindex) } < 0 { os_err!(); @@ -1400,8 +1402,7 @@ impl Client { .write(true) .create(false) .truncate(false) - .open("/dev/ppp") - .await?; + .open("/dev/ppp")?; let mut ifunit = -1; if unsafe { ioctls::pppiocnewunit(ppp_dev.as_raw_fd(), &mut ifunit) } < 0 { @@ -1417,26 +1418,39 @@ impl Client { os_err!(); } - Ok(SessionFds(sock, ctl, ppp_dev)) + Ok(SessionFds(sock, AsyncFd::new(ctl)?, AsyncFd::new(ppp_dev)?)) } } -async fn option_read(file: Option<&mut File>, buf: &mut [u8]) -> Option<io::Result<usize>> { +async fn option_read(file: Option<&AsyncFd<File>>, buf: &mut [u8]) -> Option<io::Result<usize>> { match file { - Some(file) => loop { - match file.read(buf).await { - Ok(n) => return Some(Ok(n)), - Err(e) => { - if e.kind() != io::ErrorKind::WouldBlock { - return Some(Err(e)); - } - } - } - }, + Some(file) => Some(option_read_inner(file, buf).await), None => None, } } +async fn option_read_inner(file: &AsyncFd<File>, buf: &mut [u8]) -> io::Result<usize> { + loop { + let mut guard = file.readable().await?; + + match guard.try_io(|inner| inner.get_ref().read(buf)) { + Ok(result) => return result, + Err(_would_block) => continue, + } + } +} + +async fn async_write_all(file: &AsyncFd<File>, buf: &[u8]) -> io::Result<()> { + loop { + let mut guard = file.writable().await?; + + match guard.try_io(|inner| inner.get_ref().write_all(buf)) { + Ok(result) => return result, + Err(_would_block) => continue, + } + } +} + /// Creates a link-local IPv6 address from a 64-bit interface identifier. fn ll(ifid: u64) -> Ipv6Addr { ((0xfe80 << 112) | ifid as u128).into() |