attempt at documenting model crate

This commit is contained in:
Trinity Pointard 2021-03-26 21:53:28 +01:00
parent 30bec0758b
commit 92d54770bb
8 changed files with 119 additions and 15 deletions

View File

@ -24,6 +24,7 @@ use crate::block_ref_table::*;
use crate::garage::Garage; use crate::garage::Garage;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072; pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1; pub const BACKGROUND_WORKERS: u64 = 1;
@ -33,28 +34,41 @@ const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum Message { pub enum Message {
Ok, Ok,
/// Message to ask for a block of data, by hash
GetBlock(Hash), GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
PutBlock(PutBlockMessage), PutBlock(PutBlockMessage),
/// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash), NeedBlockQuery(Hash),
/// Response : whether the node do require that block
NeedBlockReply(bool), NeedBlockReply(bool),
} }
/// Structure used to send a block
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage { pub struct PutBlockMessage {
/// Hash of the block
pub hash: Hash, pub hash: Hash,
/// Content of the block
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
pub data: Vec<u8>, pub data: Vec<u8>,
} }
impl RpcMessage for Message {} impl RpcMessage for Message {}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager { pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
pub replication: TableShardedReplication, pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf, pub data_dir: PathBuf,
/// Lock to prevent concurrent edition of the directory
pub data_dir_lock: Mutex<()>, pub data_dir_lock: Mutex<()>,
rc: sled::Tree, rc: sled::Tree,
@ -128,7 +142,8 @@ impl BlockManager {
} }
pub fn spawn_background_worker(self: Arc<Self>) { pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
// launches only one worker with current value of BACKGROUND_WORKERS
for i in 0..BACKGROUND_WORKERS { for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone(); let bm2 = self.clone();
let background = self.system.background.clone(); let background = self.system.background.clone();
@ -141,6 +156,7 @@ impl BlockManager {
} }
} }
/// Write a block to disk
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
let _lock = self.data_dir_lock.lock().await; let _lock = self.data_dir_lock.lock().await;
@ -159,6 +175,7 @@ impl BlockManager {
Ok(Message::Ok) Ok(Message::Ok)
} }
/// Read block from disk, verifying it's integrity
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
let path = self.block_path(hash); let path = self.block_path(hash);
@ -190,6 +207,7 @@ impl BlockManager {
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
} }
/// Check if this node should have a block, but don't actually have it
pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let needed = self let needed = self
.rc .rc
@ -217,6 +235,8 @@ impl BlockManager {
path path
} }
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
let old_rc = self.rc.fetch_and_update(&hash, |old| { let old_rc = self.rc.fetch_and_update(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0); let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -229,6 +249,8 @@ impl BlockManager {
Ok(()) Ok(())
} }
/// Decrement the number of time a block is used
// when counter reach 0, it seems not put to resync which I assume put it to gc?
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
let new_rc = self.rc.update_and_fetch(&hash, |old| { let new_rc = self.rc.update_and_fetch(&hash, |old| {
let old_v = old.map(u64_from_be_bytes).unwrap_or(0); let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
@ -388,6 +410,7 @@ impl BlockManager {
Ok(()) Ok(())
} }
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash); let who = self.replication.read_nodes(&hash);
let resps = self let resps = self
@ -412,6 +435,7 @@ impl BlockManager {
))) )))
} }
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash); let who = self.replication.write_nodes(&hash);
self.rpc_client self.rpc_client
@ -498,6 +522,7 @@ impl BlockManager {
.boxed() .boxed()
} }
/// Get lenght of resync queue
pub fn resync_queue_len(&self) -> usize { pub fn resync_queue_len(&self) -> usize {
self.resync_queue.len() self.resync_queue.len()
} }

View File

@ -11,12 +11,15 @@ use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef { pub struct BlockRef {
// Primary key // Primary key
/// Hash of the block
pub block: Hash, pub block: Hash,
// Sort key // Sort key
// why a version on a hashed (probably immutable) piece of data?
pub version: UUID, pub version: UUID,
// Keep track of deleted status // Keep track of deleted status
/// Is that block deleted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
} }

View File

