aboutsummaryrefslogtreecommitdiff
path: root/hbak_common
diff options
context:
space:
mode:
authorHimbeer <himbeerserverde@gmail.com>2024-02-21 19:35:03 +0100
committerHimbeer <himbeerserverde@gmail.com>2024-02-21 19:35:03 +0100
commit8b95f9f954faf5b26e0f204a8460ffd5654603e6 (patch)
tree2fb74217a0f5a6aad69974fa75e0003a3747898f /hbak_common
parent7f79a1541a69f63f96eb242cd73ca4fa94225369 (diff)
StreamConn::data_sync: return rx errors even if tx is still running or deadlocked
Diffstat (limited to 'hbak_common')
-rw-r--r--hbak_common/src/conn.rs36
1 files changed, 25 insertions, 11 deletions
diff --git a/hbak_common/src/conn.rs b/hbak_common/src/conn.rs
index a8ebe3f..a73e245 100644
--- a/hbak_common/src/conn.rs
+++ b/hbak_common/src/conn.rs
@@ -459,7 +459,7 @@ impl StreamConn<Active> {
let local_done = Mutex::new(false);
thread::scope(|s| {
- let tx = s.spawn(|| -> Result<(), NetworkError> {
+ let mut tx = Some(s.spawn(|| -> Result<(), NetworkError> {
for (mut r, snapshot) in tx.into_iter() {
stream_conn
.read()
@@ -475,8 +475,8 @@ impl StreamConn<Active> {
}
Ok(())
- });
- let rx = s.spawn(|| -> Result<(), NetworkError> {
+ }));
+ let mut rx = Some(s.spawn(|| -> Result<(), NetworkError> {
let mut remote_done = false;
while !*local_done.lock().unwrap() || !remote_done {
@@ -507,17 +507,31 @@ impl StreamConn<Active> {
}
Ok(())
- });
+ }));
- tx.join().unwrap()?;
+ let mut remote_done = false;
+ loop {
+ let mut local_done = local_done.lock().unwrap();
+ if tx.as_ref().expect("tx thread not joined").is_finished() && !*local_done {
+ tx.take().expect("tx thread not joined").join().unwrap()?;
+ *local_done = true;
- *local_done.lock().unwrap() = true;
- stream_conn
- .read()
- .unwrap()
- .send_message(&StreamMessage::Done)?;
+ stream_conn
+ .read()
+ .unwrap()
+ .send_message(&StreamMessage::Done)?;
+ }
+ if rx.as_ref().expect("rx thread not joined").is_finished() && !remote_done {
+ rx.take().expect("rx thread not joined").join().unwrap()?;
+ remote_done = true;
+ }
+
+ if *local_done && remote_done {
+ break;
+ }
- rx.join().unwrap()?;
+ thread::sleep(READ_TIMEOUT);
+ }
Ok(())
})