block manager: refactoring & increase max worker count to 8
This commit is contained in:
parent
3a74844df0
commit
a44f486931
@ -279,21 +279,21 @@ impl BlockManager {
|
|||||||
let res = match res {
|
let res = match res {
|
||||||
Ok(res) => res,
|
Ok(res) => res,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Node {:?} returned error: {}", node, e);
|
debug!("Get block {:?}: node {:?} returned error: {}", hash, node, e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let (header, stream) = match res.into_parts() {
|
let (header, stream) = match res.into_parts() {
|
||||||
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
|
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
|
||||||
_ => {
|
_ => {
|
||||||
debug!("Node {:?} returned a malformed response", node);
|
debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
match f(header, stream).await {
|
match f(header, stream).await {
|
||||||
Ok(ret) => return Ok(ret),
|
Ok(ret) => return Ok(ret),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Error reading stream from node {:?}: {}", node, e);
|
debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -302,15 +302,14 @@ impl BlockManager {
|
|||||||
// TODO: keep first request running when initiating a new one and take the
|
// TODO: keep first request running when initiating a new one and take the
|
||||||
// one that finishes earlier
|
// one that finishes earlier
|
||||||
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
|
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
|
||||||
debug!("Node {:?} didn't return block in time, trying next.", node);
|
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(Error::Message(format!(
|
let msg = format!("Get block {:?}: no node returned a valid block", hash);
|
||||||
"Unable to read block {:?}: no node returned a valid block",
|
debug!("{}", msg);
|
||||||
hash
|
Err(Error::Message(msg))
|
||||||
)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Public interface ----
|
// ---- Public interface ----
|
||||||
@ -666,7 +665,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
|
|||||||
BlockRpc::PutBlock { hash, header } => Resp::new(
|
BlockRpc::PutBlock { hash, header } => Resp::new(
|
||||||
self.handle_put_block(*hash, *header, message.take_stream())
|
self.handle_put_block(*hash, *header, message.take_stream())
|
||||||
.await
|
.await
|
||||||
.map(|_| BlockRpc::Ok),
|
.map(|()| BlockRpc::Ok),
|
||||||
),
|
),
|
||||||
BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
|
BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
|
||||||
BlockRpc::NeedBlockQuery(h) => {
|
BlockRpc::NeedBlockQuery(h) => {
|
||||||
@ -687,15 +686,14 @@ impl BlockManagerLocked {
|
|||||||
let compressed = data.is_compressed();
|
let compressed = data.is_compressed();
|
||||||
let data = data.inner_buffer();
|
let data = data.inner_buffer();
|
||||||
|
|
||||||
let mut tgt_path = mgr.data_layout.primary_block_dir(hash);
|
let directory = mgr.data_layout.primary_block_dir(hash);
|
||||||
let directory = tgt_path.clone();
|
|
||||||
|
let mut tgt_path = directory.clone();
|
||||||
tgt_path.push(hex::encode(hash));
|
tgt_path.push(hex::encode(hash));
|
||||||
if compressed {
|
if compressed {
|
||||||
tgt_path.set_extension("zst");
|
tgt_path.set_extension("zst");
|
||||||
}
|
}
|
||||||
|
|
||||||
fs::create_dir_all(&directory).await?;
|
|
||||||
|
|
||||||
let to_delete = match (mgr.find_block(hash).await, compressed) {
|
let to_delete = match (mgr.find_block(hash).await, compressed) {
|
||||||
// If the block is stored in the wrong directory,
|
// If the block is stored in the wrong directory,
|
||||||
// write it again at the correct path and delete the old path
|
// write it again at the correct path and delete the old path
|
||||||
@ -723,6 +721,8 @@ impl BlockManagerLocked {
|
|||||||
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
|
let tmp_extension = format!("tmp{}", hex::encode(thread_rng().gen::<[u8; 4]>()));
|
||||||
path_tmp.set_extension(tmp_extension);
|
path_tmp.set_extension(tmp_extension);
|
||||||
|
|
||||||
|
fs::create_dir_all(&directory).await?;
|
||||||
|
|
||||||
let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
|
let mut delete_on_drop = DeleteOnDrop(Some(path_tmp.clone()));
|
||||||
|
|
||||||
let mut f = fs::File::create(&path_tmp).await?;
|
let mut f = fs::File::create(&path_tmp).await?;
|
||||||
|
@ -41,7 +41,7 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
|
|||||||
pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
|
pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
|
||||||
|
|
||||||
// No more than 4 resync workers can be running in the system
|
// No more than 4 resync workers can be running in the system
|
||||||
pub(crate) const MAX_RESYNC_WORKERS: usize = 4;
|
pub(crate) const MAX_RESYNC_WORKERS: usize = 8;
|
||||||
// Resync tranquility is initially set to 2, but can be changed in the CLI
|
// Resync tranquility is initially set to 2, but can be changed in the CLI
|
||||||
// and the updated version is persisted over Garage restarts
|
// and the updated version is persisted over Garage restarts
|
||||||
const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
|
const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
|
||||||
|
Loading…
Reference in New Issue
Block a user