Merge pull request 'add doc comments' (#53) from trinity-1686a/garage:doc-comments into main

Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/53
This commit is contained in:
Alex 2021-04-08 15:01:13 +02:00
commit c35c472dc9
38 changed files with 393 additions and 79 deletions

View File

@ -20,6 +20,7 @@ use crate::s3_get::*;
use crate::s3_list::*; use crate::s3_list::*;
use crate::s3_put::*; use crate::s3_put::*;
/// Run the S3 API server
pub async fn run_api_server( pub async fn run_api_server(
garage: Arc<Garage>, garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,

View File

@ -1,9 +1,13 @@
//! Module containing various helpers for encoding
/// Escape &str for xml inclusion
pub fn xml_escape(s: &str) -> String { pub fn xml_escape(s: &str) -> String {
s.replace("<", "&lt;") s.replace("<", "&lt;")
.replace(">", "&gt;") .replace(">", "&gt;")
.replace("\"", "&quot;") .replace("\"", "&quot;")
} }
/// Encode &str for use in a URI
pub fn uri_encode(string: &str, encode_slash: bool) -> String { pub fn uri_encode(string: &str, encode_slash: bool) -> String {
let mut result = String::with_capacity(string.len() * 2); let mut result = String::with_capacity(string.len() * 2);
for c in string.chars() { for c in string.chars() {
@ -24,6 +28,7 @@ pub fn uri_encode(string: &str, encode_slash: bool) -> String {
result result
} }
/// Encode &str either as an uri, or a valid string for xml inclusion
pub fn xml_encode_key(k: &str, urlencode: bool) -> String { pub fn xml_encode_key(k: &str, urlencode: bool) -> String {
if urlencode { if urlencode {
uri_encode(k, true) uri_encode(k, true)

View File

@ -3,44 +3,57 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
/// Errors of this crate
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
// Category: internal error // Category: internal error
/// Error related to deeper parts of Garage
#[error(display = "Internal error: {}", _0)] #[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError), InternalError(#[error(source)] GarageError),
/// Error related to Hyper
#[error(display = "Internal error (Hyper error): {}", _0)] #[error(display = "Internal error (Hyper error): {}", _0)]
Hyper(#[error(source)] hyper::Error), Hyper(#[error(source)] hyper::Error),
/// Error related to HTTP
#[error(display = "Internal error (HTTP error): {}", _0)] #[error(display = "Internal error (HTTP error): {}", _0)]
HTTP(#[error(source)] http::Error), HTTP(#[error(source)] http::Error),
// Category: cannot process // Category: cannot process
/// No proper api key was used, or the signature was invalid
#[error(display = "Forbidden: {}", _0)] #[error(display = "Forbidden: {}", _0)]
Forbidden(String), Forbidden(String),
/// The object requested don't exists
#[error(display = "Not found")] #[error(display = "Not found")]
NotFound, NotFound,
// Category: bad request // Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)] #[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8Str(#[error(source)] std::str::Utf8Error), InvalidUTF8Str(#[error(source)] std::str::Utf8Error),
/// The request used an invalid path
#[error(display = "Invalid UTF-8: {}", _0)] #[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8String(#[error(source)] std::string::FromUtf8Error), InvalidUTF8String(#[error(source)] std::string::FromUtf8Error),
/// Some base64 encoded data was badly encoded
#[error(display = "Invalid base64: {}", _0)] #[error(display = "Invalid base64: {}", _0)]
InvalidBase64(#[error(source)] base64::DecodeError), InvalidBase64(#[error(source)] base64::DecodeError),
/// The client sent invalid XML data
#[error(display = "Invalid XML: {}", _0)] #[error(display = "Invalid XML: {}", _0)]
InvalidXML(String), InvalidXML(String),
/// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)] #[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError), InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client sent a range header with invalid value
#[error(display = "Invalid HTTP range: {:?}", _0)] #[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] http_range::HttpRangeParseError), InvalidRange(#[error(from)] http_range::HttpRangeParseError),
/// The client sent an invalid request
#[error(display = "Bad request: {}", _0)] #[error(display = "Bad request: {}", _0)]
BadRequest(String), BadRequest(String),
} }
@ -52,6 +65,7 @@ impl From<roxmltree::Error> for Error {
} }
impl Error { impl Error {
/// Get the HTTP status code that best represents the meaning of the error for the client
pub fn http_status_code(&self) -> StatusCode { pub fn http_status_code(&self) -> StatusCode {
match self { match self {
Error::NotFound => StatusCode::NOT_FOUND, Error::NotFound => StatusCode::NOT_FOUND,
@ -65,6 +79,7 @@ impl Error {
} }
} }
/// Trait to map error to the Bad Request error code
pub trait OkOrBadRequest { pub trait OkOrBadRequest {
type S2; type S2;
fn ok_or_bad_request(self, reason: &'static str) -> Self::S2; fn ok_or_bad_request(self, reason: &'static str) -> Self::S2;
@ -93,6 +108,7 @@ impl<T> OkOrBadRequest for Option<T> {
} }
} }
/// Trait to map an error to an Internal Error code
pub trait OkOrInternalError { pub trait OkOrInternalError {
type S2; type S2;
fn ok_or_internal_error(self, reason: &'static str) -> Self::S2; fn ok_or_internal_error(self, reason: &'static str) -> Self::S2;

View File

@ -1,15 +1,19 @@
//! Crate for serving a S3 compatible API
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod error; mod error;
pub use error::Error;
pub mod encoding; mod encoding;
pub mod api_server; mod api_server;
pub mod signature; pub use api_server::run_api_server;
pub mod s3_copy; mod signature;
pub mod s3_delete;
mod s3_copy;
mod s3_delete;
pub mod s3_get; pub mod s3_get;
pub mod s3_list; mod s3_list;
pub mod s3_put; mod s3_put;

View File

@ -1,3 +1,4 @@
//! Function related to GET and HEAD requests
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH}; use std::time::{Duration, UNIX_EPOCH};
@ -79,6 +80,7 @@ fn try_answer_cached(
} }
} }
/// Handle HEAD request
pub async fn handle_head( pub async fn handle_head(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<Body>, req: &Request<Body>,
@ -118,6 +120,7 @@ pub async fn handle_head(
Ok(response) Ok(response)
} }
/// Handle GET request
pub async fn handle_get( pub async fn handle_get(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<Body>, req: &Request<Body>,
@ -224,7 +227,7 @@ pub async fn handle_get(
} }
} }
pub async fn handle_get_range( async fn handle_get_range(
garage: Arc<Garage>, garage: Arc<Garage>,
version: &ObjectVersion, version: &ObjectVersion,
version_data: &ObjectVersionData, version_data: &ObjectVersionData,

View File

@ -80,7 +80,7 @@ pub struct ConfigureNodeOpt {
#[structopt(short = "c", long = "capacity")] #[structopt(short = "c", long = "capacity")]
capacity: Option<u32>, capacity: Option<u32>,
/// Optionnal node tag /// Optional node tag
#[structopt(short = "t", long = "tag")] #[structopt(short = "t", long = "tag")]
tag: Option<String>, tag: Option<String>,

View File

@ -1,4 +1,5 @@
#![recursion_limit = "1024"] #![recursion_limit = "1024"]
//! Garage CLI, used to interact with a running Garage instance, and to launch a Garage instance
#[macro_use] #[macro_use]
extern crate log; extern crate log;
@ -25,7 +26,7 @@ use cli::*;
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
#[structopt(name = "garage")] #[structopt(name = "garage")]
pub struct Opt { struct Opt {
/// RPC connect to this host to execute client operations /// RPC connect to this host to execute client operations
#[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")]
pub rpc_host: SocketAddr, pub rpc_host: SocketAddr,

View File

@ -8,10 +8,10 @@ use garage_util::background::*;
use garage_util::config::*; use garage_util::config::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_api::api_server; use garage_api::run_api_server;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_rpc::rpc_server::RpcServer; use garage_rpc::rpc_server::RpcServer;
use garage_web::web_server; use garage_web::run_web_server;
use crate::admin_rpc::*; use crate::admin_rpc::*;
@ -62,8 +62,8 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing RPC and API servers..."); info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); let api_server = run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone())); let web_server = run_web_server(garage, wait_from(watch_cancel.clone()));
futures::try_join!( futures::try_join!(
bootstrap.map(|rv| { bootstrap.map(|rv| {

View File

@ -18,12 +18,13 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*; use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
use garage_table::replication::{sharded::TableShardedReplication, TableReplication}; use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block_ref_table::*; use crate::block_ref_table::*;
use crate::garage::Garage; use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072; pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1; pub const BACKGROUND_WORKERS: u64 = 1;
@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum Message { pub enum Message {
Ok, Ok,
/// Message to ask for a block of data, by hash
GetBlock(Hash), GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
PutBlock(PutBlockMessage), PutBlock(PutBlockMessage),
/// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash), NeedBlockQuery(Hash),
/// Response : whether the node do require that block
NeedBlockReply(bool), NeedBlockReply(bool),
} }
/// Structure used to send a block
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage { pub struct PutBlockMessage {
/// Hash of the block
pub hash: Hash, pub hash: Hash,
/// Content of the block
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
pub data: Vec<u8>, pub data: Vec<u8>,
} }
impl RpcMessage for Message {} impl RpcMessage for Message {}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager { pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication, pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf, pub data_dir: PathBuf,
/// Lock to prevent concurrent edition of the directory
pub data_dir_lock: Mutex<()>, pub data_dir_lock: Mutex<()>,
rc: sled::Tree, rc: sled::Tree,
@ -128,7 +142,8 @@ impl BlockManager {
} }
pub fn spawn_background_worker(self: Arc<Self>) { pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
// launches only one worker with current value of BACKGROUND_WORKERS
for i in 0..BACKGROUND_WORKERS { for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone(); let bm2 = self.clone();
let background = self.system.background.clone(); let background = self.system.background.clone();
@ -141,7 +156,8 @@ impl BlockManager {
} }
} }
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { /// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
let _lock = self.data_dir_lock.lock().await; let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash); let mut path = self.block_dir(hash);
@ -159,7 +175,8 @@ impl BlockManager {
Ok(Message::Ok) Ok(Message::Ok)
} }
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { /// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash); let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await { let mut f = match fs::File::open(&path).await {
@ -190,7 +207,8 @@ impl BlockManager {
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
} }
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { /// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self let needed = self
.rc .rc
.get(hash.as_ref())? .get(hash.as_ref())?
@ -217,6 +235,8 @@ impl BlockManager {
path path
} }
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| { let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0); let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -229,6 +249,7 @@ impl BlockManager {
Ok(()) Ok(())
} }
/// Decrement the number of time a block is used
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.update_and_fetch(&hash, |old| { let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0); let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -388,6 +409,7 @@ impl BlockManager {
Ok(()) Ok(())
} }
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash); let who = self.replication.read_nodes(&hash);
let resps = self let resps = self
@ -412,6 +434,7 @@ impl BlockManager {
))) )))
} }
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash); let who = self.replication.write_nodes(&hash);
self.rpc_client self.rpc_client
@ -498,6 +521,7 @@ impl BlockManager {
.boxed() .boxed()
} }
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize { pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len() self.resync_queue.len()
} }

View File

@ -10,13 +10,14 @@ use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef { pub struct BlockRef {
// Primary key /// Hash of the block, used as partition key
pub block: Hash, pub block: Hash,
// Sort key /// Id of the Version for the object containing this block, used as sorting key
pub version: UUID, pub version: UUID,
// Keep track of deleted status // Keep track of deleted status
/// Is the Version that contains this block deleted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
} }

View File

@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket { pub struct Bucket {
// Primary key /// Name of the bucket
pub name: String, pub name: String,
/// State, and configuration if not deleted, of the bucket
pub state: crdt::LWW<BucketState>, pub state: crdt::LWW<BucketState>,
} }
/// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState { pub enum BucketState {
/// The bucket is deleted
Deleted, Deleted,
/// The bucket exists
Present(BucketParams), Present(BucketParams),
} }
@ -37,9 +40,12 @@ impl CRDT for BucketState {
} }
} }
/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams { pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LWWMap<String, PermissionSet>, pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
/// Is the bucket served as http
pub website: crdt::LWW<bool>, pub website: crdt::LWW<bool>,
} }
@ -51,6 +57,7 @@ impl CRDT for BucketParams {
} }
impl BucketParams { impl BucketParams {
/// Initializes a new instance of the Bucket struct
pub fn new() -> Self { pub fn new() -> Self {
BucketParams { BucketParams {
authorized_keys: crdt::LWWMap::new(), authorized_keys: crdt::LWWMap::new(),
@ -60,15 +67,21 @@ impl BucketParams {
} }
impl Bucket { impl Bucket {
/// Create a new bucket
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
Bucket { Bucket {
name, name,
state: crdt::LWW::new(BucketState::Present(BucketParams::new())), state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
} }
} }
/// Returns true if this represents a deleted bucket
pub fn is_deleted(&self) -> bool { pub fn is_deleted(&self) -> bool {
*self.state.get() == BucketState::Deleted *self.state.get() == BucketState::Deleted
} }
/// Return the list of authorized keys, when each was updated, and the permission associated to
/// the key
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
match self.state.get() { match self.state.get() {
BucketState::Deleted => &[], BucketState::Deleted => &[],

View File

@ -7,8 +7,8 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::RpcHttpClient; use garage_rpc::rpc_client::RpcHttpClient;
use garage_rpc::rpc_server::RpcServer; use garage_rpc::rpc_server::RpcServer;
use garage_table::replication::fullcopy::*; use garage_table::replication::TableFullReplication;
use garage_table::replication::sharded::*; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::block::*; use crate::block::*;
@ -18,15 +18,23 @@ use crate::key_table::*;
use crate::object_table::*; use crate::object_table::*;
use crate::version_table::*; use crate::version_table::*;
/// An entire Garage full of data
pub struct Garage { pub struct Garage {
/// The parsed configuration Garage is running
pub config: Config, pub config: Config,
/// The local database
pub db: sled::Db, pub db: sled::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>, pub background: Arc<BackgroundRunner>,
/// The membership manager
pub system: Arc<System>, pub system: Arc<System>,
/// The block manager
pub block_manager: Arc<BlockManager>, pub block_manager: Arc<BlockManager>,
/// Table containing informations about buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>, pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
/// Table containing informations about api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>, pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
@ -35,6 +43,7 @@ pub struct Garage {
} }
impl Garage { impl Garage {
/// Create and run garage
pub fn new( pub fn new(
config: Config, config: Config,
db: sled::Db, db: sled::Db,

View File

@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::*; use garage_table::*;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key { pub struct Key {
// Primary key /// The id of the key (immutable), used as partition key
pub key_id: String, pub key_id: String,
// Associated secret key (immutable) /// The secret_key associated
pub secret_key: String, pub secret_key: String,
// Name /// Name for the key
pub name: crdt::LWW<String>, pub name: crdt::LWW<String>,
// Deletion /// Is the key deleted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
// Authorized keys /// Buckets in which the key is authorized. Empty if `Key` is deleted
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
// CRDT interaction: deleted implies authorized_buckets is empty // CRDT interaction: deleted implies authorized_buckets is empty
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
} }
impl Key { impl Key {
/// Create a new key
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
@ -34,6 +36,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(), authorized_buckets: crdt::LWWMap::new(),
} }
} }
/// Import a key from it's parts
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self { pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
Self { Self {
key_id: key_id.to_string(), key_id: key_id.to_string(),
@ -43,6 +47,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(), authorized_buckets: crdt::LWWMap::new(),
} }
} }
/// Create a new Key which can me merged to mark an existing key deleted
pub fn delete(key_id: String) -> Self { pub fn delete(key_id: String) -> Self {
Self { Self {
key_id, key_id,
@ -52,13 +58,16 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(), authorized_buckets: crdt::LWWMap::new(),
} }
} }
/// Add an authorized bucket, only if it wasn't there before
/// Check if `Key` is allowed to read in bucket
pub fn allow_read(&self, bucket: &str) -> bool { pub fn allow_read(&self, bucket: &str) -> bool {
self.authorized_buckets self.authorized_buckets
.get(&bucket.to_string()) .get(&bucket.to_string())
.map(|x| x.allow_read) .map(|x| x.allow_read)
.unwrap_or(false) .unwrap_or(false)
} }
/// Check if `Key` is allowed to write in bucket
pub fn allow_write(&self, bucket: &str) -> bool { pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets self.authorized_buckets
.get(&bucket.to_string()) .get(&bucket.to_string())
@ -67,9 +76,12 @@ impl Key {
} }
} }
/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet { pub struct PermissionSet {
/// The key can be used to read the bucket
pub allow_read: bool, pub allow_read: bool,
/// The key can be used to write in the bucket
pub allow_write: bool, pub allow_write: bool,
} }

View File

@ -6,24 +6,26 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::replication::sharded::*; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::version_table::*; use crate::version_table::*;
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object { pub struct Object {
// Primary key /// The bucket in which the object is stored, used as partition key
pub bucket: String, pub bucket: String,
// Sort key /// The key at which the object is stored in its bucket, used as sorting key
pub key: String, pub key: String,
// Data /// The list of currenty stored versions of the object
versions: Vec<ObjectVersion>, versions: Vec<ObjectVersion>,
} }
impl Object { impl Object {
/// Create an object from parts
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self { pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
let mut ret = Self { let mut ret = Self {
bucket, bucket,
@ -36,6 +38,7 @@ impl Object {
} }
ret ret
} }
/// Adds a version if it wasn't already present /// Adds a version if it wasn't already present
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
match self match self
@ -49,23 +52,32 @@ impl Object {
Ok(_) => Err(()), Ok(_) => Err(()),
} }
} }
/// Get a list of currently stored versions of `Object`
pub fn versions(&self) -> &[ObjectVersion] { pub fn versions(&self) -> &[ObjectVersion] {
&self.versions[..] &self.versions[..]
} }
} }
/// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion { pub struct ObjectVersion {
/// Id of the version
pub uuid: UUID, pub uuid: UUID,
/// Timestamp of when the object was created
pub timestamp: u64, pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState, pub state: ObjectVersionState,
} }
/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState { pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders), Uploading(ObjectVersionHeaders),
/// The version is fully received
Complete(ObjectVersionData), Complete(ObjectVersionData),
/// The version uploaded containded errors or the upload was explicitly aborted
Aborted, Aborted,
} }
@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
} }
} }
/// Data about an object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData { pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker, DeleteMarker,
/// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash), FirstBlock(ObjectVersionMeta, Hash),
} }
@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true; const WARN_IF_DIFFERENT: bool = true;
} }
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta { pub struct ObjectVersionMeta {
/// Headers to send to the client
pub headers: ObjectVersionHeaders, pub headers: ObjectVersionHeaders,
/// Size of the object
pub size: u64, pub size: u64,
/// etag of the object
pub etag: String, pub etag: String,
} }
/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders { pub struct ObjectVersionHeaders {
/// Content type of the object
pub content_type: String, pub content_type: String,
/// Any other http headers to send
pub other: BTreeMap<String, String>, pub other: BTreeMap<String, String>,
} }
@ -118,18 +142,24 @@ impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) { fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid) (self.timestamp, self.uuid)
} }
/// Is the object version currently being uploaded
pub fn is_uploading(&self) -> bool { pub fn is_uploading(&self) -> bool {
match self.state { match self.state {
ObjectVersionState::Uploading(_) => true, ObjectVersionState::Uploading(_) => true,
_ => false, _ => false,
} }
} }
/// Is the object version completely received
pub fn is_complete(&self) -> bool { pub fn is_complete(&self) -> bool {
match self.state { match self.state {
ObjectVersionState::Complete(_) => true, ObjectVersionState::Complete(_) => true,
_ => false, _ => false,
} }
} }
/// Is the object version available (received and not a tombstone)
pub fn is_data(&self) -> bool { pub fn is_data(&self) -> bool {
match self.state { match self.state {
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,

View File

@ -5,26 +5,32 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::replication::sharded::*; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
use crate::block_ref_table::*; use crate::block_ref_table::*;
/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version { pub struct Version {
// Primary key /// UUID of the version, used as partition key
pub uuid: UUID, pub uuid: UUID,
// Actual data: the blocks for this version // Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags // In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload // of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
/// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>, pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if // Back link to bucket+key so that we can figure if
// this was deleted later on // this was deleted later on
/// Bucket in which the related object is stored
pub bucket: String, pub bucket: String,
/// Key in which the related object is stored
pub key: String, pub key: String,
} }
@ -43,7 +49,9 @@ impl Version {
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey { pub struct VersionBlockKey {
/// Number of the part
pub part_number: u64, pub part_number: u64,
/// Offset of this sub-segment in its part
pub offset: u64, pub offset: u64,
} }
@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
} }
} }
/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock { pub struct VersionBlock {
/// Hash of the block
pub hash: Hash, pub hash: Hash,
/// Size of the block
pub size: u64, pub size: u64,
} }

View File

@ -1,7 +1,9 @@
//! Crate containing rpc related functions and types used in Garage
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod consul; mod consul;
pub(crate) mod tls_util; pub(crate) mod tls_util;
pub mod membership; pub mod membership;

View File

@ -1,3 +1,4 @@
//! Module containing structs related to membership management
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Write as FmtWrite; use std::fmt::Write as FmtWrite;
use std::io::{Read, Write}; use std::io::{Read, Write};
@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2); const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
/// RPC endpoint used for calls related to membership
pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum Message { pub enum Message {
/// Response to successfull advertisements
Ok, Ok,
/// Message sent to detect other nodes status
Ping(PingMessage), Ping(PingMessage),
/// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
PullStatus, PullStatus,
/// Ask other node its config. Answered with AdvertiseConfig
PullConfig, PullConfig,
/// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus
AdvertiseNodesUp(Vec<AdvertisedNode>), AdvertiseNodesUp(Vec<AdvertisedNode>),
/// Advertisement of nodes config. Sent spontanously or in response to PullConfig
AdvertiseConfig(NetworkConfig), AdvertiseConfig(NetworkConfig),
} }
impl RpcMessage for Message {} impl RpcMessage for Message {}
/// A ping, containing informations about status and config
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage { pub struct PingMessage {
id: UUID, id: UUID,
@ -55,18 +65,25 @@ pub struct PingMessage {
state_info: StateInfo, state_info: StateInfo,
} }
/// A node advertisement
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode { pub struct AdvertisedNode {
/// Id of the node this advertisement relates to
pub id: UUID, pub id: UUID,
/// IP and port of the node
pub addr: SocketAddr, pub addr: SocketAddr,
/// Is the node considered up
pub is_up: bool, pub is_up: bool,
/// When was the node last seen up, in milliseconds since UNIX epoch
pub last_seen: u64, pub last_seen: u64,
pub state_info: StateInfo, pub state_info: StateInfo,
} }
/// This node's membership manager
pub struct System { pub struct System {
/// The id of this node
pub id: UUID, pub id: UUID,
persist_config: Persister<NetworkConfig>, persist_config: Persister<NetworkConfig>,
@ -79,10 +96,12 @@ pub struct System {
rpc_client: Arc<RpcClient<Message>>, rpc_client: Arc<RpcClient<Message>>,
pub(crate) status: watch::Receiver<Arc<Status>>, pub(crate) status: watch::Receiver<Arc<Status>>,
/// The ring
pub ring: watch::Receiver<Arc<Ring>>, pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<Updaters>, update_lock: Mutex<Updaters>,
/// The job runner of this node
pub background: Arc<BackgroundRunner>, pub background: Arc<BackgroundRunner>,
} }
@ -91,21 +110,29 @@ struct Updaters {
update_ring: watch::Sender<Arc<Ring>>, update_ring: watch::Sender<Arc<Ring>>,
} }
/// The status of each nodes, viewed by this node
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Status { pub struct Status {
/// Mapping of each node id to its known status
pub nodes: HashMap<UUID, Arc<StatusEntry>>, pub nodes: HashMap<UUID, Arc<StatusEntry>>,
/// Hash of `nodes`, used to detect when nodes have different views of the cluster
pub hash: Hash, pub hash: Hash,
} }
/// The status of a single node
#[derive(Debug)] #[derive(Debug)]
pub struct StatusEntry { pub struct StatusEntry {
/// The IP and port used to connect to this node
pub addr: SocketAddr, pub addr: SocketAddr,
/// Last time this node was seen
pub last_seen: u64, pub last_seen: u64,
/// Number of consecutive pings sent without reply to this node
pub num_failures: AtomicUsize, pub num_failures: AtomicUsize,
pub state_info: StateInfo, pub state_info: StateInfo,
} }
impl StatusEntry { impl StatusEntry {
/// is the node associated to this entry considered up
pub fn is_up(&self) -> bool { pub fn is_up(&self) -> bool {
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
} }
@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
} }
impl System { impl System {
/// Create this node's membership manager
pub fn new( pub fn new(
metadata_dir: PathBuf, metadata_dir: PathBuf,
rpc_http_client: Arc<RpcHttpClient>, rpc_http_client: Arc<RpcHttpClient>,
@ -279,6 +307,7 @@ impl System {
}); });
} }
/// Get an RPC client
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> { pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
RpcClient::new( RpcClient::new(
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()), RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
@ -287,6 +316,7 @@ impl System {
) )
} }
/// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring = self.ring.borrow().clone(); let ring = self.ring.borrow().clone();
self.persist_config self.persist_config
@ -319,6 +349,7 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await; self.rpc_client.call_many(&to[..], msg, timeout).await;
} }
/// Perform bootstraping, starting the ping loop
pub async fn bootstrap( pub async fn bootstrap(
self: Arc<Self>, self: Arc<Self>,
peers: Vec<SocketAddr>, peers: Vec<SocketAddr>,
@ -386,6 +417,8 @@ impl System {
} }
} else if let Some(id) = id_option { } else if let Some(id) = id_option {
if let Some(st) = status.nodes.get_mut(id) { if let Some(st) = status.nodes.get_mut(id) {
// we need to increment failure counter as call was done using by_addr so the
// counter was not auto-incremented
st.num_failures.fetch_add(1, Ordering::SeqCst); st.num_failures.fetch_add(1, Ordering::SeqCst);
if !st.is_up() { if !st.is_up() {
warn!("Node {:?} seems to be down.", id); warn!("Node {:?} seems to be down.", id);

View File

@ -1,3 +1,5 @@
//! Module containing types related to computing nodes which should receive a copy of data blocks
//! and metadata
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::convert::TryInto; use std::convert::TryInto;
@ -8,23 +10,30 @@ use garage_util::data::*;
// A partition number is encoded on 16 bits, // A partition number is encoded on 16 bits,
// i.e. we have up to 2**16 partitions. // i.e. we have up to 2**16 partitions.
// (in practice we have exactly 2**PARTITION_BITS partitions) // (in practice we have exactly 2**PARTITION_BITS partitions)
/// A partition id, stored on 16 bits
pub type Partition = u16; pub type Partition = u16;
// TODO: make this constant parametrizable in the config file // TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump // For deployments with many nodes it might make sense to bump
// it up to 10. // it up to 10.
// Maximum value : 16 // Maximum value : 16
/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
/// presence of numerous nodes, but exponentially bigger ring. Max 16
pub const PARTITION_BITS: usize = 8; pub const PARTITION_BITS: usize = 8;
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
// TODO: make this constant paraetrizable in the config file // TODO: make this constant paraetrizable in the config file
// (most deployments use a replication factor of 3, so...) // (most deployments use a replication factor of 3, so...)
/// The maximum number of time an object might get replicated
pub const MAX_REPLICATION: usize = 3; pub const MAX_REPLICATION: usize = 3;
/// The user-defined configuration of the cluster's nodes
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig { pub struct NetworkConfig {
/// Map of each node's id to it's configuration
pub members: HashMap<UUID, NetworkConfigEntry>, pub members: HashMap<UUID, NetworkConfigEntry>,
/// Version of this config
pub version: u64, pub version: u64,
} }
@ -37,26 +46,40 @@ impl NetworkConfig {
} }
} }
/// The overall configuration of one (possibly remote) node
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfigEntry { pub struct NetworkConfigEntry {
/// Datacenter at which this entry belong. This infromation might be used to perform a better
/// geodistribution
pub datacenter: String, pub datacenter: String,
/// The (relative) capacity of the node
pub capacity: u32, pub capacity: u32,
/// A tag to recognize the entry, not used for other things than display
pub tag: String, pub tag: String,
} }
/// A ring distributing fairly objects to nodes
#[derive(Clone)] #[derive(Clone)]
pub struct Ring { pub struct Ring {
/// The network configuration used to generate this ring
pub config: NetworkConfig, pub config: NetworkConfig,
/// The list of entries in the ring
pub ring: Vec<RingEntry>, pub ring: Vec<RingEntry>,
} }
/// An entry in the ring
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RingEntry { pub struct RingEntry {
/// The prefix of the Hash of object which should use this entry
pub location: Hash, pub location: Hash,
/// The nodes in which a matching object should get stored
pub nodes: [UUID; MAX_REPLICATION], pub nodes: [UUID; MAX_REPLICATION],
} }
impl Ring { impl Ring {
// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
// levels of imbrication. It is basically impossible to test, maintain, or understand for an
// outsider.
pub(crate) fn new(config: NetworkConfig) -> Self { pub(crate) fn new(config: NetworkConfig) -> Self {
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1) // Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
@ -166,20 +189,16 @@ impl Ring {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// eprintln!("RING: --");
// for e in ring.iter() {
// eprintln!("{:?}", e);
// }
// eprintln!("END --");
Self { config, ring } Self { config, ring }
} }
/// Get the partition in which data would fall on
pub fn partition_of(&self, from: &Hash) -> Partition { pub fn partition_of(&self, from: &Hash) -> Partition {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
top >> (16 - PARTITION_BITS) top >> (16 - PARTITION_BITS)
} }
/// Get the list of partitions and the first hash of a partition key that would fall in it
pub fn partitions(&self) -> Vec<(Partition, Hash)> { pub fn partitions(&self) -> Vec<(Partition, Hash)> {
let mut ret = vec![]; let mut ret = vec![];
@ -193,6 +212,8 @@ impl Ring {
ret ret
} }
// TODO rename this function as it no longer walk the ring
/// Walk the ring to find the n servers in which data should be replicated
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if self.ring.len() != 1 << PARTITION_BITS { if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!"); warn!("Ring not yet ready, read/writes will be lost!");
@ -201,12 +222,15 @@ impl Ring {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize; let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
// TODO why computing two time in the same way and asserting?
assert_eq!(partition_idx, self.partition_of(from) as usize); assert_eq!(partition_idx, self.partition_of(from) as usize);
let partition = &self.ring[partition_idx]; let partition = &self.ring[partition_idx];
let partition_top = let partition_top =
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap()); u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
// TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
// probably be a test more than a runtime assertion
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16); assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
assert!(n <= partition.nodes.len()); assert!(n <= partition.nodes.len());

View File

@ -1,3 +1,4 @@
//! Contain structs related to making RPCs
use std::borrow::Borrow; use std::borrow::Borrow;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -26,14 +27,19 @@ use crate::tls_util;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct RequestStrategy { pub struct RequestStrategy {
/// Max time to wait for reponse
pub rs_timeout: Duration, pub rs_timeout: Duration,
/// Min number of response to consider the request successful
pub rs_quorum: usize, pub rs_quorum: usize,
/// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool, pub rs_interrupt_after_quorum: bool,
} }
impl RequestStrategy { impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_quorum(quorum: usize) -> Self { pub fn with_quorum(quorum: usize) -> Self {
RequestStrategy { RequestStrategy {
rs_timeout: DEFAULT_TIMEOUT, rs_timeout: DEFAULT_TIMEOUT,
@ -41,19 +47,25 @@ impl RequestStrategy {
rs_interrupt_after_quorum: false, rs_interrupt_after_quorum: false,
} }
} }
/// Set timeout of the strategy
pub fn with_timeout(mut self, timeout: Duration) -> Self { pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.rs_timeout = timeout; self.rs_timeout = timeout;
self self
} }
/// Set if requests can be dropped after quorum has been reached
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt; self.rs_interrupt_after_quorum = interrupt;
self self
} }
} }
/// Shortcut for a boxed async function taking a message, and resolving to another message or an
/// error
pub type LocalHandlerFn<M> = pub type LocalHandlerFn<M> =
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>; Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
/// Client used to send RPC
pub struct RpcClient<M: RpcMessage> { pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>, status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> {
} }
impl<M: RpcMessage + 'static> RpcClient<M> { impl<M: RpcMessage + 'static> RpcClient<M> {
/// Create a new RpcClient from an address, a job runner, and the status of all RPC servers
pub fn new( pub fn new(
rac: RpcAddrClient<M>, rac: RpcAddrClient<M>,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}) })
} }
/// Set the local handler, to process RPC to this node without network usage
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F) pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
where where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static, F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
self.local_handler.swap(Some(Arc::new((my_id, handler)))); self.local_handler.swap(Some(Arc::new((my_id, handler))));
} }
/// Get a RPC client to make calls using node's SocketAddr instead of its ID
pub fn by_addr(&self) -> &RpcAddrClient<M> { pub fn by_addr(&self) -> &RpcAddrClient<M> {
&self.rpc_addr_client &self.rpc_addr_client
} }
/// Make a RPC call
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> { pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await self.call_arc(to, Arc::new(msg), timeout).await
} }
/// Make a RPC call from a message stored in an Arc
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> { pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() { if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref(); let (my_id, local_handler) = lh.as_ref();
@ -134,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
} }
} }
/// Make a RPC call to multiple servers, returning a Vec containing each result
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg); let msg = Arc::new(msg);
let mut resp_stream = to let mut resp_stream = to
@ -148,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
results results
} }
/// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if
/// strategy could not be respected due to too many errors
pub async fn try_call_many( pub async fn try_call_many(
self: &Arc<Self>, self: &Arc<Self>,
to: &[UUID], to: &[UUID],
@ -207,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
} }
} }
/// Thin wrapper arround an `RpcHttpClient` specifying the path of the request
pub struct RpcAddrClient<M: RpcMessage> { pub struct RpcAddrClient<M: RpcMessage> {
phantom: PhantomData<M>, phantom: PhantomData<M>,
@ -215,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
} }
impl<M: RpcMessage> RpcAddrClient<M> { impl<M: RpcMessage> RpcAddrClient<M> {
/// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self { pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
Self { Self {
phantom: PhantomData::default(), phantom: PhantomData::default(),
@ -223,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
} }
} }
/// Make a RPC
pub async fn call<MB>( pub async fn call<MB>(
&self, &self,
to_addr: &SocketAddr, to_addr: &SocketAddr,
@ -238,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
} }
} }
/// HTTP client used to make RPCs
pub struct RpcHttpClient { pub struct RpcHttpClient {
request_limiter: Semaphore, request_limiter: Semaphore,
method: ClientMethod, method: ClientMethod,
@ -249,6 +273,7 @@ enum ClientMethod {
} }
impl RpcHttpClient { impl RpcHttpClient {
/// Create a new RpcHttpClient
pub fn new( pub fn new(
max_concurrent_requests: usize, max_concurrent_requests: usize,
tls_config: &Option<TlsConfig>, tls_config: &Option<TlsConfig>,
@ -279,6 +304,7 @@ impl RpcHttpClient {
}) })
} }
/// Make a RPC
async fn call<M, MB>( async fn call<M, MB>(
&self, &self,
path: &str, path: &str,

View File

@ -1,3 +1,4 @@
//! Contains structs related to receiving RPCs
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
@ -22,13 +23,17 @@ use garage_util::error::Error;
use crate::tls_util; use crate::tls_util;
/// Trait for messages that can be sent as RPC
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {} pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>; type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>; type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
/// Structure handling RPCs
pub struct RpcServer { pub struct RpcServer {
/// The address the RpcServer will bind
pub bind_addr: SocketAddr, pub bind_addr: SocketAddr,
/// The tls configuration used for RPC
pub tls_config: Option<TlsConfig>, pub tls_config: Option<TlsConfig>,
handlers: HashMap<String, Handler>, handlers: HashMap<String, Handler>,
@ -87,6 +92,7 @@ where
} }
impl RpcServer { impl RpcServer {
/// Create a new RpcServer
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self { pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
Self { Self {
bind_addr, bind_addr,
@ -95,6 +101,7 @@ impl RpcServer {
} }
} }
/// Add handler handling request made to `name`
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F) pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
where where
M: RpcMessage + 'static, M: RpcMessage + 'static,
@ -156,6 +163,7 @@ impl RpcServer {
} }
} }
/// Run the RpcServer
pub async fn run( pub async fn run(
self: Arc<Self>, self: Arc<Self>,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,

View File

@ -34,7 +34,7 @@ use crate::crdt::crdt::*;
/// and may differ from what you observed with your atomic clock! /// and may differ from what you observed with your atomic clock!
/// ///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing /// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in entreprise when reconciliating databases with ad-hoc scripts. /// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> { pub struct LWW<T> {
ts: u64, ts: u64,

View File

@ -37,6 +37,7 @@ where
Self { vals: vec![(k, v)] } Self { vals: vec![(k, v)] }
} }
/// Add a value to the map
pub fn put(&mut self, k: K, v: V) { pub fn put(&mut self, k: K, v: V) {
self.merge(&Self::put_mutator(k, v)); self.merge(&Self::put_mutator(k, v));
} }

