aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--src/main.rs93
3 files changed, 50 insertions, 45 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8a74243..49c7d80 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1220,7 +1220,6 @@ version = "0.1.0"
dependencies = [
"async-trait",
"byteorder",
- "futures",
"pcap",
"pcap-file-tokio",
"ringbuf",
diff --git a/Cargo.toml b/Cargo.toml
index ea1154d..41693d6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,6 @@ edition = "2021"
[dependencies]
async-trait = "^0.1"
byteorder = "^1.4.3"
-futures = { version = "^0.3", default-features = false, features = ["std"] }
pcap = { version = "1.1.0", features = ["capture-stream"] }
pcap-file-tokio = "0.1.0"
ringbuf = "0.3.3"
diff --git a/src/main.rs b/src/main.rs
index a8ac6cc..1e1e0a2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,7 +9,6 @@ use tokio::time::Duration;
use async_trait::async_trait;
use byteorder::LittleEndian;
-use futures::stream::TryStreamExt;
use pcap::{Capture, Device, Packet, PacketCodec};
use pcap_file_tokio::pcap::{PcapHeader, PcapPacket};
use pcap_file_tokio::{Endianness, TsResolution};
@@ -192,6 +191,8 @@ async fn capture(
server: Server,
live_tx: mpsc::UnboundedSender<Vec<u8>>,
) -> Result<()> {
+ let mut null_codec = NullCodec;
+
let is_ppp = device.name.starts_with("ppp");
let mut capture = Capture::from_device(device)?
@@ -201,54 +202,60 @@ async fn capture(
capture.filter(FILTER, true)?;
- let mut packet_stream = capture.stream(NullCodec)?;
-
- while let Some(mut packet) = packet_stream.try_next().await? {
- // Format an Ethernet pseudo-header to make wireshark detect the EtherType correctly.
- if is_ppp {
- let data = packet.data.to_mut();
+ loop {
+ match capture.next_packet() {
+ Ok(packet) => {
+ let mut packet = null_codec.decode(packet);
+
+ // Format an Ethernet pseudo-header to make wireshark detect the EtherType correctly.
+ if is_ppp {
+ let data = packet.data.to_mut();
+
+ // Remove invalid 0x0000 where EtherType is supposed to be.
+ // The data that is shifted in its place is the correct EtherType.
+ data.remove(13);
+ data.remove(12);
+
+ match u16::from_be_bytes(data[0..2].try_into()?) {
+ // Outgoing packet:
+ // sll_pkttype == PACKET_OUTGOING
+ 4 => {
+ // Destination: CF:72:73:00:00:01 (Access Concentrator)
+ data[0..6].copy_from_slice(PPP_MAC_AC);
+
+ // Source: CF:72:73:00:00:02 (Host)
+ data[6..12].copy_from_slice(PPP_MAC_HOST);
+ }
+ // Incoming (unicast) packet:
+ // sll_pkttype == PACKET_HOST
+ 0 => {
+ // Destination: CF:72:73:00:00:02 (Host)
+ data[0..6].copy_from_slice(PPP_MAC_HOST);
+
+ // Source: CF:72:73:00:00:01 (Access Concentrator)
+ data[6..12].copy_from_slice(PPP_MAC_AC);
+ }
+ // Unknown or invalid packet type, make it available in wireshark.
+ _ => {}
+ }
+ }
- // Remove invalid 0x0000 where EtherType is supposed to be.
- // The data that is shifted in its place is the correct EtherType.
- data.remove(13);
- data.remove(12);
+ let mut buf = Vec::new();
+ packet
+ .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535)
+ .await?;
- match u16::from_be_bytes(data[0..2].try_into()?) {
- // Outgoing packet:
- // sll_pkttype == PACKET_OUTGOING
- 4 => {
- // Destination: CF:72:73:00:00:01 (Access Concentrator)
- data[0..6].copy_from_slice(PPP_MAC_AC);
+ let mut packets = server.packets.lock().await;
+ packets.push_overwrite(buf.clone());
- // Source: CF:72:73:00:00:02 (Host)
- data[6..12].copy_from_slice(PPP_MAC_HOST);
- }
- // Incoming (unicast) packet:
- // sll_pkttype == PACKET_HOST
- 0 => {
- // Destination: CF:72:73:00:00:02 (Host)
- data[0..6].copy_from_slice(PPP_MAC_HOST);
-
- // Source: CF:72:73:00:00:01 (Access Concentrator)
- data[6..12].copy_from_slice(PPP_MAC_AC);
- }
- // Unknown or invalid packet type, make it available in wireshark.
- _ => {}
+ let _ = live_tx.send(buf);
+ }
+ Err(pcap::Error::IoError(io::ErrorKind::WouldBlock) | pcap::Error::TimeoutExpired) => {
+ tokio::time::sleep(Duration::from_millis(200)).await
}
+ Err(e) => return Err(e.into()),
}
-
- let mut buf = Vec::new();
- packet
- .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535)
- .await?;
-
- let mut packets = server.packets.lock().await;
- packets.push_overwrite(buf.clone());
-
- let _ = live_tx.send(buf);
}
-
- Ok(())
}
async fn live_push(server: Server, live_rx: &mut mpsc::UnboundedReceiver<Vec<u8>>) -> Result<()> {