@ -12,15 +12,18 @@ use crate::key_table::PermissionSet;
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket { pub struct Bucket {
// Primary key /// Name of the bucket
pub name: String, pub name: String,
/// State, and configuration if not deleted, of the bucket
pub state: crdt::LWW<BucketState>, pub state: crdt::LWW<BucketState>,
} }
/// State of a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum BucketState { pub enum BucketState {
/// The bucket is deleted
Deleted, Deleted,
/// The bucket exists
Present(BucketParams), Present(BucketParams),
} }
@ -37,9 +40,12 @@ impl CRDT for BucketState {
} }
} }
/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams { pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give
pub authorized_keys: crdt::LWWMap<String, PermissionSet>, pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
/// Is the bucket served as http
pub website: crdt::LWW<bool>, pub website: crdt::LWW<bool>,
} }
@ -51,6 +57,7 @@ impl CRDT for BucketParams {
} }
impl BucketParams { impl BucketParams {
/// Create a new default `BucketParams`
pub fn new() -> Self { pub fn new() -> Self {
BucketParams { BucketParams {
authorized_keys: crdt::LWWMap::new(), authorized_keys: crdt::LWWMap::new(),
@ -60,15 +67,21 @@ impl BucketParams {
} }
impl Bucket { impl Bucket {
/// Create a new bucket
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
Bucket { Bucket {
name, name,
state: crdt::LWW::new(BucketState::Present(BucketParams::new())), state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
} }
} }
/// Query if bucket is deleted
pub fn is_deleted(&self) -> bool { pub fn is_deleted(&self) -> bool {
*self.state.get() == BucketState::Deleted *self.state.get() == BucketState::Deleted
} }
/// Return the list of authorized keys, when each was updated, and the permission associated to
/// the key
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
match self.state.get() { match self.state.get() {
BucketState::Deleted => &[], BucketState::Deleted => &[],

View File

@ -18,15 +18,23 @@ use crate::key_table::*;
use crate::object_table::*; use crate::object_table::*;
use crate::version_table::*; use crate::version_table::*;
/// An entire Garage full of data
pub struct Garage { pub struct Garage {
/// The parsed configuration Garage is running
pub config: Config, pub config: Config,
/// The local database
pub db: sled::Db, pub db: sled::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>, pub background: Arc<BackgroundRunner>,
/// The membership manager
pub system: Arc<System>, pub system: Arc<System>,
/// The block manager
pub block_manager: Arc<BlockManager>, pub block_manager: Arc<BlockManager>,
/// Table containing informations about buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>, pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
/// Table containing informations about api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>, pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
@ -35,6 +43,7 @@ pub struct Garage {
} }
impl Garage { impl Garage {
/// Create and run garage
pub fn new( pub fn new(
config: Config, config: Config,
db: sled::Db, db: sled::Db,

View File

@ -3,26 +3,28 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::*; use garage_table::*;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Key { pub struct Key {
// Primary key /// The id of the key (immutable)
pub key_id: String, pub key_id: String,
// Associated secret key (immutable) /// The secret_key associated
// shouldn't it be hashed or something, so it's trully secret?
pub secret_key: String, pub secret_key: String,
// Name /// Name for the key
pub name: crdt::LWW<String>, pub name: crdt::LWW<String>,
// Deletion /// Is the key deleted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
// Authorized keys /// Buckets in which the key is authorized. Empty if `Key` is deleted
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>, pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
// CRDT interaction: deleted implies authorized_buckets is empty
} }
impl Key { impl Key {
/// Create a new key
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
@ -34,6 +36,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(), authorized_buckets: crdt::LWWMap::new(),
} }
} }
/// Import a key from it's parts
pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self { pub fn import(key_id: &str, secret_key: &str, name: &str) -> Self {
Self { Self {
key_id: key_id.to_string(), key_id: key_id.to_string(),
@ -43,6 +47,8 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(), authorized_buckets: crdt::LWWMap::new(),
} }
} }
/// Create a new Key which can me merged to mark an existing key deleted
pub fn delete(key_id: String) -> Self { pub fn delete(key_id: String) -> Self {
Self { Self {
key_id, key_id,
@ -52,13 +58,16 @@ impl Key {
authorized_buckets: crdt::LWWMap::new(), authorized_buckets: crdt::LWWMap::new(),
} }
} }
/// Add an authorized bucket, only if it wasn't there before
/// Check if `Key` is allowed to read in bucket
pub fn allow_read(&self, bucket: &str) -> bool { pub fn allow_read(&self, bucket: &str) -> bool {
self.authorized_buckets self.authorized_buckets
.get(&bucket.to_string()) .get(&bucket.to_string())
.map(|x| x.allow_read) .map(|x| x.allow_read)
.unwrap_or(false) .unwrap_or(false)
} }
/// Check if `Key` is allowed to write in bucket
pub fn allow_write(&self, bucket: &str) -> bool { pub fn allow_write(&self, bucket: &str) -> bool {
self.authorized_buckets self.authorized_buckets
.get(&bucket.to_string()) .get(&bucket.to_string())
@ -67,9 +76,12 @@ impl Key {
} }
} }
/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet { pub struct PermissionSet {
/// The key can be used to read the bucket
pub allow_read: bool, pub allow_read: bool,
/// The key can be used to write in the bucket
pub allow_write: bool, pub allow_write: bool,
} }

View File

@ -1,3 +1,4 @@
#![warn(missing_docs)]
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View File

@ -11,19 +11,21 @@ use garage_table::*;
use crate::version_table::*; use crate::version_table::*;
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object { pub struct Object {
// Primary key /// The bucket in which the object is stored
pub bucket: String, pub bucket: String,
// Sort key /// The key at which the object is stored in its bucket
pub key: String, pub key: String,
// Data /// The list of known versions of the object
versions: Vec<ObjectVersion>, versions: Vec<ObjectVersion>,
} }
impl Object { impl Object {
/// Create an object from parts
pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self { pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
let mut ret = Self { let mut ret = Self {
bucket, bucket,
@ -36,6 +38,7 @@ impl Object {
} }
ret ret
} }
/// Adds a version if it wasn't already present /// Adds a version if it wasn't already present
pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
match self match self
@ -49,23 +52,32 @@ impl Object {
Ok(_) => Err(()), Ok(_) => Err(()),
} }
} }
/// Get a list of all versions known for `Object`
pub fn versions(&self) -> &[ObjectVersion] { pub fn versions(&self) -> &[ObjectVersion] {
&self.versions[..] &self.versions[..]
} }
} }
/// Informations about a version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion { pub struct ObjectVersion {
/// Id of the version
pub uuid: UUID, pub uuid: UUID,
/// Timestamp of when the object was created
pub timestamp: u64, pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState, pub state: ObjectVersionState,
} }
/// State of an object version
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState { pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders), Uploading(ObjectVersionHeaders),
/// The version is fully received
Complete(ObjectVersionData), Complete(ObjectVersionData),
/// The version was never fully received
Aborted, Aborted,
} }
@ -90,10 +102,15 @@ impl CRDT for ObjectVersionState {
} }
} }
/// Data about an object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData { pub enum ObjectVersionData {
/// The version is deleted
DeleteMarker, DeleteMarker,
/// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash), FirstBlock(ObjectVersionMeta, Hash),
} }
@ -101,16 +118,23 @@ impl AutoCRDT for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true; const WARN_IF_DIFFERENT: bool = true;
} }
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta { pub struct ObjectVersionMeta {
/// Headers to send to the client
pub headers: ObjectVersionHeaders, pub headers: ObjectVersionHeaders,
/// Size of the object
pub size: u64, pub size: u64,
/// etag of the object
pub etag: String, pub etag: String,
} }
/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders { pub struct ObjectVersionHeaders {
/// Content type of the object
pub content_type: String, pub content_type: String,
/// Any other http headers to send
pub other: BTreeMap<String, String>, pub other: BTreeMap<String, String>,
} }
@ -118,18 +142,24 @@ impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) { fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid) (self.timestamp, self.uuid)
} }
/// Is the object version currently being uploaded
pub fn is_uploading(&self) -> bool { pub fn is_uploading(&self) -> bool {
match self.state { match self.state {
ObjectVersionState::Uploading(_) => true, ObjectVersionState::Uploading(_) => true,
_ => false, _ => false,
} }
} }
/// Is the object version completely received
pub fn is_complete(&self) -> bool { pub fn is_complete(&self) -> bool {
match self.state { match self.state {
ObjectVersionState::Complete(_) => true, ObjectVersionState::Complete(_) => true,
_ => false, _ => false,
} }
} }
/// Is the object version available (received and not deleted)
pub fn is_data(&self) -> bool { pub fn is_data(&self) -> bool {
match self.state { match self.state {
ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false,

View File

@ -10,21 +10,27 @@ use garage_table::*;
use crate::block_ref_table::*; use crate::block_ref_table::*;
/// A version of an object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version { pub struct Version {
// Primary key /// UUID of the version
pub uuid: UUID, pub uuid: UUID,
// Actual data: the blocks for this version // Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags // In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload // of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
/// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>, pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if // Back link to bucket+key so that we can figure if
// this was deleted later on // this was deleted later on
/// Bucket in which the related object is stored
pub bucket: String, pub bucket: String,
/// Key in which the related object is stored
pub key: String, pub key: String,
} }
@ -43,7 +49,9 @@ impl Version {
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey { pub struct VersionBlockKey {
/// Number of the part, starting at 1
pub part_number: u64, pub part_number: u64,
/// offset of the block in the file, starting at 0
pub offset: u64, pub offset: u64,
} }
@ -61,9 +69,12 @@ impl PartialOrd for VersionBlockKey {
} }
} }
/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock { pub struct VersionBlock {
/// Hash of the block
pub hash: Hash, pub hash: Hash,
/// Size of the block
pub size: u64, pub size: u64,
} }