Small improvements in the S3 put workflow

This commit is contained in:
Alex Auvolat 2021-02-19 12:11:02 +01:00
parent 3b023c0c3b
commit 76390085ef

View File

@ -27,24 +27,25 @@ pub async fn handle_put(
key: &str, key: &str,
content_sha256: Option<Hash>, content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
// Generate identity of new version
let version_uuid = gen_uuid(); let version_uuid = gen_uuid();
let version_timestamp = now_msec();
// Retrieve interesting headers from request
let headers = get_headers(&req)?; let headers = get_headers(&req)?;
let content_md5 = match req.headers().get("content-md5") { let content_md5 = match req.headers().get("content-md5") {
Some(x) => Some(x.to_str()?.to_string()), Some(x) => Some(x.to_str()?.to_string()),
None => None, None => None,
}; };
// Parse body of uploaded file
let body = req.into_body(); let body = req.into_body();
let mut chunker = BodyChunker::new(body, garage.config.block_size); let mut chunker = BodyChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or(vec![]); let first_block = chunker.next().await?.unwrap_or(vec![]);
let mut object_version = ObjectVersion { // If body is small enough, store it directly in the object table
uuid: version_uuid, // as "inline data". We can then return immediately.
timestamp: now_msec(),
state: ObjectVersionState::Uploading(headers.clone()),
};
if first_block.len() < INLINE_THRESHOLD { if first_block.len() < INLINE_THRESHOLD {
let mut md5sum = Md5::new(); let mut md5sum = Md5::new();
md5sum.update(&first_block[..]); md5sum.update(&first_block[..]);
@ -60,27 +61,41 @@ pub async fn handle_put(
content_sha256, content_sha256,
)?; )?;
object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline( let object_version = ObjectVersion {
ObjectVersionMeta { uuid: version_uuid,
headers, timestamp: version_timestamp,
size: first_block.len() as u64, state: ObjectVersionState::Complete(ObjectVersionData::Inline(
etag: md5sum_hex.clone(), ObjectVersionMeta {
}, headers,
first_block, size: first_block.len() as u64,
)); etag: md5sum_hex.clone(),
},
first_block,
)),
};
let object = Object::new(bucket.into(), key.into(), vec![object_version]); let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
return Ok(put_response(version_uuid, md5sum_hex)); return Ok(put_response(version_uuid, md5sum_hex));
} }
let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); // Write version identifier in object table so that we have a trace
// that we are uploading something
let first_block_hash = hash(&first_block[..]); let mut object_version = ObjectVersion {
uuid: version_uuid,
timestamp: now_msec(),
state: ObjectVersionState::Uploading(headers.clone()),
};
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
let (total_size, md5sum_arr, sha256sum) = read_and_put_blocks( // Initialize corresponding entry in version table
let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
let first_block_hash = hash(&first_block[..]);
// Transfer data and verify checksum
let tx_result = read_and_put_blocks(
&garage, &garage,
version, version,
1, 1,
@ -88,19 +103,31 @@ pub async fn handle_put(
first_block_hash, first_block_hash,
&mut chunker, &mut chunker,
) )
.await?; .await
.and_then(|(total_size, md5sum_arr, sha256sum)| {
ensure_checksum_matches(
md5sum_arr.as_slice(),
sha256sum,
content_md5.as_deref(),
content_sha256,
)
.map(|()| (total_size, md5sum_arr))
});
ensure_checksum_matches( // If something went wrong, clean up
md5sum_arr.as_slice(), let (total_size, md5sum_arr) = match tx_result {
sha256sum, Ok(rv) => rv,
content_md5.as_deref(), Err(e) => {
content_sha256, // Mark object as aborted, this will free the blocks further down
)?; object_version.state = ObjectVersionState::Aborted;
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
// TODO: if at any step we have an error, we should undo everything we did garage.object_table.insert(&object).await?;
return Err(e);
}
};
// Save final object state, marked as Complete
let md5sum_hex = hex::encode(md5sum_arr); let md5sum_hex = hex::encode(md5sum_arr);
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta { ObjectVersionMeta {
headers, headers,
@ -109,7 +136,6 @@ pub async fn handle_put(
}, },
first_block_hash, first_block_hash,
)); ));
let object = Object::new(bucket.into(), key.into(), vec![object_version]); let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
@ -340,13 +366,12 @@ pub async fn handle_put_part(
}; };
// Read first chuck, and at the same time try to get object to see if it exists // Read first chuck, and at the same time try to get object to see if it exists
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
let bucket = bucket.to_string(); let bucket = bucket.to_string();
let key = key.to_string(); let key = key.to_string();
let get_object_fut = garage.object_table.get(&bucket, &key); let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
let get_first_block_fut = chunker.next();
let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?; let (object, first_block) =
futures::try_join!(garage.object_table.get(&bucket, &key), chunker.next(),)?;
// Check object is valid and multipart block can be accepted // Check object is valid and multipart block can be accepted
let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?;
@ -404,8 +429,8 @@ pub async fn handle_complete_multipart_upload(
garage.object_table.get(&bucket, &key), garage.object_table.get(&bucket, &key),
garage.version_table.get(&version_uuid, &EmptyKey), garage.version_table.get(&version_uuid, &EmptyKey),
)?; )?;
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
let object_version = object let object_version = object
.versions() .versions()
.iter() .iter()
@ -418,11 +443,12 @@ pub async fn handle_complete_multipart_upload(
} }
Some(x) => x.clone(), Some(x) => x.clone(),
}; };
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
if version.blocks().len() == 0 { if version.blocks().len() == 0 {
return Err(Error::BadRequest(format!("No data was uploaded"))); return Err(Error::BadRequest(format!("No data was uploaded")));
} }
let headers = match object_version.state { let headers = match object_version.state {
ObjectVersionState::Uploading(headers) => headers.clone(), ObjectVersionState::Uploading(headers) => headers.clone(),
_ => unreachable!(), _ => unreachable!(),
@ -540,8 +566,13 @@ fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let mut other = BTreeMap::new(); let mut other = BTreeMap::new();
for h in other_headers.iter() { for h in other_headers.iter() {
if let Some(v) = req.headers().get(h) { if let Some(v) = req.headers().get(h) {
if let Ok(v_str) = v.to_str() { match v.to_str() {
other.insert(h.to_string(), v_str.to_string()); Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", h, e);
}
} }
} }
} }