Small refactorings
This commit is contained in:
parent
9f8b3b5a18
commit
2bea76ce16
@ -221,7 +221,7 @@ async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(),
|
|||||||
&who[..],
|
&who[..],
|
||||||
&Message::PutBlock(PutBlockMessage { hash, data }),
|
&Message::PutBlock(PutBlockMessage { hash, data }),
|
||||||
(garage.system.config.data_replication_factor + 1) / 2,
|
(garage.system.config.data_replication_factor + 1) / 2,
|
||||||
DEFAULT_TIMEOUT,
|
BLOCK_RW_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -368,7 +368,7 @@ async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
|
|||||||
&who[..],
|
&who[..],
|
||||||
&Message::GetBlock(hash.clone()),
|
&Message::GetBlock(hash.clone()),
|
||||||
1,
|
1,
|
||||||
DEFAULT_TIMEOUT,
|
BLOCK_RW_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
@ -40,7 +40,7 @@ pub struct BlockRefTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl TableFormat for BlockRefTable {
|
impl TableSchema for BlockRefTable {
|
||||||
type P = Hash;
|
type P = Hash;
|
||||||
type S = UUID;
|
type S = UUID;
|
||||||
type E = BlockRef;
|
type E = BlockRef;
|
||||||
|
@ -92,7 +92,7 @@ pub struct ObjectTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl TableFormat for ObjectTable {
|
impl TableSchema for ObjectTable {
|
||||||
type P = String;
|
type P = String;
|
||||||
type S = String;
|
type S = String;
|
||||||
type E = Object;
|
type E = Object;
|
||||||
|
@ -4,7 +4,8 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
|
|
||||||
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
pub const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum Message {
|
pub enum Message {
|
||||||
|
@ -84,7 +84,7 @@ impl Garage {
|
|||||||
timeout: DEFAULT_TIMEOUT,
|
timeout: DEFAULT_TIMEOUT,
|
||||||
};
|
};
|
||||||
|
|
||||||
let block_ref_table = Arc::new(Table::new(
|
let block_ref_table = Table::new(
|
||||||
BlockRefTable {
|
BlockRefTable {
|
||||||
background: background.clone(),
|
background: background.clone(),
|
||||||
block_manager: block_manager.clone(),
|
block_manager: block_manager.clone(),
|
||||||
@ -93,8 +93,8 @@ impl Garage {
|
|||||||
&db,
|
&db,
|
||||||
"block_ref".to_string(),
|
"block_ref".to_string(),
|
||||||
data_rep_param.clone(),
|
data_rep_param.clone(),
|
||||||
));
|
);
|
||||||
let version_table = Arc::new(Table::new(
|
let version_table = Table::new(
|
||||||
VersionTable {
|
VersionTable {
|
||||||
background: background.clone(),
|
background: background.clone(),
|
||||||
block_ref_table: block_ref_table.clone(),
|
block_ref_table: block_ref_table.clone(),
|
||||||
@ -103,8 +103,8 @@ impl Garage {
|
|||||||
&db,
|
&db,
|
||||||
"version".to_string(),
|
"version".to_string(),
|
||||||
meta_rep_param.clone(),
|
meta_rep_param.clone(),
|
||||||
));
|
);
|
||||||
let object_table = Arc::new(Table::new(
|
let object_table = Table::new(
|
||||||
ObjectTable {
|
ObjectTable {
|
||||||
background: background.clone(),
|
background: background.clone(),
|
||||||
version_table: version_table.clone(),
|
version_table: version_table.clone(),
|
||||||
@ -113,7 +113,7 @@ impl Garage {
|
|||||||
&db,
|
&db,
|
||||||
"object".to_string(),
|
"object".to_string(),
|
||||||
meta_rep_param.clone(),
|
meta_rep_param.clone(),
|
||||||
));
|
);
|
||||||
|
|
||||||
let mut garage = Self {
|
let mut garage = Self {
|
||||||
db,
|
db,
|
||||||
|
18
src/table.rs
18
src/table.rs
@ -13,7 +13,7 @@ use crate::membership::System;
|
|||||||
use crate::proto::*;
|
use crate::proto::*;
|
||||||
use crate::rpc_client::*;
|
use crate::rpc_client::*;
|
||||||
|
|
||||||
pub struct Table<F: TableFormat> {
|
pub struct Table<F: TableSchema> {
|
||||||
pub instance: F,
|
pub instance: F,
|
||||||
|
|
||||||
pub name: String,
|
pub name: String,
|
||||||
@ -38,12 +38,12 @@ pub trait TableRpcHandler {
|
|||||||
async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error>;
|
async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TableRpcHandlerAdapter<F: TableFormat> {
|
struct TableRpcHandlerAdapter<F: TableSchema> {
|
||||||
table: Arc<Table<F>>,
|
table: Arc<Table<F>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
|
impl<F: TableSchema + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
|
||||||
async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> {
|
async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> {
|
||||||
let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?;
|
let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?;
|
||||||
let rep = self.table.handle(msg).await?;
|
let rep = self.table.handle(msg).await?;
|
||||||
@ -52,7 +52,7 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
pub enum TableRPC<F: TableFormat> {
|
pub enum TableRPC<F: TableSchema> {
|
||||||
Ok,
|
Ok,
|
||||||
|
|
||||||
ReadEntry(F::P, F::S),
|
ReadEntry(F::P, F::S),
|
||||||
@ -115,7 +115,7 @@ impl SortKey for Hash {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait TableFormat: Send + Sync {
|
pub trait TableSchema: Send + Sync {
|
||||||
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
type E: Entry<Self::P, Self::S>;
|
type E: Entry<Self::P, Self::S>;
|
||||||
@ -123,23 +123,23 @@ pub trait TableFormat: Send + Sync {
|
|||||||
async fn updated(&self, old: Option<Self::E>, new: Self::E);
|
async fn updated(&self, old: Option<Self::E>, new: Self::E);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F: TableFormat + 'static> Table<F> {
|
impl<F: TableSchema + 'static> Table<F> {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
instance: F,
|
instance: F,
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
db: &sled::Db,
|
db: &sled::Db,
|
||||||
name: String,
|
name: String,
|
||||||
param: TableReplicationParams,
|
param: TableReplicationParams,
|
||||||
) -> Self {
|
) -> Arc<Self> {
|
||||||
let store = db.open_tree(&name).expect("Unable to open DB tree");
|
let store = db.open_tree(&name).expect("Unable to open DB tree");
|
||||||
Self {
|
Arc::new(Self {
|
||||||
instance,
|
instance,
|
||||||
name,
|
name,
|
||||||
system,
|
system,
|
||||||
store,
|
store,
|
||||||
partitions: Vec::new(),
|
partitions: Vec::new(),
|
||||||
param,
|
param,
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
|
pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
|
||||||
|
@ -58,7 +58,7 @@ pub struct VersionTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl TableFormat for VersionTable {
|
impl TableSchema for VersionTable {
|
||||||
type P = Hash;
|
type P = Hash;
|
||||||
type S = EmptySortKey;
|
type S = EmptySortKey;
|
||||||
type E = Version;
|
type E = Version;
|
||||||
|
Loading…
Reference in New Issue
Block a user