aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHimbeerserverDE <himbeerserverde@gmail.com>2023-12-05 16:29:45 +0100
committerHimbeerserverDE <himbeerserverde@gmail.com>2023-12-05 16:29:45 +0100
commit5ef20d58ed1c2d2b73889fbc2fa26dc8f45adf2d (patch)
tree808e514982c3fa362ecc64ebf8fa2f4e2aff7a1a /src
parent2e427ab73fd82cb56111f89ace740934954b3675 (diff)
fix capturing blocking the runtime and channel operations
Packets now appear on the clients in real time.
Diffstat (limited to 'src')
-rw-r--r--src/main.rs77
1 files changed, 46 insertions, 31 deletions
diff --git a/src/main.rs b/src/main.rs
index 851adef..05a0e66 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,7 +10,8 @@ use tokio::time::Duration;
use async_trait::async_trait;
use byteorder::LittleEndian;
-use pcap::{Capture, Device};
+use futures::stream::TryStreamExt;
+use pcap::{Capture, Device, Packet, PacketCodec};
use pcap_file_tokio::pcap::{PcapHeader, PcapPacket};
use pcap_file_tokio::{Endianness, TsResolution};
use ringbuf::{HeapRb, Rb};
@@ -36,6 +37,26 @@ enum Error {
type Result<T> = std::result::Result<T, Error>;
+#[derive(Debug)]
+struct NullCodec;
+
+impl PacketCodec for NullCodec {
+ type Item = PcapPacket<'static>;
+
+ fn decode(&mut self, packet: Packet<'_>) -> Self::Item {
+ let pcap_packet = PcapPacket::new_owned(
+ Duration::new(
+ packet.header.ts.tv_sec as u64,
+ (packet.header.ts.tv_usec * 1000) as u32,
+ ),
+ packet.header.len,
+ packet.data.to_vec(),
+ );
+
+ pcap_packet
+ }
+}
+
#[derive(Clone)]
struct Server {
clients: Arc<Mutex<HashMap<(usize, ChannelId), Handle>>>,
@@ -145,35 +166,29 @@ impl russh::server::Handler for Server {
}
async fn capture(
- dev: Device,
+ device: Device,
server: Server,
live_tx: mpsc::UnboundedSender<Vec<u8>>,
) -> Result<()> {
- let mut cap = Capture::from_device(dev)?.immediate_mode(true).open()?;
- loop {
- let packet = cap.next_packet()?;
+ let mut packet_stream = Capture::from_device(device)?
+ .immediate_mode(true)
+ .open()?
+ .setnonblock()?
+ .stream(NullCodec)?;
- {
- let pcap_packet = PcapPacket::new(
- Duration::new(
- packet.header.ts.tv_sec as u64,
- (packet.header.ts.tv_usec * 1000) as u32,
- ),
- packet.header.len,
- packet.data,
- );
-
- let mut buf = Vec::new();
- pcap_packet
- .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535)
- .await?;
-
- let mut packets = server.packets.lock().await;
- packets.push_overwrite(buf.clone());
-
- let _ = live_tx.send(buf);
- }
+ while let Some(packet) = packet_stream.try_next().await? {
+ let mut buf = Vec::new();
+ packet
+ .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535)
+ .await?;
+
+ let mut packets = server.packets.lock().await;
+ packets.push_overwrite(buf.clone());
+
+ let _ = live_tx.send(buf);
}
+
+ Ok(())
}
async fn live_push(server: Server, mut live_rx: mpsc::UnboundedReceiver<Vec<u8>>) -> Result<()> {
@@ -211,21 +226,21 @@ async fn main() -> Result<()> {
packets: Arc::new(Mutex::new(HeapRb::new(64000))),
};
- let devs = ["wlan0"];
+ let devices = ["wlan0"];
- for dev in devs {
- if dev == "any" {
+ for device in devices {
+ if device == "any" {
continue;
}
- println!("[info] capture on {}", dev);
+ println!("[info] capture on {}", device);
let server2 = server.clone();
let live_tx2 = live_tx.clone();
tokio::spawn(async move {
- match capture(dev.into(), server2, live_tx2).await {
+ match capture(device.into(), server2, live_tx2).await {
Ok(_) => {}
- Err(e) => println!("[fail] capture on {}: {}", dev, e),
+ Err(e) => println!("[fail] capture on {}: {}", device, e),
}
});
}