updaet block admin for new multipartupload models

This commit is contained in:
Alex Auvolat 2023-05-03 16:17:40 +02:00
parent 82e75c0e29
commit 87be8eeb93
4 changed files with 109 additions and 52 deletions

View File

@ -34,6 +34,7 @@ impl AdminRpcHandler {
.get_range(&hash, None, None, 10000, Default::default()) .get_range(&hash, None, None, 10000, Default::default())
.await?; .await?;
let mut versions = vec![]; let mut versions = vec![];
let mut uploads = vec![];
for br in block_refs { for br in block_refs {
if let Some(v) = self if let Some(v) = self
.garage .garage
@ -41,6 +42,11 @@ impl AdminRpcHandler {
.get(&br.version, &EmptyKey) .get(&br.version, &EmptyKey)
.await? .await?
{ {
if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
uploads.push(u);
}
}
versions.push(Ok(v)); versions.push(Ok(v));
} else { } else {
versions.push(Err(br.version)); versions.push(Err(br.version));
@ -50,6 +56,7 @@ impl AdminRpcHandler {
hash, hash,
refcount, refcount,
versions, versions,
uploads,
}) })
} }
@ -93,6 +100,7 @@ impl AdminRpcHandler {
} }
let mut obj_dels = 0; let mut obj_dels = 0;
let mut mpu_dels = 0;
let mut ver_dels = 0; let mut ver_dels = 0;
for hash in blocks { for hash in blocks {
@ -105,56 +113,81 @@ impl AdminRpcHandler {
.await?; .await?;
for br in block_refs { for br in block_refs {
let version = match self if let Some(version) = self
.garage .garage
.version_table .version_table
.get(&br.version, &EmptyKey) .get(&br.version, &EmptyKey)
.await? .await?
{ {
Some(v) => v, self.handle_block_purge_version_backlink(&version, &mut obj_dels, &mut mpu_dels).await?;
None => continue,
};
if let Some(object) = self if !version.deleted.get() {
.garage let deleted_version =
.object_table Version::new(version.uuid, version.backlink, true);
.get(&version.bucket_id, &version.key) self.garage.version_table.insert(&deleted_version).await?;
.await? ver_dels += 1;
{ }
let ov = object.versions().iter().rev().find(|v| v.is_complete()); }
if let Some(ov) = ov { }
if ov.uuid == br.version { }
let del_uuid = gen_uuid();
let deleted_object = Object::new(
version.bucket_id,
version.key.clone(),
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(
ObjectVersionData::DeleteMarker,
),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
obj_dels += 1;
}
}
}
if !version.deleted.get() {
let deleted_version =
Version::new(version.uuid, version.bucket_id, version.key.clone(), true);
self.garage.version_table.insert(&deleted_version).await?;
ver_dels += 1;
}
}
}
Ok(AdminRpc::Ok(format!( Ok(AdminRpc::Ok(format!(
"{} blocks were purged: {} object deletion markers added, {} versions marked deleted", "Purged {} blocks, {} versions, {} objects, {} multipart uploads",
blocks.len(), blocks.len(),
ver_dels,
obj_dels, obj_dels,
ver_dels mpu_dels,
))) )))
}
async fn handle_block_purge_version_backlink(&self, version: &Version, obj_dels: &mut usize, mpu_dels: &mut usize) -> Result<(), Error> {
let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object{bucket_id, key} => {
(*bucket_id, key.clone(), version.uuid)
}
VersionBacklink::MultipartUpload{upload_id} => {
if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? {
if !mpu.deleted.get() {
mpu.parts.clear();
mpu.deleted.set();
self.garage.mpu_table.insert(&mpu).await?;
*mpu_dels += 1;
}
(mpu.bucket_id, mpu.key.clone(), *upload_id)
} else {
return Ok(());
}
}
};
if let Some(object) = self
.garage
.object_table
.get(&bucket_id, &key)
.await?
{
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == ov_id {
let del_uuid = gen_uuid();
let deleted_object = Object::new(
bucket_id,
key,
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(
ObjectVersionData::DeleteMarker,
),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
*obj_dels += 1;
}
}
}
Ok(())
} }
} }

View File

@ -27,6 +27,7 @@ use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*; use garage_model::key_table::*;
use garage_model::migrate::Migrate; use garage_model::migrate::Migrate;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version; use garage_model::s3::version_table::Version;
use crate::cli::*; use crate::cli::*;
@ -66,6 +67,7 @@ pub enum AdminRpc {
hash: Hash, hash: Hash,
refcount: u64, refcount: u64,
versions: Vec<Result<Version, Uuid>>, versions: Vec<Result<Version, Uuid>>,
uploads: Vec<MultipartUpload>,
}, },
} }

View File

@ -215,8 +215,9 @@ pub async fn cmd_admin(
hash, hash,
refcount, refcount,
versions, versions,
uploads,
} => { } => {
print_block_info(hash, refcount, versions); print_block_info(hash, refcount, versions, uploads);
} }
r => { r => {
error!("Unexpected response: {:?}", r); error!("Unexpected response: {:?}", r);

View File

@ -12,8 +12,9 @@ use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*; use garage_model::bucket_table::*;
use garage_model::key_table::*; use garage_model::key_table::*;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
use garage_model::s3::version_table::Version; use garage_model::s3::version_table::*;
use crate::cli::structs::WorkerListOpt; use crate::cli::structs::WorkerListOpt;
@ -385,29 +386,49 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
format_table(table); format_table(table);
} }
pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) { pub fn print_block_info(
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
uploads: Vec<MultipartUpload>,
) {
println!("Block hash: {}", hex::encode(hash.as_slice())); println!("Block hash: {}", hex::encode(hash.as_slice()));
println!("Refcount: {}", refcount); println!("Refcount: {}", refcount);
println!(); println!();
let mut table = vec!["Version\tBucket\tKey\tDeleted".into()]; let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
let mut nondeleted_count = 0; let mut nondeleted_count = 0;
for v in versions.iter() { for v in versions.iter() {
match v { match v {
Ok(ver) => { Ok(ver) => {
table.push(format!( match &ver.backlink {
"{:?}\t{:?}\t{}\t{:?}", VersionBacklink::Object { bucket_id, key } => {
ver.uuid, table.push(format!(
ver.bucket_id, "{:?}\t{:?}\t{}\t\t{:?}",
ver.key, ver.uuid,
ver.deleted.get() bucket_id,
)); key,
ver.deleted.get()
));
}
VersionBacklink::MultipartUpload { upload_id } => {
let upload = uploads.iter().find(|x| x.upload_id == *upload_id);
table.push(format!(
"{:?}\t{:?}\t{}\t{:?}\t{:?}",
ver.uuid,
upload.map(|u| u.bucket_id).unwrap_or_default(),
upload.map(|u| u.key.as_str()).unwrap_or_default(),
upload_id,
ver.deleted.get()
));
}
}
if !ver.deleted.get() { if !ver.deleted.get() {
nondeleted_count += 1; nondeleted_count += 1;
} }
} }
Err(vh) => { Err(vh) => {
table.push(format!("{:?}\t\t\tyes", vh)); table.push(format!("{:?}\t\t\t\tyes", vh));
} }
} }
} }