diff options
Diffstat (limited to 'hbak/src')
-rw-r--r-- | hbak/src/main.rs | 83 |
1 files changed, 81 insertions, 2 deletions
diff --git a/hbak/src/main.rs b/hbak/src/main.rs index f097ef9..5cb4761 100644 --- a/hbak/src/main.rs +++ b/hbak/src/main.rs @@ -4,13 +4,15 @@ use error::*; use hbak_common::config::{NodeConfig, RemoteNode, RemoteNodeAuth}; use hbak_common::conn::{AuthConn, DEFAULT_PORT}; use hbak_common::message::SyncInfo; -use hbak_common::proto::{LocalNode, Node, Snapshot, Volume}; +use hbak_common::proto::{LatestSnapshots, LocalNode, Node, Snapshot, Volume}; use hbak_common::system; use hbak_common::{LocalNodeError, RemoteError}; use std::collections::HashMap; use std::fs::{self, File}; +use std::io::Empty; use std::net::SocketAddr; +use std::sync::Mutex; use clap::{Parser, Subcommand}; @@ -404,5 +406,82 @@ fn sync( } fn restore(local_node: &LocalNode, address: &str, subvols: &[String]) -> Result<()> { - todo!() + let address = match address.parse() { + Ok(address) => address, + Err(_) => SocketAddr::new(address.parse()?, DEFAULT_PORT), + }; + + let auth_conn = AuthConn::new(&address)?; + let stream_conn = auth_conn.secure_stream( + local_node.name().to_string(), + address.to_string(), + &local_node.config().passphrase, + )?; + + println!("Authentication to {} successful", address); + + let mut local_sync_info = SyncInfo { + volumes: HashMap::new(), + }; + + for subvol in subvols { + local_sync_info.volumes.insert( + Volume::new_local(subvol.to_string())?, + LatestSnapshots::none(), + ); + } + + let (stream_conn, _) = stream_conn.meta_sync(local_sync_info)?; + + let children = Mutex::new(HashMap::new()); + + let rx_setup = |snapshot: &Snapshot| { + if !subvols + .iter() + .any(|subvol| snapshot.subvol() == subvol && snapshot.node_name() == local_node.name()) + { + return Err(RemoteError::AccessDenied); + } + + if snapshot.snapshot_path().exists() { + return Err(RemoteError::Immutable); + } + + let (child, w) = local_node.recover().map_err(|_| RemoteError::RxError)?; + children.lock().unwrap().insert(snapshot.clone(), child); + + println!("Receiving {} from {}", snapshot, address); + + Ok(w) + }; + + let rx_finish = |snapshot: Snapshot| { + println!("Received {} from {}", snapshot, address); + + let mut child = children + .lock() + .unwrap() + .remove(&snapshot) + .ok_or(RemoteError::NotStreaming)?; + + if child.wait().map_err(|_| RemoteError::RxError)?.success() { + Ok(()) + } else { + Err(RemoteError::RxError) + } + }; + + match stream_conn.data_sync(Vec::<(Empty, Snapshot)>::default(), rx_setup, rx_finish) { + Ok(_) => Ok(()), + Err(e) => { + for (snapshot, child) in children.lock().unwrap().iter_mut() { + match child.kill() { + Ok(_) => {} + Err(e) => eprintln!("Cannot kill failed receiver for {}: {}", snapshot, e), + } + } + + Err(e.into()) + } + } } |