Fix race in block resync

This commit is contained in:
Alex Auvolat 2021-10-26 19:13:41 +02:00
parent 6b47c294f5
commit 69b89fb46d
No known key found for this signature in database
GPG Key ID: EDABF9711E244EB1

View File

@@ -12,7 +12,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify}; use tokio::sync::{watch, Mutex, Notify};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::*;
use garage_util::time::*; use garage_util::time::*;
use garage_util::token_bucket::TokenBucket; use garage_util::token_bucket::TokenBucket;
@@ -28,7 +28,7 @@ use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files /// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072; pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 2; pub const BACKGROUND_WORKERS: u64 = 1;
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42); const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60); const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
@@ -393,8 +393,7 @@ impl BlockManager {
} }
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
if let Some(first_item) = self.resync_queue.iter().next() { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let (time_bytes, hash_bytes) = first_item?;
let time_msec = u64_from_be_bytes(&time_bytes[0..8]); let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
let now = now_msec(); let now = now_msec();
if now >= time_msec { if now >= time_msec {
@@ -404,9 +403,9 @@ impl BlockManager {
warn!("Error when resyncing {:?}: {}", hash, e); warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
} }
self.resync_queue.remove(&time_bytes)?;
res?; // propagate error to delay main loop res?; // propagate error to delay main loop
} else { } else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! { select! {
_ = delay.fuse() => {}, _ = delay.fuse() => {},
@@ -453,7 +452,7 @@ impl BlockManager {
&self.endpoint, &self.endpoint,
*to, *to,
msg.clone(), msg.clone(),
RequestStrategy::with_priority(PRIO_NORMAL) RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(NEED_BLOCK_QUERY_TIMEOUT), .with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
) )
}); });
@@ -461,7 +460,7 @@ impl BlockManager {
let mut need_nodes = vec![]; let mut need_nodes = vec![];
for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
match needed? { match needed.err_context("NeedBlockQuery RPC")? {
BlockRpc::NeedBlockReply(needed) => { BlockRpc::NeedBlockReply(needed) => {
if needed { if needed {
need_nodes.push(*node); need_nodes.push(*node);
@@ -482,14 +481,14 @@ impl BlockManager {
need_nodes.len() need_nodes.len()
); );
let put_block_message = self.read_block(hash).await?; let put_block_message = self.read_block(hash).await.err_context("PutBlock RPC")?;
self.system self.system
.rpc .rpc
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&need_nodes[..], &need_nodes[..],
put_block_message, put_block_message,
RequestStrategy::with_priority(PRIO_NORMAL) RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_quorum(need_nodes.len()) .with_quorum(need_nodes.len())
.with_timeout(BLOCK_RW_TIMEOUT), .with_timeout(BLOCK_RW_TIMEOUT),
) )