aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHimbeerserverDE <himbeerserverde@gmail.com>2023-12-05 16:29:45 +0100
committerHimbeerserverDE <himbeerserverde@gmail.com>2023-12-05 16:29:45 +0100
commit5ef20d58ed1c2d2b73889fbc2fa26dc8f45adf2d (patch)
tree808e514982c3fa362ecc64ebf8fa2f4e2aff7a1a
parent2e427ab73fd82cb56111f89ace740934954b3675 (diff)
fix capturing blocking the runtime and channel operations
Packets now appear on the clients in real time.
-rw-r--r--Cargo.lock3
-rw-r--r--Cargo.toml3
-rw-r--r--src/main.rs77
3 files changed, 51 insertions, 32 deletions
diff --git a/Cargo.lock b/Cargo.lock
index afd29ef..3d1ede7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -853,10 +853,12 @@ checksum = "cbaa01d616eb84eb35cd085fdeaa8671dc8d951bdc4a75bfc414466e76b039ce"
dependencies = [
"bitflags 1.3.2",
"errno",
+ "futures",
"libc",
"libloading",
"pkg-config",
"regex",
+ "tokio",
"windows-sys 0.36.1",
]
@@ -1130,6 +1132,7 @@ version = "0.1.0"
dependencies = [
"async-trait",
"byteorder",
+ "futures",
"pcap",
"pcap-file-tokio",
"ringbuf",
diff --git a/Cargo.toml b/Cargo.toml
index d3ae5a9..059eaba 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,8 @@ edition = "2021"
[dependencies]
async-trait = "^0.1"
byteorder = "^1.4.3"
-pcap = "1.1.0"
+futures = { version = "^0.3", default-features = false, features = ["std"] }
+pcap = { version = "1.1.0", features = ["capture-stream"] }
pcap-file-tokio = "0.1.0"
ringbuf = "0.3.3"
russh = "0.40.0"
diff --git a/src/main.rs b/src/main.rs
index 851adef..05a0e66 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,7 +10,8 @@ use tokio::time::Duration;
use async_trait::async_trait;
use byteorder::LittleEndian;
-use pcap::{Capture, Device};
+use futures::stream::TryStreamExt;
+use pcap::{Capture, Device, Packet, PacketCodec};
use pcap_file_tokio::pcap::{PcapHeader, PcapPacket};
use pcap_file_tokio::{Endianness, TsResolution};
use ringbuf::{HeapRb, Rb};
@@ -36,6 +37,26 @@ enum Error {
type Result<T> = std::result::Result<T, Error>;
+#[derive(Debug)]
+struct NullCodec;
+
+impl PacketCodec for NullCodec {
+ type Item = PcapPacket<'static>;
+
+ fn decode(&mut self, packet: Packet<'_>) -> Self::Item {
+ let pcap_packet = PcapPacket::new_owned(
+ Duration::new(
+ packet.header.ts.tv_sec as u64,
+ (packet.header.ts.tv_usec * 1000) as u32,
+ ),
+ packet.header.len,
+ packet.data.to_vec(),
+ );
+
+ pcap_packet
+ }
+}
+
#[derive(Clone)]
struct Server {
clients: Arc<Mutex<HashMap<(usize, ChannelId), Handle>>>,
@@ -145,35 +166,29 @@ impl russh::server::Handler for Server {
}
async fn capture(
- dev: Device,
+ device: Device,
server: Server,
live_tx: mpsc::UnboundedSender<Vec<u8>>,
) -> Result<()> {
- let mut cap = Capture::from_device(dev)?.immediate_mode(true).open()?;
- loop {
- let packet = cap.next_packet()?;
+ let mut packet_stream = Capture::from_device(device)?
+ .immediate_mode(true)
+ .open()?
+ .setnonblock()?
+ .stream(NullCodec)?;
- {
- let pcap_packet = PcapPacket::new(
- Duration::new(
- packet.header.ts.tv_sec as u64,
- (packet.header.ts.tv_usec * 1000) as u32,
- ),
- packet.header.len,
- packet.data,
- );
-
- let mut buf = Vec::new();
- pcap_packet
- .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535)
- .await?;
-
- let mut packets = server.packets.lock().await;
- packets.push_overwrite(buf.clone());
-
- let _ = live_tx.send(buf);
- }
+ while let Some(packet) = packet_stream.try_next().await? {
+ let mut buf = Vec::new();
+ packet
+ .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535)
+ .await?;
+
+ let mut packets = server.packets.lock().await;
+ packets.push_overwrite(buf.clone());
+
+ let _ = live_tx.send(buf);
}
+
+ Ok(())
}
async fn live_push(server: Server, mut live_rx: mpsc::UnboundedReceiver<Vec<u8>>) -> Result<()> {
@@ -211,21 +226,21 @@ async fn main() -> Result<()> {
packets: Arc::new(Mutex::new(HeapRb::new(64000))),
};
- let devs = ["wlan0"];
+ let devices = ["wlan0"];
- for dev in devs {
- if dev == "any" {
+ for device in devices {
+ if device == "any" {
continue;
}
- println!("[info] capture on {}", dev);
+ println!("[info] capture on {}", device);
let server2 = server.clone();
let live_tx2 = live_tx.clone();
tokio::spawn(async move {
- match capture(dev.into(), server2, live_tx2).await {
+ match capture(device.into(), server2, live_tx2).await {
Ok(_) => {}
- Err(e) => println!("[fail] capture on {}: {}", dev, e),
+ Err(e) => println!("[fail] capture on {}: {}", device, e),
}
});
}