|
|
|
@ -119,6 +119,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( |
|
|
|
|
return Ok((version_uuid, data_md5sum_hex)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The following consists in many steps that can each fail.
|
|
|
|
|
// Keep track that some cleanup will be needed if things fail
|
|
|
|
|
// before everything is finished (cleanup is done using the Drop trait).
|
|
|
|
|
let mut interrupted_cleanup = InterruptedCleanup(Some(( |
|
|
|
|
garage.clone(), |
|
|
|
|
bucket.id, |
|
|
|
|
key.into(), |
|
|
|
|
version_uuid, |
|
|
|
|
version_timestamp, |
|
|
|
|
))); |
|
|
|
|
|
|
|
|
|
// Write version identifier in object table so that we have a trace
|
|
|
|
|
// that we are uploading something
|
|
|
|
|
let mut object_version = ObjectVersion { |
|
|
|
@ -139,44 +150,27 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( |
|
|
|
|
// Transfer data and verify checksum
|
|
|
|
|
let first_block_hash = async_blake2sum(first_block.clone()).await; |
|
|
|
|
|
|
|
|
|
let tx_result = (|| async { |
|
|
|
|
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( |
|
|
|
|
&garage, |
|
|
|
|
&version, |
|
|
|
|
1, |
|
|
|
|
first_block, |
|
|
|
|
first_block_hash, |
|
|
|
|
&mut chunker, |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
|
|
|
|
|
ensure_checksum_matches( |
|
|
|
|
data_md5sum.as_slice(), |
|
|
|
|
data_sha256sum, |
|
|
|
|
content_md5.as_deref(), |
|
|
|
|
content_sha256, |
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
check_quotas(&garage, bucket, key, total_size).await?; |
|
|
|
|
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( |
|
|
|
|
&garage, |
|
|
|
|
&version, |
|
|
|
|
1, |
|
|
|
|
first_block, |
|
|
|
|
first_block_hash, |
|
|
|
|
&mut chunker, |
|
|
|
|
) |
|
|
|
|
.await?; |
|
|
|
|
|
|
|
|
|
Ok((total_size, data_md5sum)) |
|
|
|
|
})() |
|
|
|
|
.await; |
|
|
|
|
ensure_checksum_matches( |
|
|
|
|
data_md5sum.as_slice(), |
|
|
|
|
data_sha256sum, |
|
|
|
|
content_md5.as_deref(), |
|
|
|
|
content_sha256, |
|
|
|
|
)?; |
|
|
|
|
|
|
|
|
|
// If something went wrong, clean up
|
|
|
|
|
let (total_size, md5sum_arr) = match tx_result { |
|
|
|
|
Ok(rv) => rv, |
|
|
|
|
Err(e) => { |
|
|
|
|
// Mark object as aborted, this will free the blocks further down
|
|
|
|
|
object_version.state = ObjectVersionState::Aborted; |
|
|
|
|
let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); |
|
|
|
|
garage.object_table.insert(&object).await?; |
|
|
|
|
return Err(e); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
check_quotas(&garage, bucket, key, total_size).await?; |
|
|
|
|
|
|
|
|
|
// Save final object state, marked as Complete
|
|
|
|
|
let md5sum_hex = hex::encode(md5sum_arr); |
|
|
|
|
let md5sum_hex = hex::encode(data_md5sum); |
|
|
|
|
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( |
|
|
|
|
ObjectVersionMeta { |
|
|
|
|
headers, |
|
|
|
@ -188,6 +182,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( |
|
|
|
|
let object = Object::new(bucket.id, key.into(), vec![object_version]); |
|
|
|
|
garage.object_table.insert(&object).await?; |
|
|
|
|
|
|
|
|
|
// We were not interrupted, everything went fine.
|
|
|
|
|
// We won't have to clean up on drop.
|
|
|
|
|
interrupted_cleanup.cancel(); |
|
|
|
|
|
|
|
|
|
Ok((version_uuid, md5sum_hex)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -426,6 +424,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { |
|
|
|
|
.unwrap() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>); |
|
|
|
|
|
|
|
|
|
impl InterruptedCleanup { |
|
|
|
|
fn cancel(&mut self) { |
|
|
|
|
drop(self.0.take()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
impl Drop for InterruptedCleanup { |
|
|
|
|
fn drop(&mut self) { |
|
|
|
|
if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { |
|
|
|
|
tokio::spawn(async move { |
|
|
|
|
let object_version = ObjectVersion { |
|
|
|
|
uuid: version_uuid, |
|
|
|
|
timestamp: version_ts, |
|
|
|
|
state: ObjectVersionState::Aborted, |
|
|
|
|
}; |
|
|
|
|
let object = Object::new(bucket_id, key, vec![object_version]); |
|
|
|
|
if let Err(e) = garage.object_table.insert(&object).await { |
|
|
|
|
warn!("Cannot cleanup after aborted PutObject: {}", e); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ----
|
|
|
|
|
|
|
|
|
|
pub async fn handle_create_multipart_upload( |
|
|
|
|
garage: Arc<Garage>, |
|
|
|
|
req: &Request<Body>, |
|
|
|
|