aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs150
1 files changed, 129 insertions, 21 deletions
diff --git a/src/main.rs b/src/main.rs
index 1222b55..be11646 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,19 +1,19 @@
use std::array;
use std::collections::HashMap;
use std::fs;
-use std::io::{self, Read};
+use std::io;
use std::net::SocketAddr;
-use std::os::fd::{AsRawFd, RawFd};
-use std::sync::{mpsc, Arc};
-use std::thread;
-use std::time::{Duration, Instant};
+use std::sync::Arc;
-use tokio::sync::Mutex;
+use tokio::sync::{mpsc, Mutex};
+use tokio::time::Duration;
use async_trait::async_trait;
-use pcap::Capture;
+use byteorder::LittleEndian;
+use pcap::{Capture, Device};
+use pcap_file_tokio::pcap::{PcapHeader, PcapPacket};
+use pcap_file_tokio::{Endianness, TsResolution};
use ringbuf::{HeapRb, Rb};
-use rsdsl_netlinklib::blocking::Connection;
use russh::server::{Auth, Handle, Msg, Session};
use russh::{Channel, ChannelId, CryptoVec, MethodSet};
use russh_keys::key::KeyPair;
@@ -23,33 +23,25 @@ use thiserror::Error;
enum Error {
#[error("io error: {0}")]
Io(#[from] io::Error),
- #[error("can't receive from mpsc channel: {0}")]
- MpscRecv(#[from] mpsc::RecvError),
- #[error("can't send Vec<u8> to mpsc channel")]
- MpscSendU8Vec,
#[error("can't convert slice to array: {0}")]
ArrayTryFromSlice(#[from] array::TryFromSliceError),
- #[error("netlinklib error: {0}")]
- Netlinklib(#[from] rsdsl_netlinklib::Error),
#[error("pcap error: {0}")]
Pcap(#[from] pcap::Error),
+ #[error("pcap_file_tokio error: {0}")]
+ PcapFileTokio(#[from] pcap_file_tokio::PcapError),
#[error("russh error: {0}")]
Russh(#[from] russh::Error),
}
-impl From<mpsc::SendError<Vec<u8>>> for Error {
- fn from(_: mpsc::SendError<Vec<u8>>) -> Self {
- Self::MpscSendU8Vec
- }
-}
-
type Result<T> = std::result::Result<T, Error>;
#[derive(Clone)]
struct Server {
clients: Arc<Mutex<HashMap<(usize, ChannelId), Handle>>>,
id: usize,
+
+ packets: Arc<Mutex<HeapRb<Vec<u8>>>>,
}
impl russh::server::Server for Server {
@@ -71,6 +63,8 @@ impl russh::server::Handler for Server {
channel: Channel<Msg>,
session: Session,
) -> Result<(Self, bool, Session)> {
+ println!("[info] [{}] open session", channel.id());
+
{
let mut clients = self.clients.lock().await;
clients.insert((self.id, channel.id()), session.handle());
@@ -83,8 +77,10 @@ impl russh::server::Handler for Server {
let correct_password = fs::read("/data/admind.passwd")?;
if user == "rustkrazy" && password.as_bytes() == correct_password {
+ println!("[info] auth ok");
Ok((self, Auth::Accept))
} else {
+ println!("[warn] auth err");
Ok((
self,
Auth::Reject {
@@ -93,10 +89,91 @@ impl russh::server::Handler for Server {
))
}
}
+
+ async fn exec_request(
+ self,
+ channel: ChannelId,
+ _: &[u8],
+ mut session: Session,
+ ) -> Result<(Self, Session)> {
+ println!("[info] [{}] exec", channel);
+
+ let header = PcapHeader {
+ endianness: Endianness::Little,
+ ..Default::default()
+ };
+
+ let mut buf = Vec::new();
+ header.write_to(&mut buf).await?;
+
+ let s = session.handle();
+ let _ = s.data(channel, CryptoVec::from(buf)).await;
+
+ {
+ let packets = self.packets.lock().await;
+ for packet in packets.iter() {
+ let _ = s.data(channel, CryptoVec::from(packet.clone())).await;
+ }
+ }
+
+ Ok((self, session))
+ }
+
+ async fn channel_close(self, channel: ChannelId, session: Session) -> Result<(Self, Session)> {
+ println!("[info] [{}] close session", channel);
+ Ok((self, session))
+ }
+}
+
+async fn capture(
+ dev: Device,
+ server: Server,
+ live_tx: mpsc::UnboundedSender<Vec<u8>>,
+) -> Result<()> {
+ let mut cap = Capture::from_device(dev)?.immediate_mode(true).open()?;
+ loop {
+ let packet = cap.next_packet()?;
+
+ {
+ let pcap_packet = PcapPacket::new(
+ Duration::new(
+ packet.header.ts.tv_sec as u64,
+ (packet.header.ts.tv_usec * 1000) as u32,
+ ),
+ packet.header.len,
+ packet.data,
+ );
+
+ let mut buf = Vec::new();
+ pcap_packet
+ .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535)
+ .await?;
+
+ let mut packets = server.packets.lock().await;
+ packets.push_overwrite(buf.clone());
+
+ let _ = live_tx.send(buf);
+ }
+ }
+}
+
+async fn live_push(server: Server, mut live_rx: mpsc::UnboundedReceiver<Vec<u8>>) -> Result<()> {
+ while let Some(packet) = live_rx.recv().await {
+ let clients = server.clients.lock().await;
+ for ((_, channel), session) in clients.iter() {
+ let _ = session
+ .data(*channel, CryptoVec::from(packet.clone()))
+ .await;
+ }
+ }
+
+ Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
+ println!("[info] init");
+
let config = Arc::new(russh::server::Config {
methods: MethodSet::PASSWORD,
inactivity_timeout: Some(Duration::from_secs(3600)),
@@ -106,11 +183,42 @@ async fn main() -> Result<()> {
..Default::default()
});
+ let (live_tx, live_rx) = mpsc::unbounded_channel();
+
let server = Server {
clients: Arc::new(Mutex::new(HashMap::new())),
id: 0,
+
+ packets: Arc::new(Mutex::new(HeapRb::new(64000))),
};
- russh::server::run(config, "[::]:2222", server).await?;
+ let devs = ["wlan0"];
+
+ for dev in devs {
+ if dev == "any" {
+ continue;
+ }
+
+ println!("[info] capture on {}", dev);
+
+ let server2 = server.clone();
+ let live_tx2 = live_tx.clone();
+ tokio::spawn(async move {
+ match capture(dev.into(), server2, live_tx2).await {
+ Ok(_) => {}
+ Err(e) => println!("[fail] capture on {}: {}", dev, e),
+ }
+ });
+ }
+
+ let server2 = server.clone();
+ tokio::spawn(async move {
+ match live_push(server2, live_rx).await {
+ Ok(_) => {}
+ Err(e) => println!("[fail] live push: {}", e),
+ }
+ });
+
+ russh::server::run(config, "[::]:22", server).await?;
Ok(())
}