Fix clippy lints (fix #121)
This commit is contained in:
parent
b2c51844a1
commit
ada7899b24
@ -36,7 +36,7 @@ steps:
|
|||||||
path: /etc/nix
|
path: /etc/nix
|
||||||
commands:
|
commands:
|
||||||
- nix-shell --arg release false --run "cargo fmt -- --check"
|
- nix-shell --arg release false --run "cargo fmt -- --check"
|
||||||
- nix-shell --arg release false --run "cargo clippy -- --allow clippy::needless_borrow --allow clippy::needless-return --deny warnings"
|
- nix-shell --arg release false --run "cargo clippy -- --deny warnings"
|
||||||
|
|
||||||
- name: build
|
- name: build
|
||||||
image: nixpkgs/nix:nixos-21.05
|
image: nixpkgs/nix:nixos-21.05
|
||||||
|
@ -39,7 +39,7 @@ pub async fn run_api_server(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let server = Server::bind(&addr).serve(service);
|
let server = Server::bind(addr).serve(service);
|
||||||
|
|
||||||
let graceful = server.with_graceful_shutdown(shutdown_signal);
|
let graceful = server.with_graceful_shutdown(shutdown_signal);
|
||||||
info!("API server listening on http://{}", addr);
|
info!("API server listening on http://{}", addr);
|
||||||
@ -88,8 +88,8 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
|
|||||||
|
|
||||||
let (bucket, key) = parse_bucket_key(&path)?;
|
let (bucket, key) = parse_bucket_key(&path)?;
|
||||||
let allowed = match req.method() {
|
let allowed = match req.method() {
|
||||||
&Method::HEAD | &Method::GET => api_key.allow_read(&bucket),
|
&Method::HEAD | &Method::GET => api_key.allow_read(bucket),
|
||||||
_ => api_key.allow_write(&bucket),
|
_ => api_key.allow_write(bucket),
|
||||||
};
|
};
|
||||||
if !allowed {
|
if !allowed {
|
||||||
return Err(Error::Forbidden(
|
return Err(Error::Forbidden(
|
||||||
@ -109,11 +109,11 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
|
|||||||
match *req.method() {
|
match *req.method() {
|
||||||
Method::HEAD => {
|
Method::HEAD => {
|
||||||
// HeadObject query
|
// HeadObject query
|
||||||
Ok(handle_head(garage, &req, &bucket, &key).await?)
|
Ok(handle_head(garage, &req, bucket, key).await?)
|
||||||
}
|
}
|
||||||
Method::GET => {
|
Method::GET => {
|
||||||
// GetObject query
|
// GetObject query
|
||||||
Ok(handle_get(garage, &req, &bucket, &key).await?)
|
Ok(handle_get(garage, &req, bucket, key).await?)
|
||||||
}
|
}
|
||||||
Method::PUT => {
|
Method::PUT => {
|
||||||
if params.contains_key(&"partnumber".to_string())
|
if params.contains_key(&"partnumber".to_string())
|
||||||
@ -125,8 +125,8 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
|
|||||||
Ok(handle_put_part(
|
Ok(handle_put_part(
|
||||||
garage,
|
garage,
|
||||||
req,
|
req,
|
||||||
&bucket,
|
bucket,
|
||||||
&key,
|
key,
|
||||||
part_number,
|
part_number,
|
||||||
upload_id,
|
upload_id,
|
||||||
content_sha256,
|
content_sha256,
|
||||||
@ -136,46 +136,43 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
|
|||||||
// CopyObject query
|
// CopyObject query
|
||||||
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
|
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
|
||||||
let copy_source =
|
let copy_source =
|
||||||
percent_encoding::percent_decode_str(©_source).decode_utf8()?;
|
percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
|
||||||
let (source_bucket, source_key) = parse_bucket_key(©_source)?;
|
let (source_bucket, source_key) = parse_bucket_key(©_source)?;
|
||||||
if !api_key.allow_read(&source_bucket) {
|
if !api_key.allow_read(source_bucket) {
|
||||||
return Err(Error::Forbidden(format!(
|
return Err(Error::Forbidden(format!(
|
||||||
"Reading from bucket {} not allowed for this key",
|
"Reading from bucket {} not allowed for this key",
|
||||||
source_bucket
|
source_bucket
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
let source_key = source_key.ok_or_bad_request("No source key specified")?;
|
let source_key = source_key.ok_or_bad_request("No source key specified")?;
|
||||||
Ok(
|
Ok(handle_copy(garage, &req, bucket, key, source_bucket, source_key).await?)
|
||||||
handle_copy(garage, &req, &bucket, &key, &source_bucket, &source_key)
|
|
||||||
.await?,
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
// PutObject query
|
// PutObject query
|
||||||
Ok(handle_put(garage, req, &bucket, &key, content_sha256).await?)
|
Ok(handle_put(garage, req, bucket, key, content_sha256).await?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Method::DELETE => {
|
Method::DELETE => {
|
||||||
if params.contains_key(&"uploadid".to_string()) {
|
if params.contains_key(&"uploadid".to_string()) {
|
||||||
// AbortMultipartUpload query
|
// AbortMultipartUpload query
|
||||||
let upload_id = params.get("uploadid").unwrap();
|
let upload_id = params.get("uploadid").unwrap();
|
||||||
Ok(handle_abort_multipart_upload(garage, &bucket, &key, upload_id).await?)
|
Ok(handle_abort_multipart_upload(garage, bucket, key, upload_id).await?)
|
||||||
} else {
|
} else {
|
||||||
// DeleteObject query
|
// DeleteObject query
|
||||||
Ok(handle_delete(garage, &bucket, &key).await?)
|
Ok(handle_delete(garage, bucket, key).await?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Method::POST => {
|
Method::POST => {
|
||||||
if params.contains_key(&"uploads".to_string()) {
|
if params.contains_key(&"uploads".to_string()) {
|
||||||
// CreateMultipartUpload call
|
// CreateMultipartUpload call
|
||||||
Ok(handle_create_multipart_upload(garage, &req, &bucket, &key).await?)
|
Ok(handle_create_multipart_upload(garage, &req, bucket, key).await?)
|
||||||
} else if params.contains_key(&"uploadid".to_string()) {
|
} else if params.contains_key(&"uploadid".to_string()) {
|
||||||
// CompleteMultipartUpload call
|
// CompleteMultipartUpload call
|
||||||
let upload_id = params.get("uploadid").unwrap();
|
let upload_id = params.get("uploadid").unwrap();
|
||||||
Ok(handle_complete_multipart_upload(
|
Ok(handle_complete_multipart_upload(
|
||||||
garage,
|
garage,
|
||||||
req,
|
req,
|
||||||
&bucket,
|
bucket,
|
||||||
&key,
|
key,
|
||||||
upload_id,
|
upload_id,
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)
|
)
|
||||||
|
@ -55,7 +55,8 @@ async fn handle_delete_internal(
|
|||||||
);
|
);
|
||||||
|
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
return Ok((deleted_version, version_uuid));
|
|
||||||
|
Ok((deleted_version, version_uuid))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete(
|
pub async fn handle_delete(
|
||||||
@ -82,7 +83,7 @@ pub async fn handle_delete_objects(
|
|||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
|
||||||
let cmd_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
|
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
||||||
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
|
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
|
||||||
|
|
||||||
let mut ret_deleted = Vec::new();
|
let mut ret_deleted = Vec::new();
|
||||||
|
@ -106,12 +106,12 @@ pub async fn handle_head(
|
|||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(cached) = try_answer_cached(&version, version_meta, req) {
|
if let Some(cached) = try_answer_cached(version, version_meta, req) {
|
||||||
return Ok(cached);
|
return Ok(cached);
|
||||||
}
|
}
|
||||||
|
|
||||||
let body: Body = Body::empty();
|
let body: Body = Body::empty();
|
||||||
let response = object_headers(&version, version_meta)
|
let response = object_headers(version, version_meta)
|
||||||
.header("Content-Length", format!("{}", version_meta.size))
|
.header("Content-Length", format!("{}", version_meta.size))
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::OK)
|
||||||
.body(body)
|
.body(body)
|
||||||
@ -149,7 +149,7 @@ pub async fn handle_get(
|
|||||||
ObjectVersionData::FirstBlock(meta, _) => meta,
|
ObjectVersionData::FirstBlock(meta, _) => meta,
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some(cached) = try_answer_cached(&last_v, last_v_meta, req) {
|
if let Some(cached) = try_answer_cached(last_v, last_v_meta, req) {
|
||||||
return Ok(cached);
|
return Ok(cached);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,7 +179,7 @@ pub async fn handle_get(
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let resp_builder = object_headers(&last_v, last_v_meta)
|
let resp_builder = object_headers(last_v, last_v_meta)
|
||||||
.header("Content-Length", format!("{}", last_v_meta.size))
|
.header("Content-Length", format!("{}", last_v_meta.size))
|
||||||
.status(StatusCode::OK);
|
.status(StatusCode::OK);
|
||||||
|
|
||||||
@ -190,7 +190,7 @@ pub async fn handle_get(
|
|||||||
Ok(resp_builder.body(body)?)
|
Ok(resp_builder.body(body)?)
|
||||||
}
|
}
|
||||||
ObjectVersionData::FirstBlock(_, first_block_hash) => {
|
ObjectVersionData::FirstBlock(_, first_block_hash) => {
|
||||||
let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash);
|
let read_first_block = garage.block_manager.rpc_get_block(first_block_hash);
|
||||||
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
|
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
|
||||||
|
|
||||||
let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
|
let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
|
||||||
|
@ -193,8 +193,8 @@ async fn read_and_put_blocks(
|
|||||||
|
|
||||||
let mut next_offset = first_block.len();
|
let mut next_offset = first_block.len();
|
||||||
let mut put_curr_version_block = put_block_meta(
|
let mut put_curr_version_block = put_block_meta(
|
||||||
&garage,
|
garage,
|
||||||
&version,
|
version,
|
||||||
part_number,
|
part_number,
|
||||||
0,
|
0,
|
||||||
first_block_hash,
|
first_block_hash,
|
||||||
@ -213,8 +213,8 @@ async fn read_and_put_blocks(
|
|||||||
let block_hash = blake2sum(&block[..]);
|
let block_hash = blake2sum(&block[..]);
|
||||||
let block_len = block.len();
|
let block_len = block.len();
|
||||||
put_curr_version_block = put_block_meta(
|
put_curr_version_block = put_block_meta(
|
||||||
&garage,
|
garage,
|
||||||
&version,
|
version,
|
||||||
part_number,
|
part_number,
|
||||||
next_offset as u64,
|
next_offset as u64,
|
||||||
block_hash,
|
block_hash,
|
||||||
@ -437,7 +437,7 @@ pub async fn handle_complete_multipart_upload(
|
|||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
|
||||||
let body_xml = roxmltree::Document::parse(&std::str::from_utf8(&body)?)?;
|
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
||||||
let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml)
|
let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml)
|
||||||
.ok_or_bad_request("Invalid CompleteMultipartUpload XML")?;
|
.ok_or_bad_request("Invalid CompleteMultipartUpload XML")?;
|
||||||
debug!(
|
debug!(
|
||||||
|
@ -70,7 +70,7 @@ pub async fn check_signature(
|
|||||||
let canonical_request = canonical_request(
|
let canonical_request = canonical_request(
|
||||||
request.method(),
|
request.method(),
|
||||||
&request.uri().path().to_string(),
|
&request.uri().path().to_string(),
|
||||||
&canonical_query_string(&request.uri()),
|
&canonical_query_string(request.uri()),
|
||||||
&headers,
|
&headers,
|
||||||
&authorization.signed_headers,
|
&authorization.signed_headers,
|
||||||
&authorization.content_sha256,
|
&authorization.content_sha256,
|
||||||
@ -252,7 +252,7 @@ fn canonical_request(
|
|||||||
method.as_str(),
|
method.as_str(),
|
||||||
url_path,
|
url_path,
|
||||||
canonical_query_string,
|
canonical_query_string,
|
||||||
&canonical_header_string(&headers, signed_headers),
|
&canonical_header_string(headers, signed_headers),
|
||||||
"",
|
"",
|
||||||
signed_headers,
|
signed_headers,
|
||||||
content_sha256,
|
content_sha256,
|
||||||
|
@ -54,7 +54,7 @@ pub fn format_table(data: Vec<String>) {
|
|||||||
out.push_str(col);
|
out.push_str(col);
|
||||||
(0..col_len - col.chars().count() + 2).for_each(|_| out.push(' '));
|
(0..col_len - col.chars().count() + 2).for_each(|_| out.push(' '));
|
||||||
}
|
}
|
||||||
out.push_str(&row[row.len() - 1]);
|
out.push_str(row[row.len() - 1]);
|
||||||
out.push('\n');
|
out.push('\n');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ impl BlockManager {
|
|||||||
|
|
||||||
/// Ask nodes that might have a block for it
|
/// Ask nodes that might have a block for it
|
||||||
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||||
let who = self.replication.read_nodes(&hash);
|
let who = self.replication.read_nodes(hash);
|
||||||
let resps = self
|
let resps = self
|
||||||
.system
|
.system
|
||||||
.rpc
|
.rpc
|
||||||
@ -224,7 +224,7 @@ impl BlockManager {
|
|||||||
})?;
|
})?;
|
||||||
let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0);
|
let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0);
|
||||||
if old_rc == 0 {
|
if old_rc == 0 {
|
||||||
self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?;
|
self.put_to_resync(hash, BLOCK_RW_TIMEOUT)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -240,7 +240,7 @@ impl BlockManager {
|
|||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
if new_rc.is_none() {
|
if new_rc.is_none() {
|
||||||
self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?;
|
self.put_to_resync(hash, BLOCK_GC_TIMEOUT)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -406,7 +406,7 @@ impl BlockManager {
|
|||||||
if exists && !needed {
|
if exists && !needed {
|
||||||
trace!("Offloading block {:?}", hash);
|
trace!("Offloading block {:?}", hash);
|
||||||
|
|
||||||
let mut who = self.replication.write_nodes(&hash);
|
let mut who = self.replication.write_nodes(hash);
|
||||||
if who.len() < self.replication.write_quorum() {
|
if who.len() < self.replication.write_quorum() {
|
||||||
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
|
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
|
||||||
}
|
}
|
||||||
@ -478,7 +478,7 @@ impl BlockManager {
|
|||||||
// TODO find a way to not do this if they are sending it to us
|
// TODO find a way to not do this if they are sending it to us
|
||||||
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
|
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
|
||||||
// between the RC being incremented and this part being called.
|
// between the RC being incremented and this part being called.
|
||||||
let block_data = self.rpc_get_block(&hash).await?;
|
let block_data = self.rpc_get_block(hash).await?;
|
||||||
self.write_block(hash, &block_data[..]).await?;
|
self.write_block(hash, &block_data[..]).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -600,7 +600,7 @@ impl BlockManagerLocked {
|
|||||||
let mut path2 = path.clone();
|
let mut path2 = path.clone();
|
||||||
path2.set_extension(".corrupted");
|
path2.set_extension(".corrupted");
|
||||||
fs::rename(path, path2).await?;
|
fs::rename(path, path2).await?;
|
||||||
mgr.put_to_resync(&hash, Duration::from_millis(0))?;
|
mgr.put_to_resync(hash, Duration::from_millis(0))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -543,7 +543,7 @@ impl EndpointHandler<SystemRpc> for System {
|
|||||||
SystemRpc::Connect(node) => self.handle_connect(node).await,
|
SystemRpc::Connect(node) => self.handle_connect(node).await,
|
||||||
SystemRpc::PullConfig => Ok(self.handle_pull_config()),
|
SystemRpc::PullConfig => Ok(self.handle_pull_config()),
|
||||||
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
|
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
|
||||||
SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(&adv).await,
|
SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(adv).await,
|
||||||
SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
|
SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
|
||||||
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
|
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ where
|
|||||||
}
|
}
|
||||||
/// Get a reference to the value assigned to a key
|
/// Get a reference to the value assigned to a key
|
||||||
pub fn get(&self, k: &K) -> Option<&V> {
|
pub fn get(&self, k: &K) -> Option<&V> {
|
||||||
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
|
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
|
||||||
Ok(i) => Some(&self.vals[i].2),
|
Ok(i) => Some(&self.vals[i].2),
|
||||||
Err(_) => None,
|
Err(_) => None,
|
||||||
}
|
}
|
||||||
@ -132,14 +132,14 @@ where
|
|||||||
{
|
{
|
||||||
fn merge(&mut self, other: &Self) {
|
fn merge(&mut self, other: &Self) {
|
||||||
for (k, ts2, v2) in other.vals.iter() {
|
for (k, ts2, v2) in other.vals.iter() {
|
||||||
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
|
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) {
|
||||||
Ok(i) => {
|
Ok(i) => {
|
||||||
let (_, ts1, _v1) = &self.vals[i];
|
let (_, ts1, _v1) = &self.vals[i];
|
||||||
if ts2 > ts1 {
|
if ts2 > ts1 {
|
||||||
self.vals[i].1 = *ts2;
|
self.vals[i].1 = *ts2;
|
||||||
self.vals[i].2 = v2.clone();
|
self.vals[i].2 = v2.clone();
|
||||||
} else if ts1 == ts2 {
|
} else if ts1 == ts2 {
|
||||||
self.vals[i].2.merge(&v2);
|
self.vals[i].2.merge(v2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(i) => {
|
Err(i) => {
|
||||||
|
@ -49,7 +49,7 @@ where
|
|||||||
|
|
||||||
/// Get a reference to the value assigned to a key
|
/// Get a reference to the value assigned to a key
|
||||||
pub fn get(&self, k: &K) -> Option<&V> {
|
pub fn get(&self, k: &K) -> Option<&V> {
|
||||||
match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) {
|
match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) {
|
||||||
Ok(i) => Some(&self.vals[i].1),
|
Ok(i) => Some(&self.vals[i].1),
|
||||||
Err(_) => None,
|
Err(_) => None,
|
||||||
}
|
}
|
||||||
@ -76,9 +76,9 @@ where
|
|||||||
{
|
{
|
||||||
fn merge(&mut self, other: &Self) {
|
fn merge(&mut self, other: &Self) {
|
||||||
for (k, v2) in other.vals.iter() {
|
for (k, v2) in other.vals.iter() {
|
||||||
match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) {
|
match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) {
|
||||||
Ok(i) => {
|
Ok(i) => {
|
||||||
self.vals[i].1.merge(&v2);
|
self.vals[i].1.merge(v2);
|
||||||
}
|
}
|
||||||
Err(i) => {
|
Err(i) => {
|
||||||
self.vals.insert(i, (k.clone(), v2.clone()));
|
self.vals.insert(i, (k.clone(), v2.clone()));
|
||||||
|
@ -167,7 +167,7 @@ where
|
|||||||
// Calculate an update to apply to this node
|
// Calculate an update to apply to this node
|
||||||
// This update is an Option<_>, so that it is None if the update is a no-op
|
// This update is an Option<_>, so that it is None if the update is a no-op
|
||||||
// and we can thus skip recalculating and re-storing everything
|
// and we can thus skip recalculating and re-storing everything
|
||||||
let mutate = match self.read_node_txn(tx, &key)? {
|
let mutate = match self.read_node_txn(tx, key)? {
|
||||||
MerkleNode::Empty => new_vhash.map(|vhv| MerkleNode::Leaf(k.to_vec(), vhv)),
|
MerkleNode::Empty => new_vhash.map(|vhv| MerkleNode::Leaf(k.to_vec(), vhv)),
|
||||||
MerkleNode::Intermediate(mut children) => {
|
MerkleNode::Intermediate(mut children) => {
|
||||||
let key2 = key.next_key(khash);
|
let key2 = key.next_key(khash);
|
||||||
@ -270,7 +270,7 @@ where
|
|||||||
};
|
};
|
||||||
|
|
||||||
if let Some(new_node) = mutate {
|
if let Some(new_node) = mutate {
|
||||||
let hash = self.put_node_txn(tx, &key, &new_node)?;
|
let hash = self.put_node_txn(tx, key, &new_node)?;
|
||||||
Ok(Some(hash))
|
Ok(Some(hash))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
|
@ -27,7 +27,7 @@ pub struct TableShardedReplication {
|
|||||||
impl TableReplication for TableShardedReplication {
|
impl TableReplication for TableShardedReplication {
|
||||||
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
||||||
let ring = self.system.ring.borrow();
|
let ring = self.system.ring.borrow();
|
||||||
ring.get_nodes(&hash, self.replication_factor)
|
ring.get_nodes(hash, self.replication_factor)
|
||||||
}
|
}
|
||||||
fn read_quorum(&self) -> usize {
|
fn read_quorum(&self) -> usize {
|
||||||
self.read_quorum
|
self.read_quorum
|
||||||
@ -35,7 +35,7 @@ impl TableReplication for TableShardedReplication {
|
|||||||
|
|
||||||
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
||||||
let ring = self.system.ring.borrow();
|
let ring = self.system.ring.borrow();
|
||||||
ring.get_nodes(&hash, self.replication_factor)
|
ring.get_nodes(hash, self.replication_factor)
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self) -> usize {
|
||||||
self.write_quorum
|
self.write_quorum
|
||||||
|
@ -266,7 +266,7 @@ where
|
|||||||
let nodes = self
|
let nodes = self
|
||||||
.data
|
.data
|
||||||
.replication
|
.replication
|
||||||
.write_nodes(&begin)
|
.write_nodes(begin)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
if nodes.contains(&self.system.id) {
|
if nodes.contains(&self.system.id) {
|
||||||
@ -530,7 +530,7 @@ where
|
|||||||
Ok(SyncRpc::RootCkDifferent(hash != *h))
|
Ok(SyncRpc::RootCkDifferent(hash != *h))
|
||||||
}
|
}
|
||||||
SyncRpc::GetNode(k) => {
|
SyncRpc::GetNode(k) => {
|
||||||
let node = self.merkle.read_node(&k)?;
|
let node = self.merkle.read_node(k)?;
|
||||||
Ok(SyncRpc::Node(k.clone(), node))
|
Ok(SyncRpc::Node(k.clone(), node))
|
||||||
}
|
}
|
||||||
SyncRpc::Items(items) => {
|
SyncRpc::Items(items) => {
|
||||||
|
@ -36,7 +36,7 @@ pub async fn run_web_server(
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let server = Server::bind(&addr).serve(service);
|
let server = Server::bind(addr).serve(service);
|
||||||
let graceful = server.with_graceful_shutdown(shutdown_signal);
|
let graceful = server.with_graceful_shutdown(shutdown_signal);
|
||||||
info!("Web server listening on http://{}", addr);
|
info!("Web server listening on http://{}", addr);
|
||||||
|
|
||||||
@ -95,12 +95,12 @@ async fn serve_file(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<
|
|||||||
// Get path
|
// Get path
|
||||||
let path = req.uri().path().to_string();
|
let path = req.uri().path().to_string();
|
||||||
let index = &garage.config.s3_web.index;
|
let index = &garage.config.s3_web.index;
|
||||||
let key = path_to_key(&path, &index)?;
|
let key = path_to_key(&path, index)?;
|
||||||
|
|
||||||
info!("Selected bucket: \"{}\", selected key: \"{}\"", bucket, key);
|
info!("Selected bucket: \"{}\", selected key: \"{}\"", bucket, key);
|
||||||
|
|
||||||
let res = match *req.method() {
|
let res = match *req.method() {
|
||||||
Method::HEAD => handle_head(garage, &req, &bucket, &key).await?,
|
Method::HEAD => handle_head(garage, &req, bucket, &key).await?,
|
||||||
Method::GET => handle_get(garage, &req, bucket, &key).await?,
|
Method::GET => handle_get(garage, &req, bucket, &key).await?,
|
||||||
_ => return Err(Error::BadRequest("HTTP method not supported".to_string())),
|
_ => return Err(Error::BadRequest("HTTP method not supported".to_string())),
|
||||||
};
|
};
|
||||||
@ -173,7 +173,7 @@ fn host_to_bucket<'a>(host: &'a str, root: &str) -> &'a str {
|
|||||||
/// When a path ends with "/", we append the index name to match traditional web server behavior
|
/// When a path ends with "/", we append the index name to match traditional web server behavior
|
||||||
/// which is also AWS S3 behavior.
|
/// which is also AWS S3 behavior.
|
||||||
fn path_to_key<'a>(path: &'a str, index: &str) -> Result<Cow<'a, str>, Error> {
|
fn path_to_key<'a>(path: &'a str, index: &str) -> Result<Cow<'a, str>, Error> {
|
||||||
let path_utf8 = percent_encoding::percent_decode_str(&path).decode_utf8()?;
|
let path_utf8 = percent_encoding::percent_decode_str(path).decode_utf8()?;
|
||||||
|
|
||||||
if !path_utf8.starts_with('/') {
|
if !path_utf8.starts_with('/') {
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
|
Loading…
Reference in New Issue
Block a user