aboutsummaryrefslogtreecommitdiff
path: root/hbak/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'hbak/src/main.rs')
-rw-r--r--hbak/src/main.rs83
1 files changed, 81 insertions, 2 deletions
diff --git a/hbak/src/main.rs b/hbak/src/main.rs
index f097ef9..5cb4761 100644
--- a/hbak/src/main.rs
+++ b/hbak/src/main.rs
@@ -4,13 +4,15 @@ use error::*;
use hbak_common::config::{NodeConfig, RemoteNode, RemoteNodeAuth};
use hbak_common::conn::{AuthConn, DEFAULT_PORT};
use hbak_common::message::SyncInfo;
-use hbak_common::proto::{LocalNode, Node, Snapshot, Volume};
+use hbak_common::proto::{LatestSnapshots, LocalNode, Node, Snapshot, Volume};
use hbak_common::system;
use hbak_common::{LocalNodeError, RemoteError};
use std::collections::HashMap;
use std::fs::{self, File};
+use std::io::Empty;
use std::net::SocketAddr;
+use std::sync::Mutex;
use clap::{Parser, Subcommand};
@@ -404,5 +406,82 @@ fn sync(
}
fn restore(local_node: &LocalNode, address: &str, subvols: &[String]) -> Result<()> {
- todo!()
+ let address = match address.parse() {
+ Ok(address) => address,
+ Err(_) => SocketAddr::new(address.parse()?, DEFAULT_PORT),
+ };
+
+ let auth_conn = AuthConn::new(&address)?;
+ let stream_conn = auth_conn.secure_stream(
+ local_node.name().to_string(),
+ address.to_string(),
+ &local_node.config().passphrase,
+ )?;
+
+ println!("Authentication to {} successful", address);
+
+ let mut local_sync_info = SyncInfo {
+ volumes: HashMap::new(),
+ };
+
+ for subvol in subvols {
+ local_sync_info.volumes.insert(
+ Volume::new_local(subvol.to_string())?,
+ LatestSnapshots::none(),
+ );
+ }
+
+ let (stream_conn, _) = stream_conn.meta_sync(local_sync_info)?;
+
+ let children = Mutex::new(HashMap::new());
+
+ let rx_setup = |snapshot: &Snapshot| {
+ if !subvols
+ .iter()
+ .any(|subvol| snapshot.subvol() == subvol && snapshot.node_name() == local_node.name())
+ {
+ return Err(RemoteError::AccessDenied);
+ }
+
+ if snapshot.snapshot_path().exists() {
+ return Err(RemoteError::Immutable);
+ }
+
+ let (child, w) = local_node.recover().map_err(|_| RemoteError::RxError)?;
+ children.lock().unwrap().insert(snapshot.clone(), child);
+
+ println!("Receiving {} from {}", snapshot, address);
+
+ Ok(w)
+ };
+
+ let rx_finish = |snapshot: Snapshot| {
+ println!("Received {} from {}", snapshot, address);
+
+ let mut child = children
+ .lock()
+ .unwrap()
+ .remove(&snapshot)
+ .ok_or(RemoteError::NotStreaming)?;
+
+ if child.wait().map_err(|_| RemoteError::RxError)?.success() {
+ Ok(())
+ } else {
+ Err(RemoteError::RxError)
+ }
+ };
+
+ match stream_conn.data_sync(Vec::<(Empty, Snapshot)>::default(), rx_setup, rx_finish) {
+ Ok(_) => Ok(()),
+ Err(e) => {
+ for (snapshot, child) in children.lock().unwrap().iter_mut() {
+ match child.kill() {
+ Ok(_) => {}
+ Err(e) => eprintln!("Cannot kill failed receiver for {}: {}", snapshot, e),
+ }
+ }
+
+ Err(e.into())
+ }
+ }
}