View File

@ -74,7 +74,7 @@ where
while !*must_exit.borrow() { while !*must_exit.borrow() {
match self.gc_loop_iter().await { match self.gc_loop_iter().await {
Ok(true) => { Ok(true) => {
// Stuff was done, loop imediately // Stuff was done, loop immediately
continue; continue;
} }
Ok(false) => { Ok(false) => {

View File

@ -8,10 +8,10 @@ pub mod schema;
pub mod util; pub mod util;
pub mod data; pub mod data;
pub mod gc; mod gc;
pub mod merkle; mod merkle;
pub mod replication; pub mod replication;
pub mod sync; mod sync;
pub mod table; pub mod table;
pub use schema::*; pub use schema::*;

View File

@ -6,19 +6,19 @@ use garage_util::data::*;
use crate::replication::*; use crate::replication::*;
/// Full replication schema: all nodes store everything
/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
#[derive(Clone)] #[derive(Clone)]
pub struct TableFullReplication { pub struct TableFullReplication {
/// The membership manager of this node
pub system: Arc<System>, pub system: Arc<System>,
/// Max number of faults allowed while replicating a record
pub max_faults: usize, pub max_faults: usize,
} }
impl TableReplication for TableFullReplication { impl TableReplication for TableFullReplication {
// Full replication schema: all nodes store everything
// Writes are disseminated in an epidemic manner in the network
// Advantage: do all reads locally, extremely fast
// Inconvenient: only suitable to reasonably small tables
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> { fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
vec![self.system.id] vec![self.system.id]
} }

View File

@ -1,6 +1,8 @@
mod parameters; mod parameters;
pub mod fullcopy; mod fullcopy;
pub mod sharded; mod sharded;
pub use fullcopy::TableFullReplication;
pub use parameters::*; pub use parameters::*;
pub use sharded::TableShardedReplication;

View File

@ -2,20 +2,25 @@ use garage_rpc::ring::*;
use garage_util::data::*; use garage_util::data::*;
/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync { pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs // See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods // To understand various replication methods
// Which nodes to send reads from /// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>; fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize; fn read_quorum(&self) -> usize;
// Which nodes to send writes to /// Which nodes to send writes to
fn write_nodes(&self, hash: &Hash) -> Vec<UUID>; fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
/// Responses needed to consider a write succesfull
fn write_quorum(&self) -> usize; fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize; fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync // Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition; fn partition_of(&self, hash: &Hash) -> Partition;
/// List of existing partitions
fn partitions(&self) -> Vec<(Partition, Hash)>; fn partitions(&self) -> Vec<(Partition, Hash)>;
} }

View File

@ -6,22 +6,25 @@ use garage_util::data::*;
use crate::replication::*; use crate::replication::*;
/// Sharded replication schema:
/// - based on the ring of nodes, a certain set of neighbors
/// store entries, given as a function of the position of the
/// entry's hash in the ring
/// - reads are done on all of the nodes that replicate the data
/// - writes as well
#[derive(Clone)] #[derive(Clone)]
pub struct TableShardedReplication { pub struct TableShardedReplication {
/// The membership manager of this node
pub system: Arc<System>, pub system: Arc<System>,
/// How many time each data should be replicated
pub replication_factor: usize, pub replication_factor: usize,
/// How many nodes to contact for a read, should be at most `replication_factor`
pub read_quorum: usize, pub read_quorum: usize,
/// How many nodes to contact for a write, should be at most `replication_factor`
pub write_quorum: usize, pub write_quorum: usize,
} }
impl TableReplication for TableShardedReplication { impl TableReplication for TableShardedReplication {
// Sharded replication schema:
// - based on the ring of nodes, a certain set of neighbors
// store entries, given as a function of the position of the
// entry's hash in the ring
// - reads are done on all of the nodes that replicate the data
// - writes as well
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> { fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
let ring = self.system.ring.borrow().clone(); let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor) ring.walk_ring(&hash, self.replication_factor)

View File

@ -4,7 +4,9 @@ use garage_util::data::*;
use crate::crdt::CRDT; use crate::crdt::CRDT;
/// Trait for field used to partition data
pub trait PartitionKey { pub trait PartitionKey {
/// Get the key used to partition
fn hash(&self) -> Hash; fn hash(&self) -> Hash;
} }
@ -20,7 +22,9 @@ impl PartitionKey for Hash {
} }
} }
/// Trait for field used to sort data
pub trait SortKey { pub trait SortKey {
/// Get the key used to sort
fn sort_key(&self) -> &[u8]; fn sort_key(&self) -> &[u8];
} }
@ -36,25 +40,34 @@ impl SortKey for Hash {
} }
} }
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>: pub trait Entry<P: PartitionKey, S: SortKey>:
CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{ {
/// Get the key used to partition
fn partition_key(&self) -> &P; fn partition_key(&self) -> &P;
/// Get the key used to sort
fn sort_key(&self) -> &S; fn sort_key(&self) -> &S;
/// Is the entry a tombstone? Default implementation always return false
fn is_tombstone(&self) -> bool { fn is_tombstone(&self) -> bool {
false false
} }
} }
/// Trait for the schema used in a table
pub trait TableSchema: Send + Sync { pub trait TableSchema: Send + Sync {
/// The partition key used in that table
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// The sort key used int that table
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
/// They type for an entry in that table
type E: Entry<Self::P, Self::S>; type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
// Action to take if not able to decode current version: // Action to take if not able to decode current version:
// try loading from an older version // try loading from an older version
/// Try migrating an entry from an older version
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> { fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
None None
} }
@ -65,7 +78,5 @@ pub trait TableSchema: Send + Sync {
// to stderr. // to stderr.
fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {} fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool;
true
}
} }

View File

@ -1,3 +1,4 @@
//! Job runner for futures and async functions
use core::future::Future; use core::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
@ -12,14 +13,15 @@ use crate::error::Error;
type JobOutput = Result<(), Error>; type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
/// Job runner for futures and async functions
pub struct BackgroundRunner { pub struct BackgroundRunner {
pub stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
queue_in: mpsc::UnboundedSender<(Job, bool)>, queue_in: mpsc::UnboundedSender<(Job, bool)>,
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>, worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
} }
impl BackgroundRunner { impl BackgroundRunner {
/// Create a new BackgroundRunner
pub fn new( pub fn new(
n_runners: usize, n_runners: usize,
stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
@ -103,7 +105,7 @@ impl BackgroundRunner {
(bgrunner, await_all_done) (bgrunner, await_all_done)
} }
// Spawn a task to be run in background /// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T) pub fn spawn<T>(&self, job: T)
where where
T: Future<Output = JobOutput> + Send + 'static, T: Future<Output = JobOutput> + Send + 'static,
@ -115,6 +117,8 @@ impl BackgroundRunner {
.unwrap(); .unwrap();
} }
/// Spawn a task to be run in background. It may get discarded before running if spawned while
/// the runner is stopping
pub fn spawn_cancellable<T>(&self, job: T) pub fn spawn_cancellable<T>(&self, job: T)
where where
T: Future<Output = JobOutput> + Send + 'static, T: Future<Output = JobOutput> + Send + 'static,

View File

@ -1,3 +1,4 @@
//! Contains type and functions related to Garage configuration file
use std::io::Read; use std::io::Read;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
@ -6,57 +7,82 @@ use serde::{de, Deserialize};
use crate::error::Error; use crate::error::Error;
/// Represent the whole configuration
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct Config { pub struct Config {
/// Path where to store metadata. Should be fast, but low volume
pub metadata_dir: PathBuf, pub metadata_dir: PathBuf,
/// Path where to store data. Can be slower, but need higher volume
pub data_dir: PathBuf, pub data_dir: PathBuf,
/// Address to bind for RPC
pub rpc_bind_addr: SocketAddr, pub rpc_bind_addr: SocketAddr,
/// Bootstrap peers RPC address
#[serde(deserialize_with = "deserialize_vec_addr")] #[serde(deserialize_with = "deserialize_vec_addr")]
pub bootstrap_peers: Vec<SocketAddr>, pub bootstrap_peers: Vec<SocketAddr>,
/// Consule host to connect to to discover more peers
pub consul_host: Option<String>, pub consul_host: Option<String>,
/// Consul service name to use
pub consul_service_name: Option<String>, pub consul_service_name: Option<String>,
/// Max number of concurrent RPC request
#[serde(default = "default_max_concurrent_rpc_requests")] #[serde(default = "default_max_concurrent_rpc_requests")]
pub max_concurrent_rpc_requests: usize, pub max_concurrent_rpc_requests: usize,
/// Size of data blocks to save to disk
#[serde(default = "default_block_size")] #[serde(default = "default_block_size")]
pub block_size: usize, pub block_size: usize,
#[serde(default = "default_control_write_max_faults")] #[serde(default = "default_control_write_max_faults")]
pub control_write_max_faults: usize, pub control_write_max_faults: usize,
/// How many nodes should hold a copy of meta data
#[serde(default = "default_replication_factor")] #[serde(default = "default_replication_factor")]
pub meta_replication_factor: usize, pub meta_replication_factor: usize,
/// How many nodes should hold a copy of data
#[serde(default = "default_replication_factor")] #[serde(default = "default_replication_factor")]
pub data_replication_factor: usize, pub data_replication_factor: usize,
/// Configuration for RPC TLS
pub rpc_tls: Option<TlsConfig>, pub rpc_tls: Option<TlsConfig>,
/// Configuration for S3 api
pub s3_api: ApiConfig, pub s3_api: ApiConfig,
/// Configuration for serving files as normal web server
pub s3_web: WebConfig, pub s3_web: WebConfig,
} }
/// Configuration for RPC TLS
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct TlsConfig { pub struct TlsConfig {
/// Path to certificate autority used for all nodes
pub ca_cert: String, pub ca_cert: String,
/// Path to public certificate for this node
pub node_cert: String, pub node_cert: String,
/// Path to private key for this node
pub node_key: String, pub node_key: String,
} }
/// Configuration for S3 api
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct ApiConfig { pub struct ApiConfig {
/// Address and port to bind for api serving
pub api_bind_addr: SocketAddr, pub api_bind_addr: SocketAddr,
/// S3 region to use
pub s3_region: String, pub s3_region: String,
} }
/// Configuration for serving files as normal web server
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct WebConfig { pub struct WebConfig {
/// Address and port to bind for web serving
pub bind_addr: SocketAddr, pub bind_addr: SocketAddr,
/// Suffix to remove from domain name to find bucket
pub root_domain: String, pub root_domain: String,
/// Suffix to add when user-agent request path end with "/"
pub index: String, pub index: String,
} }
@ -73,6 +99,7 @@ fn default_control_write_max_faults() -> usize {
1 1
} }
/// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut file = std::fs::OpenOptions::new() let mut file = std::fs::OpenOptions::new()
.read(true) .read(true)

View File

@ -1,8 +1,10 @@
//! Contains common types and functions related to serialization and integrity
use rand::Rng; use rand::Rng;
use serde::de::{self, Visitor}; use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt; use std::fmt;
/// An array of 32 bytes
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] #[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
pub struct FixedBytes32([u8; 32]); pub struct FixedBytes32([u8; 32]);
@ -61,15 +63,20 @@ impl Serialize for FixedBytes32 {
} }
impl FixedBytes32 { impl FixedBytes32 {
/// Access the content as a slice
pub fn as_slice(&self) -> &[u8] { pub fn as_slice(&self) -> &[u8] {
&self.0[..] &self.0[..]
} }
/// Access the content as a mutable slice
pub fn as_slice_mut(&mut self) -> &mut [u8] { pub fn as_slice_mut(&mut self) -> &mut [u8] {
&mut self.0[..] &mut self.0[..]
} }
/// Copy to a slice
pub fn to_vec(&self) -> Vec<u8> { pub fn to_vec(&self) -> Vec<u8> {
self.0.to_vec() self.0.to_vec()
} }
/// Try building a FixedBytes32 from a slice
/// Return None if the slice is not 32 bytes long
pub fn try_from(by: &[u8]) -> Option<Self> { pub fn try_from(by: &[u8]) -> Option<Self> {
if by.len() != 32 { if by.len() != 32 {
return None; return None;
@ -80,9 +87,12 @@ impl FixedBytes32 {
} }
} }
/// A 32 bytes UUID
pub type UUID = FixedBytes32; pub type UUID = FixedBytes32;
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
pub type Hash = FixedBytes32; pub type Hash = FixedBytes32;
/// Compute the sha256 of a slice
pub fn sha256sum(data: &[u8]) -> Hash { pub fn sha256sum(data: &[u8]) -> Hash {
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
@ -93,6 +103,7 @@ pub fn sha256sum(data: &[u8]) -> Hash {
hash.into() hash.into()
} }
/// Compute the blake2 of a slice
pub fn blake2sum(data: &[u8]) -> Hash { pub fn blake2sum(data: &[u8]) -> Hash {
use blake2::{Blake2b, Digest}; use blake2::{Blake2b, Digest};
@ -103,8 +114,10 @@ pub fn blake2sum(data: &[u8]) -> Hash {
hash.into() hash.into()
} }
/// A 64 bit non cryptographic hash
pub type FastHash = u64; pub type FastHash = u64;
/// Compute a (non cryptographic) of a slice
pub fn fasthash(data: &[u8]) -> FastHash { pub fn fasthash(data: &[u8]) -> FastHash {
use xxhash_rust::xxh3::Xxh3; use xxhash_rust::xxh3::Xxh3;
@ -113,12 +126,14 @@ pub fn fasthash(data: &[u8]) -> FastHash {
h.digest() h.digest()
} }
/// Generate a random 32 bytes UUID
pub fn gen_uuid() -> UUID { pub fn gen_uuid() -> UUID {
rand::thread_rng().gen::<[u8; 32]>().into() rand::thread_rng().gen::<[u8; 32]>().into()
} }
// RMP serialization with names of fields and variants // RMP serialization with names of fields and variants
/// Serialize to MessagePack
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
where where
T: Serialize + ?Sized, T: Serialize + ?Sized,
@ -131,10 +146,13 @@ where
Ok(wr) Ok(wr)
} }
/// Serialize to JSON, truncating long result
pub fn debug_serialize<T: Serialize>(x: T) -> String { pub fn debug_serialize<T: Serialize>(x: T) -> String {
match serde_json::to_string(&x) { match serde_json::to_string(&x) {
Ok(ss) => { Ok(ss) => {
if ss.len() > 100 { if ss.len() > 100 {
// TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
// (or more) codepoint
ss[..100].to_string() ss[..100].to_string()
} else { } else {
ss ss

View File

@ -1,9 +1,11 @@
//! Module containing error types used in Garage
use err_derive::Error; use err_derive::Error;
use hyper::StatusCode; use hyper::StatusCode;
use std::io; use std::io;
use crate::data::*; use crate::data::*;
/// RPC related errors
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum RPCError { pub enum RPCError {
#[error(display = "Node is down: {:?}.", _0)] #[error(display = "Node is down: {:?}.", _0)]
@ -28,6 +30,7 @@ pub enum RPCError {
TooManyErrors(Vec<String>), TooManyErrors(Vec<String>),
} }
/// Regroup all Garage errors
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error(display = "IO error: {}", _0)] #[error(display = "IO error: {}", _0)]

View File

@ -1,3 +1,5 @@
//! Crate containing common functions and types used in Garage
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View File

@ -1,6 +1,8 @@
//! Module containing helper functions to manipulate time
use chrono::{SecondsFormat, TimeZone, Utc}; use chrono::{SecondsFormat, TimeZone, Utc};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
/// Returns milliseconds since UNIX Epoch
pub fn now_msec() -> u64 { pub fn now_msec() -> u64 {
SystemTime::now() SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
@ -8,6 +10,8 @@ pub fn now_msec() -> u64 {
.as_millis() as u64 .as_millis() as u64
} }
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
pub fn msec_to_rfc3339(msecs: u64) -> String { pub fn msec_to_rfc3339(msecs: u64) -> String {
let secs = msecs as i64 / 1000; let secs = msecs as i64 / 1000;
let nanos = (msecs as i64 % 1000) as u32 * 1_000_000; let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;

View File

@ -3,30 +3,37 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
/// Errors of this crate
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
/// An error received from the API crate
#[error(display = "API error: {}", _0)] #[error(display = "API error: {}", _0)]
ApiError(#[error(source)] garage_api::error::Error), ApiError(#[error(source)] garage_api::Error),
// Category: internal error // Category: internal error
/// Error internal to garage
#[error(display = "Internal error: {}", _0)] #[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError), InternalError(#[error(source)] GarageError),
/// The file does not exist
#[error(display = "Not found")] #[error(display = "Not found")]
NotFound, NotFound,
// Category: bad request /// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)] #[error(display = "Invalid UTF-8: {}", _0)]
InvalidUTF8(#[error(source)] std::str::Utf8Error), InvalidUTF8(#[error(source)] std::str::Utf8Error),
/// The client send a header with invalid value
#[error(display = "Invalid header value: {}", _0)] #[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError), InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client sent a request without host, or with unsupported method
#[error(display = "Bad request: {}", _0)] #[error(display = "Bad request: {}", _0)]
BadRequest(String), BadRequest(String),
} }
impl Error { impl Error {
/// Transform errors into http status code
pub fn http_status_code(&self) -> StatusCode { pub fn http_status_code(&self) -> StatusCode {
match self { match self {
Error::NotFound => StatusCode::NOT_FOUND, Error::NotFound => StatusCode::NOT_FOUND,

View File

@ -1,6 +1,9 @@
//! Crate for handling web serving of s3 bucket
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod error; mod error;
pub use error::Error;
pub mod web_server; mod web_server;
pub use web_server::run_web_server;

View File

@ -18,6 +18,7 @@ use garage_model::garage::Garage;
use garage_table::*; use garage_table::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
/// Run a web server
pub async fn run_web_server( pub async fn run_web_server(
garage: Arc<Garage>, garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>, shutdown_signal: impl Future<Output = ()>,