garage_api: Handle streaming payload early in request handling

This commit is contained in:
KokaKiwi 2022-02-18 17:05:19 +01:00 committed by Gitea
parent 822128e3c8
commit 98545a16dd
3 changed files with 61 additions and 57 deletions

View File

@ -1,7 +1,9 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, Utc};
use futures::future::Future; use futures::future::Future;
use futures::prelude::*;
use hyper::header; use hyper::header;
use hyper::server::conn::AddrStream; use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
@ -24,7 +26,10 @@ use garage_model::key_table::Key;
use garage_table::util::*; use garage_table::util::*;
use crate::error::*; use crate::error::*;
use crate::signature::compute_scope;
use crate::signature::payload::check_payload_signature; use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::SignedPayloadStream;
use crate::signature::LONG_DATETIME;
use crate::helpers::*; use crate::helpers::*;
use crate::s3_bucket::*; use crate::s3_bucket::*;
@ -158,7 +163,7 @@ async fn handler_stage2(
let authority = req let authority = req
.headers() .headers()
.get(header::HOST) .get(header::HOST)
.ok_or_else(|| Error::BadRequest("HOST header required".to_owned()))? .ok_or_bad_request("Host header required")?
.to_str()?; .to_str()?;
let host = authority_to_host(authority)?; let host = authority_to_host(authority)?;
@ -221,11 +226,57 @@ async fn handler_stage3(
return handle_options_s3api(garage, &req, bucket_name).await; return handle_options_s3api(garage, &req, bucket_name).await;
} }
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?; let (api_key, mut content_sha256) = check_payload_signature(&garage, &req).await?;
let api_key = api_key.ok_or_else(|| { let api_key = api_key.ok_or_else(|| {
Error::Forbidden("Garage does not support anonymous access yet".to_string()) Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?; })?;
let req = match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
let signature = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
let secret_key = &api_key
.state
.as_option()
.ok_or_internal_error("Deleted key state")?
.secret_key;
let date = req
.headers()
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
let signing_hmac = crate::signature::signing_hmac(
&date,
secret_key,
&garage.config.s3_api.s3_region,
"s3",
)
.ok_or_internal_error("Unable to build signing HMAC")?;
req.map(move |body| {
Body::wrap_stream(
SignedPayloadStream::new(
body.map_err(Error::from),
signing_hmac,
date,
&scope,
signature,
)
.map_err(Error::from),
)
})
}
_ => req,
};
let bucket_name = match bucket_name { let bucket_name = match bucket_name {
None => return handle_request_without_bucket(garage, req, api_key, endpoint).await, None => return handle_request_without_bucket(garage, req, api_key, endpoint).await,
Some(bucket) => bucket.to_string(), Some(bucket) => bucket.to_string(),
@ -307,7 +358,7 @@ async fn handler_stage3(
.await .await
} }
Endpoint::PutObject { key } => { Endpoint::PutObject { key } => {
handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await handle_put(garage, req, bucket_id, &key, content_sha256).await
} }
Endpoint::AbortMultipartUpload { key, upload_id } => { Endpoint::AbortMultipartUpload { key, upload_id } => {
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await

View File

@ -1,8 +1,7 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc; use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, Utc}; use futures::prelude::*;
use futures::{prelude::*, TryFutureExt};
use hyper::body::{Body, Bytes}; use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue}; use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response}; use hyper::{Request, Response};
@ -17,23 +16,19 @@ use garage_util::time::*;
use garage_model::block::INLINE_THRESHOLD; use garage_model::block::INLINE_THRESHOLD;
use garage_model::block_ref_table::*; use garage_model::block_ref_table::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_model::object_table::*; use garage_model::object_table::*;
use garage_model::version_table::*; use garage_model::version_table::*;
use crate::error::*; use crate::error::*;
use crate::s3_xml; use crate::s3_xml;
use crate::signature::streaming::SignedPayloadStream; use crate::signature::verify_signed_content;
use crate::signature::LONG_DATETIME;
use crate::signature::{compute_scope, verify_signed_content};
pub async fn handle_put( pub async fn handle_put(
garage: Arc<Garage>, garage: Arc<Garage>,
req: Request<Body>, req: Request<Body>,
bucket_id: Uuid, bucket_id: Uuid,
key: &str, key: &str,
api_key: &Key, content_sha256: Option<Hash>,
mut content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
// Retrieve interesting headers from request // Retrieve interesting headers from request
let headers = get_headers(req.headers())?; let headers = get_headers(req.headers())?;
@ -43,52 +38,10 @@ pub async fn handle_put(
Some(x) => Some(x.to_str()?.to_string()), Some(x) => Some(x.to_str()?.to_string()),
None => None, None => None,
}; };
let payload_seed_signature = match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
let content_sha256 = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
Some(content_sha256)
}
_ => None,
};
// Parse body of uploaded file let (_head, body) = req.into_parts();
let (head, body) = req.into_parts();
let body = body.map_err(Error::from); let body = body.map_err(Error::from);
let body = if let Some(signature) = payload_seed_signature {
let secret_key = &api_key
.state
.as_option()
.ok_or_internal_error("Deleted key state")?
.secret_key;
let date = head
.headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
let signing_hmac = crate::signature::signing_hmac(
&date,
secret_key,
&garage.config.s3_api.s3_region,
"s3",
)
.ok_or_internal_error("Unable to build signing HMAC")?;
SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)?
.map_err(Error::from)
.boxed()
} else {
body.boxed()
};
save_stream( save_stream(
garage, garage,
headers, headers,

View File

@ -164,15 +164,15 @@ where
datetime: DateTime<Utc>, datetime: DateTime<Utc>,
scope: &str, scope: &str,
seed_signature: Hash, seed_signature: Hash,
) -> Result<Self, Error> { ) -> Self {
Ok(Self { Self {
stream, stream,
buf: bytes::BytesMut::new(), buf: bytes::BytesMut::new(),
datetime, datetime,
scope: scope.into(), scope: scope.into(),
signing_hmac, signing_hmac,
previous_signature: seed_signature, previous_signature: seed_signature,
}) }
} }
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> { fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {