From 0957d0fdfadb27e49c24d63994f52197a9c9cd1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 28 Apr 2020 10:18:14 +0000 Subject: [PATCH] Work on API --- .gitignore | 1 + src/api/api_server.rs | 75 +++++++++++++++------ src/api/encoding.rs | 33 +++++++++ src/api/http_util.rs | 6 -- src/api/lib.rs | 6 +- src/api/s3_copy.rs | 120 +++++++++++++++++++++++++++++++++ src/api/s3_delete.rs | 54 +++++++++++++++ src/api/s3_list.rs | 19 +++++- src/api/s3_put.rs | 137 +++++++++++++++++++------------------- src/api/signature.rs | 23 +------ src/core/version_table.rs | 10 ++- 11 files changed, 364 insertions(+), 120 deletions(-) create mode 100644 src/api/encoding.rs create mode 100644 src/api/s3_copy.rs create mode 100644 src/api/s3_delete.rs diff --git a/.gitignore b/.gitignore index e2a2c81b..95f8a160 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /tmp /pki **/*.rs.bk +*.swp diff --git a/src/api/api_server.rs b/src/api/api_server.rs index b92f4403..6ba5e532 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -14,6 +14,8 @@ use garage_core::garage::Garage; use crate::http_util::*; use crate::signature::check_signature; +use crate::s3_copy::*; +use crate::s3_delete::*; use crate::s3_get::*; use crate::s3_list::*; use crate::s3_put::*; @@ -71,15 +73,7 @@ async fn handler_inner( req: Request, ) -> Result, Error> { let path = req.uri().path().to_string(); - let path = path.trim_start_matches('/'); - let (bucket, key) = match path.find('/') { - Some(i) => { - let (bucket, key) = path.split_at(i); - let key = key.trim_start_matches('/'); - (bucket, Some(key)) - } - None => (path, None), - }; + let (bucket, key) = parse_bucket_key(path.as_str())?; if bucket.len() == 0 { return Err(Error::Forbidden(format!( "Operations on buckets not allowed" @@ -116,14 +110,28 @@ async fn handler_inner( Ok(handle_get(garage, &bucket, &key).await?) } &Method::PUT => { - if ["partnumber", "uploadid"] - .iter() - .all(|x| params.contains_key(&x.to_string())) + if params.contains_key(&"partnumber".to_string()) + && params.contains_key(&"uploadid".to_string()) { // UploadPart query let part_number = params.get("partnumber").unwrap(); let upload_id = params.get("uploadid").unwrap(); Ok(handle_put_part(garage, req, &bucket, &key, part_number, upload_id).await?) + } else if req.headers().contains_key("x-amz-copy-source") { + // CopyObject query + let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; + let (source_bucket, source_key) = parse_bucket_key(copy_source)?; + if !api_key.allow_read(&source_bucket) { + return Err(Error::Forbidden(format!( + "Reading from bucket {} not allowed for this key", + source_bucket + ))); + } + let source_key = match source_key { + None => return Err(Error::BadRequest(format!("No source key specified"))), + Some(x) => x, + }; + Ok(handle_copy(garage, &bucket, &key, &source_bucket, &source_key).await?) } else { // PutObject query Ok(handle_put(garage, req, &bucket, &key).await?) @@ -148,7 +156,10 @@ async fn handler_inner( } else if params.contains_key(&"uploadid".to_string()) { // CompleteMultipartUpload call let upload_id = params.get("uploadid").unwrap(); - Ok(handle_complete_multipart_upload(garage, req, &bucket, &key, upload_id).await?) + Ok( + handle_complete_multipart_upload(garage, req, &bucket, &key, upload_id) + .await?, + ) } else { Err(Error::BadRequest(format!( "Not a CreateMultipartUpload call, what is it?" @@ -176,12 +187,9 @@ async fn handler_inner( )) } &Method::GET => { - if ["delimiter", "prefix"] - .iter() - .all(|x| params.contains_key(&x.to_string())) - { + if params.contains_key(&"prefix".to_string()) { // ListObjects query - let delimiter = params.get("delimiter").unwrap(); + let delimiter = params.get("delimiter").map(|x| x.as_str()).unwrap_or(&""); let max_keys = params .get("max-keys") .map(|x| { @@ -191,7 +199,21 @@ async fn handler_inner( }) .unwrap_or(Ok(1000))?; let prefix = params.get("prefix").unwrap(); - Ok(handle_list(garage, bucket, delimiter, max_keys, prefix).await?) + let urlencode_resp = params + .get("encoding-type") + .map(|x| x == "url") + .unwrap_or(false); + let marker = params.get("marker").map(String::as_str); + Ok(handle_list( + garage, + bucket, + delimiter, + max_keys, + prefix, + marker, + urlencode_resp, + ) + .await?) } else { Err(Error::BadRequest(format!( "Not a list call, so what is it?" @@ -202,3 +224,18 @@ async fn handler_inner( } } } + +fn parse_bucket_key(path: &str) -> Result<(&str, Option<&str>), Error> { + if !path.starts_with('/') { + return Err(Error::BadRequest(format!( + "Invalid path: {}, should start with a /", + path + ))); + } + let path = &path[1..]; + + match path.find('/') { + Some(i) => Ok((&path[..i], Some(&path[i + 1..]))), + None => Ok((path, None)), + } +} diff --git a/src/api/encoding.rs b/src/api/encoding.rs new file mode 100644 index 00000000..25999207 --- /dev/null +++ b/src/api/encoding.rs @@ -0,0 +1,33 @@ +pub fn xml_escape(s: &str) -> String { + s.replace("<", "<") + .replace(">", ">") + .replace("\"", """) +} + +pub fn uri_encode(string: &str, encode_slash: bool) -> String { + let mut result = String::with_capacity(string.len() * 2); + for c in string.chars() { + match c { + 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' | '~' | '.' => result.push(c), + '/' if encode_slash => result.push_str("%2F"), + '/' if !encode_slash => result.push('/'), + _ => { + result.push_str( + &format!("{}", c) + .bytes() + .map(|b| format!("%{:02X}", b)) + .collect::(), + ); + } + } + } + result +} + +pub fn xml_encode_key(k: &str, urlencode: bool) -> String { + if urlencode { + uri_encode(k, true) + } else { + xml_escape(k) + } +} diff --git a/src/api/http_util.rs b/src/api/http_util.rs index 2f052117..029b7020 100644 --- a/src/api/http_util.rs +++ b/src/api/http_util.rs @@ -82,9 +82,3 @@ impl From> for BytesBody { Self::new(Bytes::from(x)) } } - -pub fn xml_escape(s: &str) -> String { - s.replace("<", "<") - .replace(">", ">") - .replace("\"", """) -} diff --git a/src/api/lib.rs b/src/api/lib.rs index da537222..5f872941 100644 --- a/src/api/lib.rs +++ b/src/api/lib.rs @@ -1,10 +1,14 @@ #[macro_use] extern crate log; -pub mod api_server; +pub mod encoding; pub mod http_util; + +pub mod api_server; pub mod signature; +pub mod s3_copy; +pub mod s3_delete; pub mod s3_get; pub mod s3_list; pub mod s3_put; diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs new file mode 100644 index 00000000..f8eaa0e7 --- /dev/null +++ b/src/api/s3_copy.rs @@ -0,0 +1,120 @@ +use std::fmt::Write; +use std::sync::Arc; + +use chrono::{SecondsFormat, Utc}; +use hyper::Response; + +use garage_table::*; +use garage_util::data::*; +use garage_util::error::Error; + +use garage_core::block_ref_table::*; +use garage_core::garage::Garage; +use garage_core::object_table::*; +use garage_core::version_table::*; + +use crate::http_util::*; + +pub async fn handle_copy( + garage: Arc, + dest_bucket: &str, + dest_key: &str, + source_bucket: &str, + source_key: &str, +) -> Result, Error> { + let source_object = match garage + .object_table + .get(&source_bucket.to_string(), &source_key.to_string()) + .await? + { + None => return Err(Error::NotFound), + Some(o) => o, + }; + + let source_last_v = match source_object + .versions() + .iter() + .rev() + .filter(|v| v.is_complete()) + .next() + { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let new_uuid = gen_uuid(); + let dest_object_version = ObjectVersion { + uuid: new_uuid, + timestamp: now_msec(), + mime_type: source_last_v.mime_type.clone(), + size: source_last_v.size, + state: ObjectVersionState::Complete, + data: source_last_v.data.clone(), + }; + + match &source_last_v.data { + ObjectVersionData::Uploading => { + return Err(Error::Message(format!( + "Version is_complete() but data is stil Uploading (internal error)" + ))); + } + ObjectVersionData::DeleteMarker => { + return Err(Error::NotFound); + } + ObjectVersionData::Inline(_bytes) => { + let dest_object = Object::new( + dest_bucket.to_string(), + dest_key.to_string(), + vec![dest_object_version], + ); + garage.object_table.insert(&dest_object).await?; + } + ObjectVersionData::FirstBlock(_first_block_hash) => { + let source_version = garage + .version_table + .get(&source_last_v.uuid, &EmptyKey) + .await?; + let source_version = match source_version { + Some(v) => v, + None => return Err(Error::NotFound), + }; + + let dest_version = Version::new( + new_uuid, + dest_bucket.to_string(), + dest_key.to_string(), + false, + source_version.blocks().to_vec(), + ); + let dest_object = Object::new( + dest_bucket.to_string(), + dest_key.to_string(), + vec![dest_object_version], + ); + let dest_block_refs = dest_version + .blocks() + .iter() + .map(|b| BlockRef { + block: b.hash, + version: new_uuid, + deleted: false, + }) + .collect::>(); + futures::try_join!( + garage.object_table.insert(&dest_object), + garage.version_table.insert(&dest_version), + garage.block_ref_table.insert_many(&dest_block_refs[..]), + )?; + } + } + + let now = Utc::now(); + let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true); + let mut xml = String::new(); + writeln!(&mut xml, r#""#).unwrap(); + writeln!(&mut xml, r#""#).unwrap(); + writeln!(&mut xml, "\t{}", last_modified).unwrap(); + writeln!(&mut xml, "").unwrap(); + + Ok(Response::new(Box::new(BytesBody::from(xml.into_bytes())))) +} diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs new file mode 100644 index 00000000..4d6805fb --- /dev/null +++ b/src/api/s3_delete.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use garage_util::data::*; +use garage_util::error::Error; + +use garage_core::garage::Garage; +use garage_core::object_table::*; + +pub async fn handle_delete(garage: Arc, bucket: &str, key: &str) -> Result { + let object = match garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await? + { + None => { + // No need to delete + return Ok([0u8; 32].into()); + } + Some(o) => o, + }; + + let interesting_versions = object.versions().iter().filter(|v| { + v.data != ObjectVersionData::DeleteMarker && v.state != ObjectVersionState::Aborted + }); + + let mut must_delete = false; + let mut timestamp = now_msec(); + for v in interesting_versions { + must_delete = true; + timestamp = std::cmp::max(timestamp, v.timestamp + 1); + } + + if !must_delete { + return Ok([0u8; 32].into()); + } + + let version_uuid = gen_uuid(); + + let object = Object::new( + bucket.into(), + key.into(), + vec![ObjectVersion { + uuid: version_uuid, + timestamp: now_msec(), + mime_type: "application/x-delete-marker".into(), + size: 0, + state: ObjectVersionState::Complete, + data: ObjectVersionData::DeleteMarker, + }], + ); + + garage.object_table.insert(&object).await?; + return Ok(version_uuid); +} diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 9baaba85..88f76771 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -9,6 +9,7 @@ use garage_util::error::Error; use garage_core::garage::Garage; +use crate::encoding::*; use crate::http_util::*; #[derive(Debug)] @@ -23,11 +24,13 @@ pub async fn handle_list( delimiter: &str, max_keys: usize, prefix: &str, + marker: Option<&str>, + urlencode_resp: bool, ) -> Result, Error> { let mut result_keys = BTreeMap::::new(); let mut result_common_prefixes = BTreeSet::::new(); let mut truncated = true; - let mut next_chunk_start = prefix.to_string(); + let mut next_chunk_start = marker.unwrap_or(prefix).to_string(); debug!("List request: `{}` {} `{}`", delimiter, max_keys, prefix); @@ -97,7 +100,12 @@ pub async fn handle_list( let last_modif = DateTime::::from_utc(last_modif, Utc); let last_modif = last_modif.to_rfc3339_opts(SecondsFormat::Millis, true); writeln!(&mut xml, "\t").unwrap(); - writeln!(&mut xml, "\t\t{}", xml_escape(key)).unwrap(); + writeln!( + &mut xml, + "\t\t{}", + xml_encode_key(key, urlencode_resp) + ) + .unwrap(); writeln!(&mut xml, "\t\t{}", last_modif).unwrap(); writeln!(&mut xml, "\t\t{}", info.size).unwrap(); writeln!(&mut xml, "\t\tSTANDARD").unwrap(); @@ -106,7 +114,12 @@ pub async fn handle_list( if result_common_prefixes.len() > 0 { writeln!(&mut xml, "\t").unwrap(); for pfx in result_common_prefixes.iter() { - writeln!(&mut xml, "\t{}", xml_escape(pfx)).unwrap(); + writeln!( + &mut xml, + "\t{}", + xml_encode_key(pfx, urlencode_resp) + ) + .unwrap(); } writeln!(&mut xml, "\t").unwrap(); } diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 70a467a8..e6df5bc0 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -5,9 +5,9 @@ use std::sync::Arc; use futures::stream::*; use hyper::{Body, Request, Response}; +use garage_table::*; use garage_util::data::*; use garage_util::error::Error; -use garage_table::*; use garage_core::block::INLINE_THRESHOLD; use garage_core::block_ref_table::*; @@ -15,6 +15,7 @@ use garage_core::garage::Garage; use garage_core::object_table::*; use garage_core::version_table::*; +use crate::encoding::*; use crate::http_util::*; pub async fn handle_put( @@ -30,7 +31,7 @@ pub async fn handle_put( let mut chunker = BodyChunker::new(body, garage.config.block_size); let first_block = match chunker.next().await? { Some(x) => x, - None => return Err(Error::BadRequest(format!("Empty body"))), + None => vec![], }; let mut object_version = ObjectVersion { @@ -58,7 +59,15 @@ pub async fn handle_put( let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; - let total_size = read_and_put_blocks(&garage, version, 1, first_block, first_block_hash, &mut chunker).await?; + let total_size = read_and_put_blocks( + &garage, + version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; // TODO: if at any step we have an error, we should undo everything we did @@ -80,8 +89,14 @@ async fn read_and_put_blocks( chunker: &mut BodyChunker, ) -> Result { let mut next_offset = first_block.len(); - let mut put_curr_version_block = - put_block_meta(garage.clone(), &version, part_number, 0, first_block_hash, first_block.len() as u64); + let mut put_curr_version_block = put_block_meta( + garage.clone(), + &version, + part_number, + 0, + first_block_hash, + first_block.len() as u64, + ); let mut put_curr_block = garage .block_manager .rpc_put_block(first_block_hash, first_block); @@ -92,8 +107,14 @@ async fn read_and_put_blocks( if let Some(block) = next_block { let block_hash = hash(&block[..]); let block_len = block.len(); - put_curr_version_block = - put_block_meta(garage.clone(), &version, part_number, next_offset as u64, block_hash, block_len as u64); + put_curr_version_block = put_block_meta( + garage.clone(), + &version, + part_number, + next_offset as u64, + block_hash, + block_len as u64, + ); put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); next_offset += block_len; } else { @@ -232,8 +253,9 @@ pub async fn handle_put_part( .parse::() .map_err(|e| Error::BadRequest(format!("Invalid part number: {}", e)))?; - let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; - + let version_uuid = + uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + // Read first chuck, and at the same time try to get object to see if it exists let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); @@ -265,7 +287,15 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); let first_block_hash = hash(&first_block[..]); - read_and_put_blocks(&garage, version, part_number, first_block, first_block_hash, &mut chunker).await?; + read_and_put_blocks( + &garage, + version, + part_number, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; Ok(Response::new(Box::new(BytesBody::from(vec![])))) } @@ -277,7 +307,8 @@ pub async fn handle_complete_multipart_upload( key: &str, upload_id: &str, ) -> Result, Error> { - let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + let version_uuid = + uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; let bucket = bucket.to_string(); let key = key.to_string(); @@ -295,9 +326,11 @@ pub async fn handle_complete_multipart_upload( && v.data == ObjectVersionData::Uploading }); let mut object_version = match object_version { - None => return Err(Error::BadRequest(format!( - "Multipart upload does not exist or has already been completed" - ))), + None => { + return Err(Error::BadRequest(format!( + "Multipart upload does not exist or has already been completed" + ))) + } Some(x) => x.clone(), }; let version = match version { @@ -311,7 +344,11 @@ pub async fn handle_complete_multipart_upload( // TODO: check that all the parts that they pretend they gave us are indeed there // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere... - let total_size = version.blocks().iter().map(|x| x.size).fold(0, |x, y| x+y); + let total_size = version + .blocks() + .iter() + .map(|x| x.size) + .fold(0, |x, y| x + y); object_version.size = total_size; object_version.state = ObjectVersionState::Complete; object_version.data = ObjectVersionData::FirstBlock(version.blocks()[0].hash); @@ -325,7 +362,12 @@ pub async fn handle_complete_multipart_upload( r#""# ) .unwrap(); - writeln!(&mut xml, "\t{}", garage.config.s3_api.s3_region).unwrap(); + writeln!( + &mut xml, + "\t{}", + garage.config.s3_api.s3_region + ) + .unwrap(); writeln!(&mut xml, "\t{}", bucket).unwrap(); writeln!(&mut xml, "\t{}", xml_escape(&key)).unwrap(); writeln!(&mut xml, "").unwrap(); @@ -339,9 +381,13 @@ pub async fn handle_abort_multipart_upload( key: &str, upload_id: &str, ) -> Result, Error> { - let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + let version_uuid = + uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; - let object = garage.object_table.get(&bucket.to_string(), &key.to_string()).await?; + let object = garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await?; let object = match object { None => return Err(Error::BadRequest(format!("Object not found"))), Some(x) => x, @@ -352,9 +398,11 @@ pub async fn handle_abort_multipart_upload( && v.data == ObjectVersionData::Uploading }); let mut object_version = match object_version { - None => return Err(Error::BadRequest(format!( - "Multipart upload does not exist or has already been completed" - ))), + None => { + return Err(Error::BadRequest(format!( + "Multipart upload does not exist or has already been completed" + ))) + } Some(x) => x.clone(), }; @@ -383,50 +431,3 @@ fn uuid_from_str(id: &str) -> Result { uuid.copy_from_slice(&id_bin[..]); Ok(UUID::from(uuid)) } - -pub async fn handle_delete(garage: Arc, bucket: &str, key: &str) -> Result { - let object = match garage - .object_table - .get(&bucket.to_string(), &key.to_string()) - .await? - { - None => { - // No need to delete - return Ok([0u8; 32].into()); - } - Some(o) => o, - }; - - let interesting_versions = object.versions().iter().filter(|v| { - v.data != ObjectVersionData::DeleteMarker && v.state != ObjectVersionState::Aborted - }); - - let mut must_delete = false; - let mut timestamp = now_msec(); - for v in interesting_versions { - must_delete = true; - timestamp = std::cmp::max(timestamp, v.timestamp + 1); - } - - if !must_delete { - return Ok([0u8; 32].into()); - } - - let version_uuid = gen_uuid(); - - let object = Object::new( - bucket.into(), - key.into(), - vec![ObjectVersion { - uuid: version_uuid, - timestamp: now_msec(), - mime_type: "application/x-delete-marker".into(), - size: 0, - state: ObjectVersionState::Complete, - data: ObjectVersionData::DeleteMarker, - }], - ); - - garage.object_table.insert(&object).await?; - return Ok(version_uuid); -} diff --git a/src/api/signature.rs b/src/api/signature.rs index 2e82269c..a1ccfd08 100644 --- a/src/api/signature.rs +++ b/src/api/signature.rs @@ -11,6 +11,8 @@ use garage_util::error::Error; use garage_core::garage::Garage; use garage_core::key_table::*; +use crate::encoding::uri_encode; + const SHORT_DATE: &str = "%Y%m%d"; const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ"; @@ -284,24 +286,3 @@ fn canonical_query_string(uri: &hyper::Uri) -> String { "".to_string() } } - -fn uri_encode(string: &str, encode_slash: bool) -> String { - let mut result = String::with_capacity(string.len() * 2); - for c in string.chars() { - match c { - 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' | '~' | '.' => result.push(c), - '/' if encode_slash => result.push_str("%2F"), - '/' if !encode_slash => result.push('/'), - _ => { - result.push('%'); - result.push_str( - &format!("{}", c) - .bytes() - .map(|b| format!("{:02X}", b)) - .collect::(), - ); - } - } - } - result -} diff --git a/src/core/version_table.rs b/src/core/version_table.rs index 66f737bb..6054e389 100644 --- a/src/core/version_table.rs +++ b/src/core/version_table.rs @@ -49,7 +49,10 @@ impl Version { } /// Adds a block if it wasn't already present pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> { - match self.blocks.binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key())) { + match self + .blocks + .binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key())) + { Err(i) => { self.blocks.insert(i, new); Ok(()) @@ -90,7 +93,10 @@ impl Entry for Version { self.blocks.clear(); } else if !self.deleted { for bi in other.blocks.iter() { - match self.blocks.binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key())) { + match self + .blocks + .binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key())) + { Ok(_) => (), Err(pos) => { self.blocks.insert(pos, bi.clone());