diff options
author | Himbeer <himbeerserverde@gmail.com> | 2024-02-21 19:35:03 +0100 |
---|---|---|
committer | Himbeer <himbeerserverde@gmail.com> | 2024-02-21 19:35:03 +0100 |
commit | 8b95f9f954faf5b26e0f204a8460ffd5654603e6 (patch) | |
tree | 2fb74217a0f5a6aad69974fa75e0003a3747898f /hbak_common | |
parent | 7f79a1541a69f63f96eb242cd73ca4fa94225369 (diff) |
StreamConn::data_sync: return rx errors even if tx is still running or deadlocked
Diffstat (limited to 'hbak_common')
-rw-r--r-- | hbak_common/src/conn.rs | 36 |
1 files changed, 25 insertions, 11 deletions
diff --git a/hbak_common/src/conn.rs b/hbak_common/src/conn.rs index a8ebe3f..a73e245 100644 --- a/hbak_common/src/conn.rs +++ b/hbak_common/src/conn.rs @@ -459,7 +459,7 @@ impl StreamConn<Active> { let local_done = Mutex::new(false); thread::scope(|s| { - let tx = s.spawn(|| -> Result<(), NetworkError> { + let mut tx = Some(s.spawn(|| -> Result<(), NetworkError> { for (mut r, snapshot) in tx.into_iter() { stream_conn .read() @@ -475,8 +475,8 @@ impl StreamConn<Active> { } Ok(()) - }); - let rx = s.spawn(|| -> Result<(), NetworkError> { + })); + let mut rx = Some(s.spawn(|| -> Result<(), NetworkError> { let mut remote_done = false; while !*local_done.lock().unwrap() || !remote_done { @@ -507,17 +507,31 @@ impl StreamConn<Active> { } Ok(()) - }); + })); - tx.join().unwrap()?; + let mut remote_done = false; + loop { + let mut local_done = local_done.lock().unwrap(); + if tx.as_ref().expect("tx thread not joined").is_finished() && !*local_done { + tx.take().expect("tx thread not joined").join().unwrap()?; + *local_done = true; - *local_done.lock().unwrap() = true; - stream_conn - .read() - .unwrap() - .send_message(&StreamMessage::Done)?; + stream_conn + .read() + .unwrap() + .send_message(&StreamMessage::Done)?; + } + if rx.as_ref().expect("rx thread not joined").is_finished() && !remote_done { + rx.take().expect("rx thread not joined").join().unwrap()?; + remote_done = true; + } + + if *local_done && remote_done { + break; + } - rx.join().unwrap()?; + thread::sleep(READ_TIMEOUT); + } Ok(()) }) |