Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/620
This commit is contained in:
commit
3f461d8891
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -1340,6 +1340,7 @@ dependencies = [
|
|||||||
"async-trait",
|
"async-trait",
|
||||||
"base64 0.21.3",
|
"base64 0.21.3",
|
||||||
"blake2",
|
"blake2",
|
||||||
|
"chrono",
|
||||||
"err-derive",
|
"err-derive",
|
||||||
"futures",
|
"futures",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
@ -33,7 +33,7 @@ args@{
|
|||||||
ignoreLockHash,
|
ignoreLockHash,
|
||||||
}:
|
}:
|
||||||
let
|
let
|
||||||
nixifiedLockHash = "d4392b23d407f7ebc20d7f5db7583847e362665c1abb09f1c1d3305205e5996d";
|
nixifiedLockHash = "f5b86f9d75664ba528a26ae71f07a38e9c72c78fe331420b9b639e2a099d4dad";
|
||||||
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
||||||
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
||||||
lockHashIgnored = if ignoreLockHash
|
lockHashIgnored = if ignoreLockHash
|
||||||
@ -1911,6 +1911,7 @@ in
|
|||||||
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.73" { profileName = "__noProfile"; }).out;
|
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.73" { profileName = "__noProfile"; }).out;
|
||||||
base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.3" { inherit profileName; }).out;
|
base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.3" { inherit profileName; }).out;
|
||||||
blake2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".blake2."0.10.6" { inherit profileName; }).out;
|
blake2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".blake2."0.10.6" { inherit profileName; }).out;
|
||||||
|
chrono = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.26" { inherit profileName; }).out;
|
||||||
err_derive = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }).out;
|
err_derive = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }).out;
|
||||||
futures = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.28" { inherit profileName; }).out;
|
futures = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.28" { inherit profileName; }).out;
|
||||||
futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.28" { inherit profileName; }).out;
|
futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.28" { inherit profileName; }).out;
|
||||||
|
@ -75,16 +75,13 @@ but these endpoints are documented in [Red Hat Ceph Storage - Chapter 2. Ceph Ob
|
|||||||
|
|
||||||
| Endpoint | Garage | [Openstack Swift](https://docs.openstack.org/swift/latest/s3_compat.html) | [Ceph Object Gateway](https://docs.ceph.com/en/latest/radosgw/s3/) | [Riak CS](https://docs.riak.com/riak/cs/2.1.1/references/apis/storage/s3/index.html) | [OpenIO](https://docs.openio.io/latest/source/arch-design/s3_compliancy.html) |
|
| Endpoint | Garage | [Openstack Swift](https://docs.openstack.org/swift/latest/s3_compat.html) | [Ceph Object Gateway](https://docs.ceph.com/en/latest/radosgw/s3/) | [Riak CS](https://docs.riak.com/riak/cs/2.1.1/references/apis/storage/s3/index.html) | [OpenIO](https://docs.openio.io/latest/source/arch-design/s3_compliancy.html) |
|
||||||
|------------------------------|----------------------------------|-----------------|---------------|---------|-----|
|
|------------------------------|----------------------------------|-----------------|---------------|---------|-----|
|
||||||
| [AbortMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
|
| [AbortMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
|
||||||
| [CompleteMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html) | ✅ Implemented (see details below) | ✅ | ✅ | ✅ | ✅ |
|
| [CompleteMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
|
||||||
| [CreateMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html) | ✅ Implemented | ✅| ✅ | ✅ | ✅ |
|
| [CreateMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html) | ✅ Implemented | ✅| ✅ | ✅ | ✅ |
|
||||||
| [ListMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
|
| [ListMultipartUpload](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUpload.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
|
||||||
| [ListParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
|
| [ListParts](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
|
||||||
| [UploadPart](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html) | ✅ Implemented (see details below) | ✅ | ✅| ✅ | ✅ |
|
| [UploadPart](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html) | ✅ Implemented | ✅ | ✅| ✅ | ✅ |
|
||||||
| [UploadPartCopy](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
|
| [UploadPartCopy](https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html) | ✅ Implemented | ✅ | ✅ | ✅ | ✅ |
|
||||||
|
|
||||||
Our implementation of Multipart Upload is currently a bit more restrictive than Amazon's one in some edge cases.
|
|
||||||
For more information, please refer to our [issue tracker](https://git.deuxfleurs.fr/Deuxfleurs/garage/issues/204).
|
|
||||||
|
|
||||||
### Website endpoints
|
### Website endpoints
|
||||||
|
|
||||||
@ -127,15 +124,22 @@ If you need this feature, please [share your use case in our dedicated issue](ht
|
|||||||
|
|
||||||
| Endpoint | Garage | [Openstack Swift](https://docs.openstack.org/swift/latest/s3_compat.html) | [Ceph Object Gateway](https://docs.ceph.com/en/latest/radosgw/s3/) | [Riak CS](https://docs.riak.com/riak/cs/2.1.1/references/apis/storage/s3/index.html) | [OpenIO](https://docs.openio.io/latest/source/arch-design/s3_compliancy.html) |
|
| Endpoint | Garage | [Openstack Swift](https://docs.openstack.org/swift/latest/s3_compat.html) | [Ceph Object Gateway](https://docs.ceph.com/en/latest/radosgw/s3/) | [Riak CS](https://docs.riak.com/riak/cs/2.1.1/references/apis/storage/s3/index.html) | [OpenIO](https://docs.openio.io/latest/source/arch-design/s3_compliancy.html) |
|
||||||
|------------------------------|----------------------------------|-----------------|---------------|---------|-----|
|
|------------------------------|----------------------------------|-----------------|---------------|---------|-----|
|
||||||
| [DeleteBucketLifecycle](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html) | ❌ Missing | ❌| ✅| ❌| ✅|
|
| [DeleteBucketLifecycle](https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteBucketLifecycle.html) | ✅ Implemented | ❌| ✅| ❌| ✅|
|
||||||
| [GetBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html) | ❌ Missing | ❌| ✅ | ❌| ✅|
|
| [GetBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html) | ✅ Implemented | ❌| ✅ | ❌| ✅|
|
||||||
| [PutBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html) | ❌ Missing | ❌| ✅ | ❌| ✅|
|
| [PutBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html) | ⚠ Partially implemented (see below) | ❌| ✅ | ❌| ✅|
|
||||||
| [GetBucketVersioning](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html) | ❌ Stub (see below) | ✅| ✅ | ❌| ✅|
|
| [GetBucketVersioning](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketVersioning.html) | ❌ Stub (see below) | ✅| ✅ | ❌| ✅|
|
||||||
| [ListObjectVersions](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) | ❌ Missing | ❌| ✅ | ❌| ✅|
|
| [ListObjectVersions](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) | ❌ Missing | ❌| ✅ | ❌| ✅|
|
||||||
| [PutBucketVersioning](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html) | ❌ Missing | ❌| ✅| ❌| ✅|
|
| [PutBucketVersioning](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketVersioning.html) | ❌ Missing | ❌| ✅| ❌| ✅|
|
||||||
|
|
||||||
|
**PutBucketLifecycleConfiguration:** The only actions supported are
|
||||||
|
`AbortIncompleteMultipartUpload` and `Expiration` (without the
|
||||||
|
`ExpiredObjectDeleteMarker` field). All other operations are dependent on
|
||||||
|
either bucket versionning or storage classes which Garage currently does not
|
||||||
|
implement. The deprecated `Prefix` member directly in the the `Rule`
|
||||||
|
structure/XML tag is not supported, specified prefixes must be inside the
|
||||||
|
`Filter` structure/XML tag.
|
||||||
|
|
||||||
**GetBucketVersioning:** Stub implementation (Garage does not yet support versionning so this always returns "versionning not enabled").
|
**GetBucketVersioning:** Stub implementation which always returns "versionning not enabled", since Garage does not yet support bucket versionning.
|
||||||
|
|
||||||
### Replication endpoints
|
### Replication endpoints
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ use crate::s3::copy::*;
|
|||||||
use crate::s3::cors::*;
|
use crate::s3::cors::*;
|
||||||
use crate::s3::delete::*;
|
use crate::s3::delete::*;
|
||||||
use crate::s3::get::*;
|
use crate::s3::get::*;
|
||||||
|
use crate::s3::lifecycle::*;
|
||||||
use crate::s3::list::*;
|
use crate::s3::list::*;
|
||||||
use crate::s3::multipart::*;
|
use crate::s3::multipart::*;
|
||||||
use crate::s3::post_object::handle_post_object;
|
use crate::s3::post_object::handle_post_object;
|
||||||
@ -354,14 +355,21 @@ impl ApiHandler for S3ApiServer {
|
|||||||
}
|
}
|
||||||
Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
|
Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await,
|
||||||
Endpoint::PutBucketWebsite {} => {
|
Endpoint::PutBucketWebsite {} => {
|
||||||
handle_put_website(garage, bucket_id, req, content_sha256).await
|
handle_put_website(garage, bucket.clone(), req, content_sha256).await
|
||||||
}
|
}
|
||||||
Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await,
|
Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket.clone()).await,
|
||||||
Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
|
Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await,
|
||||||
Endpoint::PutBucketCors {} => {
|
Endpoint::PutBucketCors {} => {
|
||||||
handle_put_cors(garage, bucket_id, req, content_sha256).await
|
handle_put_cors(garage, bucket.clone(), req, content_sha256).await
|
||||||
|
}
|
||||||
|
Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket.clone()).await,
|
||||||
|
Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await,
|
||||||
|
Endpoint::PutBucketLifecycleConfiguration {} => {
|
||||||
|
handle_put_lifecycle(garage, bucket.clone(), req, content_sha256).await
|
||||||
|
}
|
||||||
|
Endpoint::DeleteBucketLifecycle {} => {
|
||||||
|
handle_delete_lifecycle(garage, bucket.clone()).await
|
||||||
}
|
}
|
||||||
Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await,
|
|
||||||
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
|
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -44,14 +44,11 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result<Response<Body>, Error> {
|
|||||||
|
|
||||||
pub async fn handle_delete_cors(
|
pub async fn handle_delete_cors(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
bucket_id: Uuid,
|
mut bucket: Bucket,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let mut bucket = garage
|
let param = bucket
|
||||||
.bucket_helper()
|
.params_mut()
|
||||||
.get_existing_bucket(bucket_id)
|
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
let param = bucket.params_mut().unwrap();
|
|
||||||
|
|
||||||
param.cors_config.update(None);
|
param.cors_config.update(None);
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
garage.bucket_table.insert(&bucket).await?;
|
||||||
@ -63,7 +60,7 @@ pub async fn handle_delete_cors(
|
|||||||
|
|
||||||
pub async fn handle_put_cors(
|
pub async fn handle_put_cors(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
bucket_id: Uuid,
|
mut bucket: Bucket,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
@ -73,12 +70,9 @@ pub async fn handle_put_cors(
|
|||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut bucket = garage
|
let param = bucket
|
||||||
.bucket_helper()
|
.params_mut()
|
||||||
.get_existing_bucket(bucket_id)
|
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
let param = bucket.params_mut().unwrap();
|
|
||||||
|
|
||||||
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
|
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
|
||||||
conf.validate()?;
|
conf.validate()?;
|
||||||
|
401
src/api/s3/lifecycle.rs
Normal file
401
src/api/s3/lifecycle.rs
Normal file
@ -0,0 +1,401 @@
|
|||||||
|
use quick_xml::de::from_reader;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use hyper::{Body, Request, Response, StatusCode};
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use crate::s3::error::*;
|
||||||
|
use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
|
||||||
|
use crate::signature::verify_signed_content;
|
||||||
|
|
||||||
|
use garage_model::bucket_table::{
|
||||||
|
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
|
||||||
|
LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
|
||||||
|
};
|
||||||
|
use garage_model::garage::Garage;
|
||||||
|
use garage_util::data::*;
|
||||||
|
|
||||||
|
pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result<Response<Body>, Error> {
|
||||||
|
let param = bucket
|
||||||
|
.params()
|
||||||
|
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
||||||
|
|
||||||
|
if let Some(lifecycle) = param.lifecycle_config.get() {
|
||||||
|
let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle);
|
||||||
|
let xml = to_xml_with_header(&wc)?;
|
||||||
|
Ok(Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.header(http::header::CONTENT_TYPE, "application/xml")
|
||||||
|
.body(Body::from(xml))?)
|
||||||
|
} else {
|
||||||
|
Ok(Response::builder()
|
||||||
|
.status(StatusCode::NO_CONTENT)
|
||||||
|
.body(Body::empty())?)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_delete_lifecycle(
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
mut bucket: Bucket,
|
||||||
|
) -> Result<Response<Body>, Error> {
|
||||||
|
let param = bucket
|
||||||
|
.params_mut()
|
||||||
|
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
||||||
|
|
||||||
|
param.lifecycle_config.update(None);
|
||||||
|
garage.bucket_table.insert(&bucket).await?;
|
||||||
|
|
||||||
|
Ok(Response::builder()
|
||||||
|
.status(StatusCode::NO_CONTENT)
|
||||||
|
.body(Body::empty())?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_put_lifecycle(
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
mut bucket: Bucket,
|
||||||
|
req: Request<Body>,
|
||||||
|
content_sha256: Option<Hash>,
|
||||||
|
) -> Result<Response<Body>, Error> {
|
||||||
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let param = bucket
|
||||||
|
.params_mut()
|
||||||
|
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
||||||
|
|
||||||
|
let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
|
||||||
|
let config = conf
|
||||||
|
.validate_into_garage_lifecycle_config()
|
||||||
|
.ok_or_bad_request("Invalid lifecycle configuration")?;
|
||||||
|
|
||||||
|
param.lifecycle_config.update(Some(config));
|
||||||
|
garage.bucket_table.insert(&bucket).await?;
|
||||||
|
|
||||||
|
Ok(Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.body(Body::empty())?)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ----
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
pub struct LifecycleConfiguration {
|
||||||
|
#[serde(serialize_with = "xmlns_tag", skip_deserializing)]
|
||||||
|
pub xmlns: (),
|
||||||
|
#[serde(rename = "Rule")]
|
||||||
|
pub lifecycle_rules: Vec<LifecycleRule>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
pub struct LifecycleRule {
|
||||||
|
#[serde(rename = "ID")]
|
||||||
|
pub id: Option<Value>,
|
||||||
|
#[serde(rename = "Status")]
|
||||||
|
pub status: Value,
|
||||||
|
#[serde(rename = "Filter", default)]
|
||||||
|
pub filter: Option<Filter>,
|
||||||
|
#[serde(rename = "Expiration", default)]
|
||||||
|
pub expiration: Option<Expiration>,
|
||||||
|
#[serde(rename = "AbortIncompleteMultipartUpload", default)]
|
||||||
|
pub abort_incomplete_mpu: Option<AbortIncompleteMpu>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Default)]
|
||||||
|
pub struct Filter {
|
||||||
|
#[serde(rename = "And")]
|
||||||
|
pub and: Option<Box<Filter>>,
|
||||||
|
#[serde(rename = "Prefix")]
|
||||||
|
pub prefix: Option<Value>,
|
||||||
|
#[serde(rename = "ObjectSizeGreaterThan")]
|
||||||
|
pub size_gt: Option<IntValue>,
|
||||||
|
#[serde(rename = "ObjectSizeLessThan")]
|
||||||
|
pub size_lt: Option<IntValue>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
pub struct Expiration {
|
||||||
|
#[serde(rename = "Days")]
|
||||||
|
pub days: Option<IntValue>,
|
||||||
|
#[serde(rename = "Date")]
|
||||||
|
pub at_date: Option<Value>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
|
||||||
|
pub struct AbortIncompleteMpu {
|
||||||
|
#[serde(rename = "DaysAfterInitiation")]
|
||||||
|
pub days: IntValue,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LifecycleConfiguration {
|
||||||
|
pub fn validate_into_garage_lifecycle_config(
|
||||||
|
self,
|
||||||
|
) -> Result<Vec<GarageLifecycleRule>, &'static str> {
|
||||||
|
let mut ret = vec![];
|
||||||
|
for rule in self.lifecycle_rules {
|
||||||
|
ret.push(rule.validate_into_garage_lifecycle_rule()?);
|
||||||
|
}
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_garage_lifecycle_config(config: &[GarageLifecycleRule]) -> Self {
|
||||||
|
Self {
|
||||||
|
xmlns: (),
|
||||||
|
lifecycle_rules: config
|
||||||
|
.iter()
|
||||||
|
.map(LifecycleRule::from_garage_lifecycle_rule)
|
||||||
|
.collect(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LifecycleRule {
|
||||||
|
pub fn validate_into_garage_lifecycle_rule(self) -> Result<GarageLifecycleRule, &'static str> {
|
||||||
|
let enabled = match self.status.0.as_str() {
|
||||||
|
"Enabled" => true,
|
||||||
|
"Disabled" => false,
|
||||||
|
_ => return Err("invalid value for <Status>"),
|
||||||
|
};
|
||||||
|
|
||||||
|
let filter = self
|
||||||
|
.filter
|
||||||
|
.map(Filter::validate_into_garage_lifecycle_filter)
|
||||||
|
.transpose()?
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let abort_incomplete_mpu_days = self.abort_incomplete_mpu.map(|x| x.days.0 as usize);
|
||||||
|
|
||||||
|
let expiration = self
|
||||||
|
.expiration
|
||||||
|
.map(Expiration::validate_into_garage_lifecycle_expiration)
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
Ok(GarageLifecycleRule {
|
||||||
|
id: self.id.map(|x| x.0),
|
||||||
|
enabled,
|
||||||
|
filter,
|
||||||
|
abort_incomplete_mpu_days,
|
||||||
|
expiration,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_garage_lifecycle_rule(rule: &GarageLifecycleRule) -> Self {
|
||||||
|
Self {
|
||||||
|
id: rule.id.as_deref().map(Value::from),
|
||||||
|
status: if rule.enabled {
|
||||||
|
Value::from("Enabled")
|
||||||
|
} else {
|
||||||
|
Value::from("Disabled")
|
||||||
|
},
|
||||||
|
filter: Filter::from_garage_lifecycle_filter(&rule.filter),
|
||||||
|
abort_incomplete_mpu: rule
|
||||||
|
.abort_incomplete_mpu_days
|
||||||
|
.map(|days| AbortIncompleteMpu {
|
||||||
|
days: IntValue(days as i64),
|
||||||
|
}),
|
||||||
|
expiration: rule
|
||||||
|
.expiration
|
||||||
|
.as_ref()
|
||||||
|
.map(Expiration::from_garage_lifecycle_expiration),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Filter {
|
||||||
|
pub fn count(&self) -> i32 {
|
||||||
|
fn count<T>(x: &Option<T>) -> i32 {
|
||||||
|
x.as_ref().map(|_| 1).unwrap_or(0)
|
||||||
|
}
|
||||||
|
count(&self.prefix) + count(&self.size_gt) + count(&self.size_lt)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn validate_into_garage_lifecycle_filter(
|
||||||
|
self,
|
||||||
|
) -> Result<GarageLifecycleFilter, &'static str> {
|
||||||
|
if self.count() > 0 && self.and.is_some() {
|
||||||
|
Err("Filter tag cannot contain both <And> and another condition")
|
||||||
|
} else if let Some(and) = self.and {
|
||||||
|
if and.and.is_some() {
|
||||||
|
return Err("Nested <And> tags");
|
||||||
|
}
|
||||||
|
Ok(and.internal_into_garage_lifecycle_filter())
|
||||||
|
} else if self.count() > 1 {
|
||||||
|
Err("Multiple Filter conditions must be wrapped in an <And> tag")
|
||||||
|
} else {
|
||||||
|
Ok(self.internal_into_garage_lifecycle_filter())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn internal_into_garage_lifecycle_filter(self) -> GarageLifecycleFilter {
|
||||||
|
GarageLifecycleFilter {
|
||||||
|
prefix: self.prefix.map(|x| x.0),
|
||||||
|
size_gt: self.size_gt.map(|x| x.0 as u64),
|
||||||
|
size_lt: self.size_lt.map(|x| x.0 as u64),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_garage_lifecycle_filter(rule: &GarageLifecycleFilter) -> Option<Self> {
|
||||||
|
let filter = Filter {
|
||||||
|
and: None,
|
||||||
|
prefix: rule.prefix.as_deref().map(Value::from),
|
||||||
|
size_gt: rule.size_gt.map(|x| IntValue(x as i64)),
|
||||||
|
size_lt: rule.size_lt.map(|x| IntValue(x as i64)),
|
||||||
|
};
|
||||||
|
match filter.count() {
|
||||||
|
0 => None,
|
||||||
|
1 => Some(filter),
|
||||||
|
_ => Some(Filter {
|
||||||
|
and: Some(Box::new(filter)),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Expiration {
|
||||||
|
pub fn validate_into_garage_lifecycle_expiration(
|
||||||
|
self,
|
||||||
|
) -> Result<GarageLifecycleExpiration, &'static str> {
|
||||||
|
match (self.days, self.at_date) {
|
||||||
|
(Some(_), Some(_)) => Err("cannot have both <Days> and <Date> in <Expiration>"),
|
||||||
|
(None, None) => Err("<Expiration> must contain either <Days> or <Date>"),
|
||||||
|
(Some(days), None) => Ok(GarageLifecycleExpiration::AfterDays(days.0 as usize)),
|
||||||
|
(None, Some(date)) => {
|
||||||
|
parse_lifecycle_date(&date.0)?;
|
||||||
|
Ok(GarageLifecycleExpiration::AtDate(date.0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_garage_lifecycle_expiration(exp: &GarageLifecycleExpiration) -> Self {
|
||||||
|
match exp {
|
||||||
|
GarageLifecycleExpiration::AfterDays(days) => Expiration {
|
||||||
|
days: Some(IntValue(*days as i64)),
|
||||||
|
at_date: None,
|
||||||
|
},
|
||||||
|
GarageLifecycleExpiration::AtDate(date) => Expiration {
|
||||||
|
days: None,
|
||||||
|
at_date: Some(Value(date.to_string())),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
use quick_xml::de::from_str;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_deserialize_lifecycle_config() -> Result<(), Error> {
|
||||||
|
let message = r#"<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<LifecycleConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
|
||||||
|
<Rule>
|
||||||
|
<ID>id1</ID>
|
||||||
|
<Status>Enabled</Status>
|
||||||
|
<Filter>
|
||||||
|
<Prefix>documents/</Prefix>
|
||||||
|
</Filter>
|
||||||
|
<AbortIncompleteMultipartUpload>
|
||||||
|
<DaysAfterInitiation>7</DaysAfterInitiation>
|
||||||
|
</AbortIncompleteMultipartUpload>
|
||||||
|
</Rule>
|
||||||
|
<Rule>
|
||||||
|
<ID>id2</ID>
|
||||||
|
<Status>Enabled</Status>
|
||||||
|
<Filter>
|
||||||
|
<And>
|
||||||
|
<Prefix>logs/</Prefix>
|
||||||
|
<ObjectSizeGreaterThan>1000000</ObjectSizeGreaterThan>
|
||||||
|
</And>
|
||||||
|
</Filter>
|
||||||
|
<Expiration>
|
||||||
|
<Days>365</Days>
|
||||||
|
</Expiration>
|
||||||
|
</Rule>
|
||||||
|
</LifecycleConfiguration>"#;
|
||||||
|
let conf: LifecycleConfiguration = from_str(message).unwrap();
|
||||||
|
let ref_value = LifecycleConfiguration {
|
||||||
|
xmlns: (),
|
||||||
|
lifecycle_rules: vec![
|
||||||
|
LifecycleRule {
|
||||||
|
id: Some("id1".into()),
|
||||||
|
status: "Enabled".into(),
|
||||||
|
filter: Some(Filter {
|
||||||
|
prefix: Some("documents/".into()),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
expiration: None,
|
||||||
|
abort_incomplete_mpu: Some(AbortIncompleteMpu { days: IntValue(7) }),
|
||||||
|
},
|
||||||
|
LifecycleRule {
|
||||||
|
id: Some("id2".into()),
|
||||||
|
status: "Enabled".into(),
|
||||||
|
filter: Some(Filter {
|
||||||
|
and: Some(Box::new(Filter {
|
||||||
|
prefix: Some("logs/".into()),
|
||||||
|
size_gt: Some(IntValue(1000000)),
|
||||||
|
..Default::default()
|
||||||
|
})),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
expiration: Some(Expiration {
|
||||||
|
days: Some(IntValue(365)),
|
||||||
|
at_date: None,
|
||||||
|
}),
|
||||||
|
abort_incomplete_mpu: None,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
assert_eq! {
|
||||||
|
ref_value,
|
||||||
|
conf
|
||||||
|
};
|
||||||
|
|
||||||
|
let message2 = to_xml_with_header(&ref_value)?;
|
||||||
|
|
||||||
|
let cleanup = |c: &str| c.replace(char::is_whitespace, "");
|
||||||
|
assert_eq!(cleanup(message), cleanup(&message2));
|
||||||
|
|
||||||
|
// Check validation
|
||||||
|
let validated = ref_value
|
||||||
|
.validate_into_garage_lifecycle_config()
|
||||||
|
.ok_or_bad_request("invalid xml config")?;
|
||||||
|
|
||||||
|
let ref_config = vec![
|
||||||
|
GarageLifecycleRule {
|
||||||
|
id: Some("id1".into()),
|
||||||
|
enabled: true,
|
||||||
|
filter: GarageLifecycleFilter {
|
||||||
|
prefix: Some("documents/".into()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
expiration: None,
|
||||||
|
abort_incomplete_mpu_days: Some(7),
|
||||||
|
},
|
||||||
|
GarageLifecycleRule {
|
||||||
|
id: Some("id2".into()),
|
||||||
|
enabled: true,
|
||||||
|
filter: GarageLifecycleFilter {
|
||||||
|
prefix: Some("logs/".into()),
|
||||||
|
size_gt: Some(1000000),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
expiration: Some(GarageLifecycleExpiration::AfterDays(365)),
|
||||||
|
abort_incomplete_mpu_days: None,
|
||||||
|
},
|
||||||
|
];
|
||||||
|
assert_eq!(validated, ref_config);
|
||||||
|
|
||||||
|
let message3 = to_xml_with_header(&LifecycleConfiguration::from_garage_lifecycle_config(
|
||||||
|
&validated,
|
||||||
|
))?;
|
||||||
|
assert_eq!(cleanup(message), cleanup(&message3));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@ -6,6 +6,7 @@ mod copy;
|
|||||||
pub mod cors;
|
pub mod cors;
|
||||||
mod delete;
|
mod delete;
|
||||||
pub mod get;
|
pub mod get;
|
||||||
|
mod lifecycle;
|
||||||
mod list;
|
mod list;
|
||||||
mod multipart;
|
mod multipart;
|
||||||
mod post_object;
|
mod post_object;
|
||||||
|
@ -43,14 +43,11 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result<Response<Body>, Error
|
|||||||
|
|
||||||
pub async fn handle_delete_website(
|
pub async fn handle_delete_website(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
bucket_id: Uuid,
|
mut bucket: Bucket,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let mut bucket = garage
|
let param = bucket
|
||||||
.bucket_helper()
|
.params_mut()
|
||||||
.get_existing_bucket(bucket_id)
|
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
let param = bucket.params_mut().unwrap();
|
|
||||||
|
|
||||||
param.website_config.update(None);
|
param.website_config.update(None);
|
||||||
garage.bucket_table.insert(&bucket).await?;
|
garage.bucket_table.insert(&bucket).await?;
|
||||||
@ -62,7 +59,7 @@ pub async fn handle_delete_website(
|
|||||||
|
|
||||||
pub async fn handle_put_website(
|
pub async fn handle_put_website(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
bucket_id: Uuid,
|
mut bucket: Bucket,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
@ -72,12 +69,9 @@ pub async fn handle_put_website(
|
|||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut bucket = garage
|
let param = bucket
|
||||||
.bucket_helper()
|
.params_mut()
|
||||||
.get_existing_bucket(bucket_id)
|
.ok_or_internal_error("Bucket should not be deleted at this point")?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
let param = bucket.params_mut().unwrap();
|
|
||||||
|
|
||||||
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
|
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
|
||||||
conf.validate()?;
|
conf.validate()?;
|
||||||
|
@ -23,6 +23,7 @@ garage_util.workspace = true
|
|||||||
async-trait = "0.1.7"
|
async-trait = "0.1.7"
|
||||||
arc-swap = "1.0"
|
arc-swap = "1.0"
|
||||||
blake2 = "0.10"
|
blake2 = "0.10"
|
||||||
|
chrono = "0.4"
|
||||||
err-derive = "0.3"
|
err-derive = "0.3"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
base64 = "0.21"
|
base64 = "0.21"
|
||||||
|
@ -48,6 +48,9 @@ mod v08 {
|
|||||||
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
|
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
|
||||||
/// CORS rules
|
/// CORS rules
|
||||||
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
|
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
|
||||||
|
/// Lifecycle configuration
|
||||||
|
#[serde(default)]
|
||||||
|
pub lifecycle_config: crdt::Lww<Option<Vec<LifecycleRule>>>,
|
||||||
/// Bucket quotas
|
/// Bucket quotas
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub quotas: crdt::Lww<BucketQuotas>,
|
pub quotas: crdt::Lww<BucketQuotas>,
|
||||||
@ -69,6 +72,42 @@ mod v08 {
|
|||||||
pub expose_headers: Vec<String>,
|
pub expose_headers: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Lifecycle configuration rule
|
||||||
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct LifecycleRule {
|
||||||
|
/// The ID of the rule
|
||||||
|
pub id: Option<String>,
|
||||||
|
/// Whether the rule is active
|
||||||
|
pub enabled: bool,
|
||||||
|
/// The filter to check whether rule applies to a given object
|
||||||
|
pub filter: LifecycleFilter,
|
||||||
|
/// Number of days after which incomplete multipart uploads are aborted
|
||||||
|
pub abort_incomplete_mpu_days: Option<usize>,
|
||||||
|
/// Expiration policy for stored objects
|
||||||
|
pub expiration: Option<LifecycleExpiration>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A lifecycle filter is a set of conditions that must all be true.
|
||||||
|
/// For each condition, if it is None, it is not verified (always true),
|
||||||
|
/// and if it is Some(x), then it is verified for value x
|
||||||
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Default)]
|
||||||
|
pub struct LifecycleFilter {
|
||||||
|
/// If Some(x), object key has to start with prefix x
|
||||||
|
pub prefix: Option<String>,
|
||||||
|
/// If Some(x), object size has to be more than x
|
||||||
|
pub size_gt: Option<u64>,
|
||||||
|
/// If Some(x), object size has to be less than x
|
||||||
|
pub size_lt: Option<u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub enum LifecycleExpiration {
|
||||||
|
/// Objects expire x days after they were created
|
||||||
|
AfterDays(usize),
|
||||||
|
/// Objects expire at date x (must be in yyyy-mm-dd format)
|
||||||
|
AtDate(String),
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct BucketQuotas {
|
pub struct BucketQuotas {
|
||||||
/// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
|
/// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
|
||||||
@ -88,7 +127,7 @@ impl AutoCrdt for BucketQuotas {
|
|||||||
|
|
||||||
impl BucketParams {
|
impl BucketParams {
|
||||||
/// Create an empty BucketParams with no authorized keys and no website accesss
|
/// Create an empty BucketParams with no authorized keys and no website accesss
|
||||||
pub fn new() -> Self {
|
fn new() -> Self {
|
||||||
BucketParams {
|
BucketParams {
|
||||||
creation_date: now_msec(),
|
creation_date: now_msec(),
|
||||||
authorized_keys: crdt::Map::new(),
|
authorized_keys: crdt::Map::new(),
|
||||||
@ -96,6 +135,7 @@ impl BucketParams {
|
|||||||
local_aliases: crdt::LwwMap::new(),
|
local_aliases: crdt::LwwMap::new(),
|
||||||
website_config: crdt::Lww::new(None),
|
website_config: crdt::Lww::new(None),
|
||||||
cors_config: crdt::Lww::new(None),
|
cors_config: crdt::Lww::new(None),
|
||||||
|
lifecycle_config: crdt::Lww::new(None),
|
||||||
quotas: crdt::Lww::new(BucketQuotas::default()),
|
quotas: crdt::Lww::new(BucketQuotas::default()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -111,10 +151,25 @@ impl Crdt for BucketParams {
|
|||||||
|
|
||||||
self.website_config.merge(&o.website_config);
|
self.website_config.merge(&o.website_config);
|
||||||
self.cors_config.merge(&o.cors_config);
|
self.cors_config.merge(&o.cors_config);
|
||||||
|
self.lifecycle_config.merge(&o.lifecycle_config);
|
||||||
self.quotas.merge(&o.quotas);
|
self.quotas.merge(&o.quotas);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn parse_lifecycle_date(date: &str) -> Result<chrono::NaiveDate, &'static str> {
|
||||||
|
use chrono::prelude::*;
|
||||||
|
|
||||||
|
if let Ok(datetime) = NaiveDateTime::parse_from_str(date, "%Y-%m-%dT%H:%M:%SZ") {
|
||||||
|
if datetime.time() == NaiveTime::MIN {
|
||||||
|
Ok(datetime.date())
|
||||||
|
} else {
|
||||||
|
Err("date must be at midnight")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
NaiveDate::parse_from_str(date, "%Y-%m-%d").map_err(|_| "date has invalid format")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Default for Bucket {
|
impl Default for Bucket {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self::new()
|
Self::new()
|
||||||
|
@ -7,6 +7,7 @@ use garage_db as db;
|
|||||||
use garage_util::background::*;
|
use garage_util::background::*;
|
||||||
use garage_util::config::*;
|
use garage_util::config::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
use garage_util::persister::PersisterShared;
|
||||||
|
|
||||||
use garage_rpc::replication_mode::ReplicationMode;
|
use garage_rpc::replication_mode::ReplicationMode;
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
@ -17,6 +18,7 @@ use garage_table::replication::TableShardedReplication;
|
|||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use crate::s3::block_ref_table::*;
|
use crate::s3::block_ref_table::*;
|
||||||
|
use crate::s3::lifecycle_worker;
|
||||||
use crate::s3::mpu_table::*;
|
use crate::s3::mpu_table::*;
|
||||||
use crate::s3::object_table::*;
|
use crate::s3::object_table::*;
|
||||||
use crate::s3::version_table::*;
|
use crate::s3::version_table::*;
|
||||||
@ -67,6 +69,9 @@ pub struct Garage {
|
|||||||
/// Table containing S3 block references (not blocks themselves)
|
/// Table containing S3 block references (not blocks themselves)
|
||||||
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
|
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
|
||||||
|
|
||||||
|
/// Persister for lifecycle worker info
|
||||||
|
pub lifecycle_persister: PersisterShared<lifecycle_worker::LifecycleWorkerPersisted>,
|
||||||
|
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
pub k2v: GarageK2V,
|
pub k2v: GarageK2V,
|
||||||
}
|
}
|
||||||
@ -199,6 +204,9 @@ impl Garage {
|
|||||||
let replication_mode = ReplicationMode::parse(&config.replication_mode)
|
let replication_mode = ReplicationMode::parse(&config.replication_mode)
|
||||||
.ok_or_message("Invalid replication_mode in config file.")?;
|
.ok_or_message("Invalid replication_mode in config file.")?;
|
||||||
|
|
||||||
|
info!("Initialize background variable system...");
|
||||||
|
let mut bg_vars = vars::BgVars::new();
|
||||||
|
|
||||||
info!("Initialize membership management system...");
|
info!("Initialize membership management system...");
|
||||||
let system = System::new(network_key, replication_mode, &config)?;
|
let system = System::new(network_key, replication_mode, &config)?;
|
||||||
|
|
||||||
@ -230,6 +238,7 @@ impl Garage {
|
|||||||
data_rep_param,
|
data_rep_param,
|
||||||
system.clone(),
|
system.clone(),
|
||||||
);
|
);
|
||||||
|
block_manager.register_bg_vars(&mut bg_vars);
|
||||||
|
|
||||||
// ---- admin tables ----
|
// ---- admin tables ----
|
||||||
info!("Initialize bucket_table...");
|
info!("Initialize bucket_table...");
|
||||||
@ -296,14 +305,15 @@ impl Garage {
|
|||||||
&db,
|
&db,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
info!("Load lifecycle worker state...");
|
||||||
|
let lifecycle_persister =
|
||||||
|
PersisterShared::new(&system.metadata_dir, "lifecycle_worker_state");
|
||||||
|
lifecycle_worker::register_bg_vars(&lifecycle_persister, &mut bg_vars);
|
||||||
|
|
||||||
// ---- K2V ----
|
// ---- K2V ----
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
|
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
|
||||||
|
|
||||||
// Initialize bg vars
|
|
||||||
let mut bg_vars = vars::BgVars::new();
|
|
||||||
block_manager.register_bg_vars(&mut bg_vars);
|
|
||||||
|
|
||||||
// -- done --
|
// -- done --
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
config,
|
config,
|
||||||
@ -321,12 +331,13 @@ impl Garage {
|
|||||||
mpu_counter_table,
|
mpu_counter_table,
|
||||||
version_table,
|
version_table,
|
||||||
block_ref_table,
|
block_ref_table,
|
||||||
|
lifecycle_persister,
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
k2v,
|
k2v,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn spawn_workers(&self, bg: &BackgroundRunner) {
|
pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
|
||||||
self.block_manager.spawn_workers(bg);
|
self.block_manager.spawn_workers(bg);
|
||||||
|
|
||||||
self.bucket_table.spawn_workers(bg);
|
self.bucket_table.spawn_workers(bg);
|
||||||
@ -340,6 +351,11 @@ impl Garage {
|
|||||||
self.version_table.spawn_workers(bg);
|
self.version_table.spawn_workers(bg);
|
||||||
self.block_ref_table.spawn_workers(bg);
|
self.block_ref_table.spawn_workers(bg);
|
||||||
|
|
||||||
|
bg.spawn_worker(lifecycle_worker::LifecycleWorker::new(
|
||||||
|
self.clone(),
|
||||||
|
self.lifecycle_persister.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
self.k2v.spawn_workers(bg);
|
self.k2v.spawn_workers(bg);
|
||||||
}
|
}
|
||||||
|
@ -78,6 +78,7 @@ impl Migrate {
|
|||||||
local_aliases: LwwMap::new(),
|
local_aliases: LwwMap::new(),
|
||||||
website_config: Lww::new(website),
|
website_config: Lww::new(website),
|
||||||
cors_config: Lww::new(None),
|
cors_config: Lww::new(None),
|
||||||
|
lifecycle_config: Lww::new(None),
|
||||||
quotas: Lww::new(Default::default()),
|
quotas: Lww::new(Default::default()),
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
|
413
src/model/s3/lifecycle_worker.rs
Normal file
413
src/model/s3/lifecycle_worker.rs
Normal file
@ -0,0 +1,413 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use chrono::prelude::*;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
use garage_util::background::*;
|
||||||
|
use garage_util::data::*;
|
||||||
|
use garage_util::error::Error;
|
||||||
|
use garage_util::persister::PersisterShared;
|
||||||
|
use garage_util::time::*;
|
||||||
|
|
||||||
|
use garage_table::EmptyKey;
|
||||||
|
|
||||||
|
use crate::bucket_table::*;
|
||||||
|
use crate::s3::object_table::*;
|
||||||
|
|
||||||
|
use crate::garage::Garage;
|
||||||
|
|
||||||
|
mod v090 {
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Default, Clone)]
|
||||||
|
pub struct LifecycleWorkerPersisted {
|
||||||
|
pub last_completed: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl garage_util::migrate::InitialFormat for LifecycleWorkerPersisted {
|
||||||
|
const VERSION_MARKER: &'static [u8] = b"G09lwp";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub use v090::*;
|
||||||
|
|
||||||
|
pub struct LifecycleWorker {
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
|
||||||
|
state: State,
|
||||||
|
|
||||||
|
persister: PersisterShared<LifecycleWorkerPersisted>,
|
||||||
|
}
|
||||||
|
|
||||||
|
enum State {
|
||||||
|
Completed(NaiveDate),
|
||||||
|
Running {
|
||||||
|
date: NaiveDate,
|
||||||
|
pos: Vec<u8>,
|
||||||
|
counter: usize,
|
||||||
|
objects_expired: usize,
|
||||||
|
mpu_aborted: usize,
|
||||||
|
last_bucket: Option<Bucket>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||||
|
enum Skip {
|
||||||
|
SkipBucket,
|
||||||
|
NextObject,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_bg_vars(
|
||||||
|
persister: &PersisterShared<LifecycleWorkerPersisted>,
|
||||||
|
vars: &mut vars::BgVars,
|
||||||
|
) {
|
||||||
|
vars.register_ro(persister, "lifecycle-last-completed", |p| {
|
||||||
|
p.get_with(|x| x.last_completed.clone().unwrap_or("never".to_string()))
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LifecycleWorker {
|
||||||
|
pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
|
||||||
|
let today = today();
|
||||||
|
let last_completed = persister.get_with(|x| {
|
||||||
|
x.last_completed
|
||||||
|
.as_deref()
|
||||||
|
.and_then(|x| x.parse::<NaiveDate>().ok())
|
||||||
|
});
|
||||||
|
let state = match last_completed {
|
||||||
|
Some(d) if d >= today => State::Completed(d),
|
||||||
|
_ => State::start(today),
|
||||||
|
};
|
||||||
|
Self {
|
||||||
|
garage,
|
||||||
|
state,
|
||||||
|
persister,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl State {
|
||||||
|
fn start(date: NaiveDate) -> Self {
|
||||||
|
info!("Starting lifecycle worker for {}", date);
|
||||||
|
State::Running {
|
||||||
|
date,
|
||||||
|
pos: vec![],
|
||||||
|
counter: 0,
|
||||||
|
objects_expired: 0,
|
||||||
|
mpu_aborted: 0,
|
||||||
|
last_bucket: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Worker for LifecycleWorker {
|
||||||
|
fn name(&self) -> String {
|
||||||
|
"object lifecycle worker".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status(&self) -> WorkerStatus {
|
||||||
|
match &self.state {
|
||||||
|
State::Completed(d) => WorkerStatus {
|
||||||
|
freeform: vec![format!("Last completed: {}", d)],
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
State::Running {
|
||||||
|
date,
|
||||||
|
counter,
|
||||||
|
objects_expired,
|
||||||
|
mpu_aborted,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
let n_objects = self
|
||||||
|
.garage
|
||||||
|
.object_table
|
||||||
|
.data
|
||||||
|
.store
|
||||||
|
.fast_len()
|
||||||
|
.unwrap_or(None);
|
||||||
|
let progress = match n_objects {
|
||||||
|
None => "...".to_string(),
|
||||||
|
Some(total) => format!(
|
||||||
|
"~{:.2}%",
|
||||||
|
100. * std::cmp::min(*counter, total) as f32 / total as f32
|
||||||
|
),
|
||||||
|
};
|
||||||
|
WorkerStatus {
|
||||||
|
progress: Some(progress),
|
||||||
|
freeform: vec![
|
||||||
|
format!("Started: {}", date),
|
||||||
|
format!("Objects expired: {}", objects_expired),
|
||||||
|
format!("Multipart uploads aborted: { }", mpu_aborted),
|
||||||
|
],
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
||||||
|
match &mut self.state {
|
||||||
|
State::Completed(_) => Ok(WorkerState::Idle),
|
||||||
|
State::Running {
|
||||||
|
date,
|
||||||
|
counter,
|
||||||
|
objects_expired,
|
||||||
|
mpu_aborted,
|
||||||
|
pos,
|
||||||
|
last_bucket,
|
||||||
|
} => {
|
||||||
|
// Process a batch of 100 items before yielding to bg task scheduler
|
||||||
|
for _ in 0..100 {
|
||||||
|
let (object_bytes, next_pos) = match self
|
||||||
|
.garage
|
||||||
|
.object_table
|
||||||
|
.data
|
||||||
|
.store
|
||||||
|
.get_gt(&pos)?
|
||||||
|
{
|
||||||
|
None => {
|
||||||
|
info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
|
||||||
|
self.persister
|
||||||
|
.set_with(|x| x.last_completed = Some(date.to_string()))?;
|
||||||
|
self.state = State::Completed(*date);
|
||||||
|
return Ok(WorkerState::Idle);
|
||||||
|
}
|
||||||
|
Some((k, v)) => (v, k),
|
||||||
|
};
|
||||||
|
|
||||||
|
let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
|
||||||
|
let skip = process_object(
|
||||||
|
&self.garage,
|
||||||
|
*date,
|
||||||
|
&object,
|
||||||
|
objects_expired,
|
||||||
|
mpu_aborted,
|
||||||
|
last_bucket,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
*counter += 1;
|
||||||
|
if skip == Skip::SkipBucket {
|
||||||
|
let bucket_id_len = object.bucket_id.as_slice().len();
|
||||||
|
assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
|
||||||
|
*pos = std::cmp::max(
|
||||||
|
next_pos,
|
||||||
|
[&pos[..bucket_id_len], &[0xFFu8][..]].concat(),
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
*pos = next_pos;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(WorkerState::Busy)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_for_work(&mut self) -> WorkerState {
|
||||||
|
match &self.state {
|
||||||
|
State::Completed(d) => {
|
||||||
|
let next_day = d.succ_opt().expect("no next day");
|
||||||
|
let next_start = midnight_ts(next_day);
|
||||||
|
loop {
|
||||||
|
let now = now_msec();
|
||||||
|
if now < next_start {
|
||||||
|
tokio::time::sleep_until(
|
||||||
|
(Instant::now() + Duration::from_millis(next_start - now)).into(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.state = State::start(std::cmp::max(next_day, today()));
|
||||||
|
}
|
||||||
|
State::Running { .. } => (),
|
||||||
|
}
|
||||||
|
WorkerState::Busy
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_object(
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
now_date: NaiveDate,
|
||||||
|
object: &Object,
|
||||||
|
objects_expired: &mut usize,
|
||||||
|
mpu_aborted: &mut usize,
|
||||||
|
last_bucket: &mut Option<Bucket>,
|
||||||
|
) -> Result<Skip, Error> {
|
||||||
|
if !object
|
||||||
|
.versions()
|
||||||
|
.iter()
|
||||||
|
.any(|x| x.is_data() || x.is_uploading(None))
|
||||||
|
{
|
||||||
|
return Ok(Skip::NextObject);
|
||||||
|
}
|
||||||
|
|
||||||
|
let bucket = match last_bucket.take() {
|
||||||
|
Some(b) if b.id == object.bucket_id => b,
|
||||||
|
_ => {
|
||||||
|
match garage
|
||||||
|
.bucket_table
|
||||||
|
.get(&EmptyKey, &object.bucket_id)
|
||||||
|
.await?
|
||||||
|
{
|
||||||
|
Some(b) => b,
|
||||||
|
None => {
|
||||||
|
warn!(
|
||||||
|
"Lifecycle worker: object in non-existent bucket {:?}",
|
||||||
|
object.bucket_id
|
||||||
|
);
|
||||||
|
return Ok(Skip::SkipBucket);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let lifecycle_policy: &[LifecycleRule] = bucket
|
||||||
|
.state
|
||||||
|
.as_option()
|
||||||
|
.and_then(|s| s.lifecycle_config.get().as_deref())
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
if lifecycle_policy.iter().all(|x| !x.enabled) {
|
||||||
|
return Ok(Skip::SkipBucket);
|
||||||
|
}
|
||||||
|
|
||||||
|
let db = garage.object_table.data.store.db();
|
||||||
|
|
||||||
|
for rule in lifecycle_policy.iter() {
|
||||||
|
if !rule.enabled {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(pfx) = &rule.filter.prefix {
|
||||||
|
if !object.key.starts_with(pfx) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(expire) = &rule.expiration {
|
||||||
|
if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) {
|
||||||
|
let version_date = next_date(current_version.timestamp);
|
||||||
|
|
||||||
|
let current_version_data = match ¤t_version.state {
|
||||||
|
ObjectVersionState::Complete(c) => c,
|
||||||
|
_ => unreachable!(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let size_match = check_size_filter(current_version_data, &rule.filter);
|
||||||
|
let date_match = match expire {
|
||||||
|
LifecycleExpiration::AfterDays(n_days) => {
|
||||||
|
(now_date - version_date) >= chrono::Duration::days(*n_days as i64)
|
||||||
|
}
|
||||||
|
LifecycleExpiration::AtDate(exp_date) => {
|
||||||
|
if let Ok(exp_date) = parse_lifecycle_date(exp_date) {
|
||||||
|
now_date >= exp_date
|
||||||
|
} else {
|
||||||
|
warn!("Invalid expiration date stored in bucket {:?} lifecycle config: {}", bucket.id, exp_date);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if size_match && date_match {
|
||||||
|
// Delete expired version
|
||||||
|
let deleted_object = Object::new(
|
||||||
|
object.bucket_id,
|
||||||
|
object.key.clone(),
|
||||||
|
vec![ObjectVersion {
|
||||||
|
uuid: gen_uuid(),
|
||||||
|
timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1),
|
||||||
|
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
|
||||||
|
}],
|
||||||
|
);
|
||||||
|
info!(
|
||||||
|
"Lifecycle: expiring 1 object in bucket {:?}",
|
||||||
|
object.bucket_id
|
||||||
|
);
|
||||||
|
db.transaction(|mut tx| {
|
||||||
|
garage.object_table.queue_insert(&mut tx, &deleted_object)
|
||||||
|
})?;
|
||||||
|
*objects_expired += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days {
|
||||||
|
let aborted_versions = object
|
||||||
|
.versions()
|
||||||
|
.iter()
|
||||||
|
.filter_map(|v| {
|
||||||
|
let version_date = next_date(v.timestamp);
|
||||||
|
if (now_date - version_date) >= chrono::Duration::days(*abort_mpu_days as i64)
|
||||||
|
&& matches!(&v.state, ObjectVersionState::Uploading { .. })
|
||||||
|
{
|
||||||
|
Some(ObjectVersion {
|
||||||
|
state: ObjectVersionState::Aborted,
|
||||||
|
..*v
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if !aborted_versions.is_empty() {
|
||||||
|
// Insert aborted mpu info
|
||||||
|
let n_aborted = aborted_versions.len();
|
||||||
|
info!(
|
||||||
|
"Lifecycle: aborting {} incomplete upload(s) in bucket {:?}",
|
||||||
|
n_aborted, object.bucket_id
|
||||||
|
);
|
||||||
|
let aborted_object =
|
||||||
|
Object::new(object.bucket_id, object.key.clone(), aborted_versions);
|
||||||
|
db.transaction(|mut tx| {
|
||||||
|
garage.object_table.queue_insert(&mut tx, &aborted_object)
|
||||||
|
})?;
|
||||||
|
*mpu_aborted += n_aborted;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*last_bucket = Some(bucket);
|
||||||
|
Ok(Skip::NextObject)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool {
|
||||||
|
let size = match version_data {
|
||||||
|
ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size,
|
||||||
|
_ => unreachable!(),
|
||||||
|
};
|
||||||
|
if let Some(size_gt) = filter.size_gt {
|
||||||
|
if !(size > size_gt) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(size_lt) = filter.size_lt {
|
||||||
|
if !(size < size_lt) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn midnight_ts(date: NaiveDate) -> u64 {
|
||||||
|
date.and_hms_opt(0, 0, 0)
|
||||||
|
.expect("midnight does not exist")
|
||||||
|
.timestamp_millis() as u64
|
||||||
|
}
|
||||||
|
|
||||||
|
fn next_date(ts: u64) -> NaiveDate {
|
||||||
|
NaiveDateTime::from_timestamp_millis(ts as i64)
|
||||||
|
.expect("bad timestamp")
|
||||||
|
.date()
|
||||||
|
.succ_opt()
|
||||||
|
.expect("no next day")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn today() -> NaiveDate {
|
||||||
|
Utc::now().naive_utc().date()
|
||||||
|
}
|
@ -2,3 +2,5 @@ pub mod block_ref_table;
|
|||||||
pub mod mpu_table;
|
pub mod mpu_table;
|
||||||
pub mod object_table;
|
pub mod object_table;
|
||||||
pub mod version_table;
|
pub mod version_table;
|
||||||
|
|
||||||
|
pub mod lifecycle_worker;
|
||||||
|
Loading…
Reference in New Issue
Block a user