Time and metadata improvements

This commit is contained in:
Alex Auvolat 2021-03-15 16:21:41 +01:00
parent 097c339d98
commit 3bf2df622a
16 changed files with 111 additions and 44 deletions

1
Cargo.lock generated
View File

@ -608,6 +608,7 @@ name = "garage_util"
version = "0.1.1"
dependencies = [
"blake2",
"chrono",
"err-derive",
"fasthash",
"futures",

View File

@ -137,7 +137,10 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
)));
}
let source_key = source_key.ok_or_bad_request("No source key specified")?;
Ok(handle_copy(garage, &bucket, &key, &source_bucket, &source_key).await?)
Ok(
handle_copy(garage, &req, &bucket, &key, &source_bucket, &source_key)
.await?,
)
} else {
// PutObject query
Ok(handle_put(garage, req, &bucket, &key, content_sha256).await?)

View File

@ -1,11 +1,11 @@
use std::fmt::Write;
use std::sync::Arc;
use chrono::{SecondsFormat, Utc};
use hyper::{Body, Response};
use hyper::{Body, Request, Response};
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
@ -13,9 +13,11 @@ use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*;
use crate::s3_put::get_headers;
pub async fn handle_copy(
garage: Arc<Garage>,
req: &Request<Body>,
dest_bucket: &str,
dest_key: &str,
source_bucket: &str,
@ -42,25 +44,44 @@ pub async fn handle_copy(
let new_uuid = gen_uuid();
let new_timestamp = now_msec();
let dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
state: ObjectVersionState::Complete(source_last_state.clone()),
};
let dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
vec![dest_object_version],
);
match source_last_state {
// Implement x-amz-metadata-directive: REPLACE
let old_meta = match source_last_state {
ObjectVersionData::DeleteMarker => {
return Err(Error::NotFound);
}
ObjectVersionData::Inline(_meta, _bytes) => {
ObjectVersionData::Inline(meta, _bytes) => meta,
ObjectVersionData::FirstBlock(meta, _fbh) => meta,
};
let new_meta = match req.headers().get("x-amz-metadata-directive") {
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
headers: get_headers(req)?,
size: old_meta.size,
etag: old_meta.etag.clone(),
},
_ => old_meta.clone(),
};
// Save object copy
match source_last_state {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
let dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::Inline(
new_meta,
bytes.clone(),
)),
};
let dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
vec![dest_object_version],
);
garage.object_table.insert(&dest_object).await?;
}
ObjectVersionData::FirstBlock(meta, _first_block_hash) => {
ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
// Get block list from source version
let source_version = garage
.version_table
@ -74,7 +95,7 @@ pub async fn handle_copy(
let tmp_dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
state: ObjectVersionState::Uploading(meta.headers.clone()),
state: ObjectVersionState::Uploading(new_meta.headers.clone()),
};
let tmp_dest_object = Object::new(
dest_bucket.to_string(),
@ -120,12 +141,24 @@ pub async fn handle_copy(
// it to update the modification timestamp for instance). If we did this concurrently
// with the stuff before, the block's reference counts could be decremented before
// they are incremented again for the new version, leading to data being deleted.
let dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
new_meta,
*first_block_hash,
)),
};
let dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
vec![dest_object_version],
);
garage.object_table.insert(&dest_object).await?;
}
}
let now = Utc::now(); // FIXME use the unix timestamp from above
let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true);
let last_modified = msec_to_rfc3339(new_timestamp);
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(&mut xml, r#"<CopyObjectResult>"#).unwrap();

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use hyper::{Body, Request, Response};
use garage_util::data::*;
use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;

View File

@ -2,10 +2,10 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::Write;
use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc};
use hyper::{Body, Response};
use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
@ -247,9 +247,7 @@ pub async fn handle_list(
}
for (key, info) in result_keys.iter() {
let last_modif = NaiveDateTime::from_timestamp(info.last_modified as i64 / 1000, 0);
let last_modif = DateTime::<Utc>::from_utc(last_modif, Utc);
let last_modif = last_modif.to_rfc3339_opts(SecondsFormat::Millis, true);
let last_modif = msec_to_rfc3339(info.last_modified);
writeln!(&mut xml, "\t<Contents>").unwrap();
writeln!(
&mut xml,

View File

@ -10,6 +10,7 @@ use sha2::{Digest as Sha256Digest, Sha256};
use garage_table::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::block::INLINE_THRESHOLD;
use garage_model::block_ref_table::*;
@ -583,17 +584,19 @@ fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
.to_string())
}
fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(req)?;
let other_headers = vec![
let mut other = BTreeMap::new();
// Preserve standard headers
let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
let mut other = BTreeMap::new();
for h in other_headers.iter() {
for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
@ -605,6 +608,21 @@ fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
}
}
}
// Preserve x-amz-meta- headers
for (k, v) in req.headers().iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {
other.insert(k.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", k, e);
}
}
}
}
Ok(ObjectVersionHeaders {
content_type,
other,

View File

@ -5,8 +5,8 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::time::*;
use garage_rpc::membership::*;
use garage_rpc::ring::*;

View File

@ -11,9 +11,9 @@ use tokio::fs;
use tokio::prelude::*;
use tokio::sync::{watch, Mutex, Notify};
use garage_util::data;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::time::*;
use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
@ -174,7 +174,7 @@ impl BlockManager {
f.read_to_end(&mut data).await?;
drop(f);
if data::blake2sum(&data[..]) != *hash {
if blake2sum(&data[..]) != *hash {
let _lock = self.data_dir_lock.lock().await;
warn!(
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",

View File

@ -18,6 +18,7 @@ use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::time::*;
use crate::consul::get_consul_nodes;
use crate::ring::*;

View File

@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use garage_util::data::now_msec;
use garage_util::time::now_msec;
use crate::crdt::crdt::*;

View File

@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
use garage_util::data::now_msec;
use garage_util::time::now_msec;
use crate::crdt::crdt::*;

View File

@ -75,17 +75,19 @@ where
match self.gc_loop_iter().await {
Ok(true) => {
// Stuff was done, loop imediately
continue;
}
Ok(false) => {
select! {
_ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (),
_ = must_exit.recv().fuse() => (),
}
// Nothing was done, sleep for some time (below)
}
Err(e) => {
warn!("({}) Error doing GC: {}", self.data.name, e);
}
}
select! {
_ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (),
_ = must_exit.recv().fuse() => (),
}
}
Ok(())
}

View File

@ -27,6 +27,7 @@ toml = "0.5"
rmp-serde = "0.14.3"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
chrono = "0.4"
futures = "0.3"
futures-util = "0.3"

View File

@ -2,7 +2,6 @@ use rand::Rng;
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
pub struct FixedBytes32([u8; 32]);
@ -119,13 +118,6 @@ pub fn gen_uuid() -> UUID {
rand::thread_rng().gen::<[u8; 32]>().into()
}
pub fn now_msec() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Fix your clock :o")
.as_millis() as u64
}
// RMP serialization with names of fields and variants
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>

View File

@ -5,3 +5,4 @@ pub mod background;
pub mod config;
pub mod data;
pub mod error;
pub mod time;

16
src/util/time.rs Normal file
View File

@ -0,0 +1,16 @@
use chrono::{SecondsFormat, TimeZone, Utc};
use std::time::{SystemTime, UNIX_EPOCH};
pub fn now_msec() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Fix your clock :o")
.as_millis() as u64
}
pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
let timestamp = Utc.timestamp(secs, nanos);
timestamp.to_rfc3339_opts(SecondsFormat::Secs, true)
}