diff options
author | HimbeerserverDE <himbeerserverde@gmail.com> | 2023-12-05 16:29:45 +0100 |
---|---|---|
committer | HimbeerserverDE <himbeerserverde@gmail.com> | 2023-12-05 16:29:45 +0100 |
commit | 5ef20d58ed1c2d2b73889fbc2fa26dc8f45adf2d (patch) | |
tree | 808e514982c3fa362ecc64ebf8fa2f4e2aff7a1a | |
parent | 2e427ab73fd82cb56111f89ace740934954b3675 (diff) |
fix capturing blocking the runtime and channel operations
Packets now appear on the clients in real time.
-rw-r--r-- | Cargo.lock | 3 | ||||
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | src/main.rs | 77 |
3 files changed, 51 insertions, 32 deletions
@@ -853,10 +853,12 @@ checksum = "cbaa01d616eb84eb35cd085fdeaa8671dc8d951bdc4a75bfc414466e76b039ce" dependencies = [ "bitflags 1.3.2", "errno", + "futures", "libc", "libloading", "pkg-config", "regex", + "tokio", "windows-sys 0.36.1", ] @@ -1130,6 +1132,7 @@ version = "0.1.0" dependencies = [ "async-trait", "byteorder", + "futures", "pcap", "pcap-file-tokio", "ringbuf", @@ -8,7 +8,8 @@ edition = "2021" [dependencies] async-trait = "^0.1" byteorder = "^1.4.3" -pcap = "1.1.0" +futures = { version = "^0.3", default-features = false, features = ["std"] } +pcap = { version = "1.1.0", features = ["capture-stream"] } pcap-file-tokio = "0.1.0" ringbuf = "0.3.3" russh = "0.40.0" diff --git a/src/main.rs b/src/main.rs index 851adef..05a0e66 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,8 @@ use tokio::time::Duration; use async_trait::async_trait; use byteorder::LittleEndian; -use pcap::{Capture, Device}; +use futures::stream::TryStreamExt; +use pcap::{Capture, Device, Packet, PacketCodec}; use pcap_file_tokio::pcap::{PcapHeader, PcapPacket}; use pcap_file_tokio::{Endianness, TsResolution}; use ringbuf::{HeapRb, Rb}; @@ -36,6 +37,26 @@ enum Error { type Result<T> = std::result::Result<T, Error>; +#[derive(Debug)] +struct NullCodec; + +impl PacketCodec for NullCodec { + type Item = PcapPacket<'static>; + + fn decode(&mut self, packet: Packet<'_>) -> Self::Item { + let pcap_packet = PcapPacket::new_owned( + Duration::new( + packet.header.ts.tv_sec as u64, + (packet.header.ts.tv_usec * 1000) as u32, + ), + packet.header.len, + packet.data.to_vec(), + ); + + pcap_packet + } +} + #[derive(Clone)] struct Server { clients: Arc<Mutex<HashMap<(usize, ChannelId), Handle>>>, @@ -145,35 +166,29 @@ impl russh::server::Handler for Server { } async fn capture( - dev: Device, + device: Device, server: Server, live_tx: mpsc::UnboundedSender<Vec<u8>>, ) -> Result<()> { - let mut cap = Capture::from_device(dev)?.immediate_mode(true).open()?; - loop { - let packet = cap.next_packet()?; + let mut packet_stream = Capture::from_device(device)? + .immediate_mode(true) + .open()? + .setnonblock()? + .stream(NullCodec)?; - { - let pcap_packet = PcapPacket::new( - Duration::new( - packet.header.ts.tv_sec as u64, - (packet.header.ts.tv_usec * 1000) as u32, - ), - packet.header.len, - packet.data, - ); - - let mut buf = Vec::new(); - pcap_packet - .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535) - .await?; - - let mut packets = server.packets.lock().await; - packets.push_overwrite(buf.clone()); - - let _ = live_tx.send(buf); - } + while let Some(packet) = packet_stream.try_next().await? { + let mut buf = Vec::new(); + packet + .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535) + .await?; + + let mut packets = server.packets.lock().await; + packets.push_overwrite(buf.clone()); + + let _ = live_tx.send(buf); } + + Ok(()) } async fn live_push(server: Server, mut live_rx: mpsc::UnboundedReceiver<Vec<u8>>) -> Result<()> { @@ -211,21 +226,21 @@ async fn main() -> Result<()> { packets: Arc::new(Mutex::new(HeapRb::new(64000))), }; - let devs = ["wlan0"]; + let devices = ["wlan0"]; - for dev in devs { - if dev == "any" { + for device in devices { + if device == "any" { continue; } - println!("[info] capture on {}", dev); + println!("[info] capture on {}", device); let server2 = server.clone(); let live_tx2 = live_tx.clone(); tokio::spawn(async move { - match capture(dev.into(), server2, live_tx2).await { + match capture(device.into(), server2, live_tx2).await { Ok(_) => {} - Err(e) => println!("[fail] capture on {}: {}", dev, e), + Err(e) => println!("[fail] capture on {}: {}", device, e), } }); } |