diff options
Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 150 |
1 files changed, 129 insertions, 21 deletions
diff --git a/src/main.rs b/src/main.rs index 1222b55..be11646 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,19 +1,19 @@ use std::array; use std::collections::HashMap; use std::fs; -use std::io::{self, Read}; +use std::io; use std::net::SocketAddr; -use std::os::fd::{AsRawFd, RawFd}; -use std::sync::{mpsc, Arc}; -use std::thread; -use std::time::{Duration, Instant}; +use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; +use tokio::time::Duration; use async_trait::async_trait; -use pcap::Capture; +use byteorder::LittleEndian; +use pcap::{Capture, Device}; +use pcap_file_tokio::pcap::{PcapHeader, PcapPacket}; +use pcap_file_tokio::{Endianness, TsResolution}; use ringbuf::{HeapRb, Rb}; -use rsdsl_netlinklib::blocking::Connection; use russh::server::{Auth, Handle, Msg, Session}; use russh::{Channel, ChannelId, CryptoVec, MethodSet}; use russh_keys::key::KeyPair; @@ -23,33 +23,25 @@ use thiserror::Error; enum Error { #[error("io error: {0}")] Io(#[from] io::Error), - #[error("can't receive from mpsc channel: {0}")] - MpscRecv(#[from] mpsc::RecvError), - #[error("can't send Vec<u8> to mpsc channel")] - MpscSendU8Vec, #[error("can't convert slice to array: {0}")] ArrayTryFromSlice(#[from] array::TryFromSliceError), - #[error("netlinklib error: {0}")] - Netlinklib(#[from] rsdsl_netlinklib::Error), #[error("pcap error: {0}")] Pcap(#[from] pcap::Error), + #[error("pcap_file_tokio error: {0}")] + PcapFileTokio(#[from] pcap_file_tokio::PcapError), #[error("russh error: {0}")] Russh(#[from] russh::Error), } -impl From<mpsc::SendError<Vec<u8>>> for Error { - fn from(_: mpsc::SendError<Vec<u8>>) -> Self { - Self::MpscSendU8Vec - } -} - type Result<T> = std::result::Result<T, Error>; #[derive(Clone)] struct Server { clients: Arc<Mutex<HashMap<(usize, ChannelId), Handle>>>, id: usize, + + packets: Arc<Mutex<HeapRb<Vec<u8>>>>, } impl russh::server::Server for Server { @@ -71,6 +63,8 @@ impl russh::server::Handler for Server { channel: Channel<Msg>, session: Session, ) -> Result<(Self, bool, Session)> { + println!("[info] [{}] open session", channel.id()); + { let mut clients = self.clients.lock().await; clients.insert((self.id, channel.id()), session.handle()); @@ -83,8 +77,10 @@ impl russh::server::Handler for Server { let correct_password = fs::read("/data/admind.passwd")?; if user == "rustkrazy" && password.as_bytes() == correct_password { + println!("[info] auth ok"); Ok((self, Auth::Accept)) } else { + println!("[warn] auth err"); Ok(( self, Auth::Reject { @@ -93,10 +89,91 @@ impl russh::server::Handler for Server { )) } } + + async fn exec_request( + self, + channel: ChannelId, + _: &[u8], + mut session: Session, + ) -> Result<(Self, Session)> { + println!("[info] [{}] exec", channel); + + let header = PcapHeader { + endianness: Endianness::Little, + ..Default::default() + }; + + let mut buf = Vec::new(); + header.write_to(&mut buf).await?; + + let s = session.handle(); + let _ = s.data(channel, CryptoVec::from(buf)).await; + + { + let packets = self.packets.lock().await; + for packet in packets.iter() { + let _ = s.data(channel, CryptoVec::from(packet.clone())).await; + } + } + + Ok((self, session)) + } + + async fn channel_close(self, channel: ChannelId, session: Session) -> Result<(Self, Session)> { + println!("[info] [{}] close session", channel); + Ok((self, session)) + } +} + +async fn capture( + dev: Device, + server: Server, + live_tx: mpsc::UnboundedSender<Vec<u8>>, +) -> Result<()> { + let mut cap = Capture::from_device(dev)?.immediate_mode(true).open()?; + loop { + let packet = cap.next_packet()?; + + { + let pcap_packet = PcapPacket::new( + Duration::new( + packet.header.ts.tv_sec as u64, + (packet.header.ts.tv_usec * 1000) as u32, + ), + packet.header.len, + packet.data, + ); + + let mut buf = Vec::new(); + pcap_packet + .write_to::<_, LittleEndian>(&mut buf, TsResolution::MicroSecond, 65535) + .await?; + + let mut packets = server.packets.lock().await; + packets.push_overwrite(buf.clone()); + + let _ = live_tx.send(buf); + } + } +} + +async fn live_push(server: Server, mut live_rx: mpsc::UnboundedReceiver<Vec<u8>>) -> Result<()> { + while let Some(packet) = live_rx.recv().await { + let clients = server.clients.lock().await; + for ((_, channel), session) in clients.iter() { + let _ = session + .data(*channel, CryptoVec::from(packet.clone())) + .await; + } + } + + Ok(()) } #[tokio::main] async fn main() -> Result<()> { + println!("[info] init"); + let config = Arc::new(russh::server::Config { methods: MethodSet::PASSWORD, inactivity_timeout: Some(Duration::from_secs(3600)), @@ -106,11 +183,42 @@ async fn main() -> Result<()> { ..Default::default() }); + let (live_tx, live_rx) = mpsc::unbounded_channel(); + let server = Server { clients: Arc::new(Mutex::new(HashMap::new())), id: 0, + + packets: Arc::new(Mutex::new(HeapRb::new(64000))), }; - russh::server::run(config, "[::]:2222", server).await?; + let devs = ["wlan0"]; + + for dev in devs { + if dev == "any" { + continue; + } + + println!("[info] capture on {}", dev); + + let server2 = server.clone(); + let live_tx2 = live_tx.clone(); + tokio::spawn(async move { + match capture(dev.into(), server2, live_tx2).await { + Ok(_) => {} + Err(e) => println!("[fail] capture on {}: {}", dev, e), + } + }); + } + + let server2 = server.clone(); + tokio::spawn(async move { + match live_push(server2, live_rx).await { + Ok(_) => {} + Err(e) => println!("[fail] live push: {}", e), + } + }); + + russh::server::run(config, "[::]:22", server).await?; Ok(()) } |