Compare commits
60 Commits
main...nlnet-task
Author | SHA1 | Date |
---|---|---|
Alex Auvolat | 0041b013a4 | 5 months ago |
Alex Auvolat | adccce1145 | 5 months ago |
Alex Auvolat | 85b5a6bcd1 | 5 months ago |
Alex Auvolat | e4f493b481 | 5 months ago |
Alex Auvolat | f8df90b79b | 5 months ago |
Alex Auvolat | 4dbf254512 | 5 months ago |
Alex Auvolat | 64a6e557a4 | 5 months ago |
Alex Auvolat | 5dd200c015 | 5 months ago |
Alex Auvolat | 063294dd56 | 5 months ago |
Alex Auvolat | 7f2541101f | 5 months ago |
Alex Auvolat | 91b874c4ef | 5 months ago |
Alex Auvolat | 431b28e0cf | 5 months ago |
Alex Auvolat | 9cecea64d4 | 5 months ago |
Alex Auvolat | aa59059a91 | 5 months ago |
Alex Auvolat | d90de365b3 | 5 months ago |
Alex Auvolat | 95eb13eb08 | 5 months ago |
Alex Auvolat | c8356a91d9 | 5 months ago |
Alex Auvolat | c04dd8788a | 6 months ago |
Alex Auvolat | 539af6eac4 | 6 months ago |
Alex Auvolat | c539077d30 | 6 months ago |
Alex Auvolat | 11e6fef93c | 6 months ago |
Alex Auvolat | 539a920313 | 6 months ago |
Alex Auvolat | 78362140f5 | 6 months ago |
Alex Auvolat | d6d239fc79 | 6 months ago |
Alex Auvolat | 3ecd14b9f6 | 6 months ago |
Alex Auvolat | 22f38808e7 | 6 months ago |
Alex Auvolat | 707442f5de | 6 months ago |
Alex Auvolat | ad5c6f779f | 6 months ago |
Alex Auvolat | d4df03424f | 6 months ago |
Alex Auvolat | 33c8a489b0 | 6 months ago |
Alex Auvolat | 393c4d4515 | 6 months ago |
Alex Auvolat | 65066c7064 | 6 months ago |
Alex Auvolat | acd49de9f9 | 6 months ago |
Alex Auvolat | 46007bf01d | 6 months ago |
Alex Auvolat | b3e729f4b8 | 6 months ago |
Alex Auvolat | 7ef2c23120 | 6 months ago |
Alex Auvolat | 90e1619b1e | 6 months ago |
Alex Auvolat | 3b361d2959 | 6 months ago |
Alex Auvolat | 866196750f | 6 months ago |
Alex Auvolat | 83a11374ca | 6 months ago |
Alex Auvolat | 1aab1f4e68 | 6 months ago |
Alex Auvolat | 8e292e06b3 | 6 months ago |
Alex Auvolat | 9a491fa137 | 6 months ago |
Alex Auvolat | df24bb806d | 6 months ago |
Alex Auvolat | ce89d1ddab | 6 months ago |
Alex Auvolat | df36cf3099 | 6 months ago |
Alex Auvolat | 9d95f6f704 | 6 months ago |
Alex Auvolat | bad7cc812e | 6 months ago |
Alex Auvolat | 03ebf18830 | 6 months ago |
Alex Auvolat | 94caf9c0c1 | 6 months ago |
Alex Auvolat | bfb1845fdc | 6 months ago |
Alex Auvolat | 19ef1ec8e7 | 6 months ago |
Alex Auvolat | 8a2b1dd422 | 6 months ago |
Alex Auvolat | 523d2ecb95 | 6 months ago |
Alex Auvolat | 1da0a5676e | 6 months ago |
Alex Auvolat | 8dccee3ccf | 6 months ago |
Alex Auvolat | fe9af1dcaa | 6 months ago |
Alex Auvolat | 4a9c94514f | 6 months ago |
Alex Auvolat | 12d1dbfc6b | 6 months ago |
Alex Auvolat | 0962313ebd | 6 months ago |
@@ -0,0 +1,294 @@
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};

use serde::{Deserialize, Serialize};

use garage_util::data::*;

use super::*;
use crate::replication_mode::ReplicationMode;

#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct RpcLayoutDigest {
    /// Cluster layout version
    pub current_version: u64,
    /// Number of active layout versions
    pub active_versions: usize,
    /// Hash of cluster layout update trackers
    pub trackers_hash: Hash,
    /// Hash of cluster layout staging data
    pub staging_hash: Hash,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SyncLayoutDigest {
    current: u64,
    ack_map_min: u64,
    min_stored: u64,
}

pub struct LayoutHelper {
    replication_mode: ReplicationMode,
    layout: Option<LayoutHistory>,

    // cached values
    ack_map_min: u64,
    sync_map_min: u64,

    all_nodes: Vec<Uuid>,
    all_nongateway_nodes: Vec<Uuid>,

    trackers_hash: Hash,
    staging_hash: Hash,

    // ack lock: counts in-progress write operations for each
    // layout version; we don't increase the ack update tracker
    // while this lock is nonzero
    pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
}

impl Deref for LayoutHelper {
    type Target = LayoutHistory;
    fn deref(&self) -> &LayoutHistory {
        self.layout()
    }
}

impl LayoutHelper {
    pub fn new(
        replication_mode: ReplicationMode,
        mut layout: LayoutHistory,
        mut ack_lock: HashMap<u64, AtomicUsize>,
    ) -> Self {
        // In the new() function of the helper, we do a bunch of cleanup
        // and calculations on the layout history to make sure things are
        // correct and we have rapid access to important values such as
        // the layout versions to use when reading to ensure consistency.

        if !replication_mode.is_read_after_write_consistent() {
            // Fast path for when no consistency is required.
            // In this case we only need to keep the last version of the layout,
            // we don't care about coordinating stuff in the cluster.
            layout.keep_current_version_only();
        }

        layout.cleanup_old_versions();

        let all_nodes = layout.get_all_nodes();
        let all_nongateway_nodes = layout.get_all_nongateway_nodes();

        layout.clamp_update_trackers(&all_nodes);

        let min_version = layout.min_stored();

        // ack_map_min is the minimum value of ack_map among all nodes
        // in the cluster (gateway, non-gateway, current and previous layouts).
        // It is the highest layout version which all of these nodes have
        // acknowledged, indicating that they are aware of it and are no
        // longer processing write operations that did not take it into account.
        let ack_map_min = layout
            .update_trackers
            .ack_map
            .min_among(&all_nodes, min_version);

        // sync_map_min is the minimum value of sync_map among storage nodes
        // in the cluster (non-gateway nodes only, current and previous layouts).
        // It is the highest layout version for which we know that all relevant
        // storage nodes have fulfilled a sync, and therefore it is safe to
        // use a read quorum within that layout to ensure consistency.
        // Gateway nodes are excluded here because they hold no relevant data
        // (they store the bucket and access key tables, but we don't have
        // consistency on those).
        // This value is calculated using quorums to allow progress even
        // if not all nodes have successfully completed a sync.
        let sync_map_min =
            layout.calculate_sync_map_min_with_quorum(replication_mode, &all_nongateway_nodes);
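        // Illustration (hypothetical values): with layout versions 5 and 6
        // in history, sync_map = {A: 6, B: 6, C: 5} and a write quorum of 2
        // give sync_map_min = 6 (a quorum of nodes has synced version 6),
        // while ack_map = {A: 6, B: 5, C: 5} gives ack_map_min = 5.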

        let trackers_hash = layout.calculate_trackers_hash();
        let staging_hash = layout.calculate_staging_hash();

        ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
        ack_lock
            .entry(layout.current().version)
            .or_insert(AtomicUsize::new(0));

        LayoutHelper {
            replication_mode,
            layout: Some(layout),
            ack_map_min,
            sync_map_min,
            all_nodes,
            all_nongateway_nodes,
            trackers_hash,
            staging_hash,
            ack_lock,
        }
    }

    // ------------------ single updating function --------------

    fn layout(&self) -> &LayoutHistory {
        self.layout.as_ref().unwrap()
    }

    pub(crate) fn update<F>(&mut self, f: F) -> bool
    where
        F: FnOnce(&mut LayoutHistory) -> bool,
    {
        let changed = f(self.layout.as_mut().unwrap());
        if changed {
            *self = Self::new(
                self.replication_mode,
                self.layout.take().unwrap(),
                std::mem::take(&mut self.ack_lock),
            );
        }
        changed
    }

    // ------------------ read helpers ---------------

    pub fn all_nodes(&self) -> &[Uuid] {
        &self.all_nodes
    }

    pub fn all_nongateway_nodes(&self) -> &[Uuid] {
        &self.all_nongateway_nodes
    }

    pub fn ack_map_min(&self) -> u64 {
        self.ack_map_min
    }

    pub fn sync_map_min(&self) -> u64 {
        self.sync_map_min
    }

    pub fn sync_digest(&self) -> SyncLayoutDigest {
        SyncLayoutDigest {
            current: self.layout().current().version,
            ack_map_min: self.ack_map_min(),
            min_stored: self.layout().min_stored(),
        }
    }

    pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
        let sync_min = self.sync_map_min;
        let version = self
            .layout()
            .versions
            .iter()
            .find(|x| x.version == sync_min)
            .or(self.layout().versions.last())
            .unwrap();
        version
            .nodes_of(position, version.replication_factor)
            .collect()
    }

    pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
        self.layout()
            .versions
            .iter()
            .map(|x| x.nodes_of(position, x.replication_factor).collect())
            .collect()
    }

    pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
        let mut ret = vec![];
        for version in self.layout().versions.iter() {
            ret.extend(version.nodes_of(position, version.replication_factor));
        }
        ret.sort();
        ret.dedup();
        ret
    }

    pub fn trackers_hash(&self) -> Hash {
        self.trackers_hash
    }

    pub fn staging_hash(&self) -> Hash {
        self.staging_hash
    }

    pub fn digest(&self) -> RpcLayoutDigest {
        RpcLayoutDigest {
            current_version: self.current().version,
            active_versions: self.versions.len(),
            trackers_hash: self.trackers_hash,
            staging_hash: self.staging_hash,
        }
    }

    // ------------------ helpers for update tracking ---------------

    pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
        // Ensure trackers for this node's values are up-to-date

        // 1. Acknowledge the last layout version which is not currently
        //    locked by an in-progress write operation
        self.ack_max_free(local_node_id);

        // 2. Assume the data on this node is sync'ed up at least to
        //    the first layout version in the history
        self.sync_first(local_node_id);

        // 3. Acknowledge everyone has synced up to min(self.sync_map)
        self.sync_ack(local_node_id);

        debug!("ack_map: {:?}", self.update_trackers.ack_map);
        debug!("sync_map: {:?}", self.update_trackers.sync_map);
        debug!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
    }

    fn sync_first(&mut self, local_node_id: Uuid) {
        let first_version = self.min_stored();
        self.update(|layout| {
            layout
                .update_trackers
                .sync_map
                .set_max(local_node_id, first_version)
        });
    }

    fn sync_ack(&mut self, local_node_id: Uuid) {
        let sync_map_min = self.sync_map_min;
        self.update(|layout| {
            layout
                .update_trackers
                .sync_ack_map
                .set_max(local_node_id, sync_map_min)
        });
    }

    pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
        let max_ack = self.max_free_ack();
        let changed = self.update(|layout| {
            layout
                .update_trackers
                .ack_map
                .set_max(local_node_id, max_ack)
        });
        if changed {
            info!("ack_until updated to {}", max_ack);
        }
        changed
    }

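    /// Compute the highest layout version that this node may acknowledge:
    /// the oldest version that still has in-progress write operations
    /// (a nonzero ack_lock counter), or the current version if no version
    /// is locked.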
    pub(crate) fn max_free_ack(&self) -> u64 {
        self.layout()
            .versions
            .iter()
            .map(|x| x.version)
            .skip_while(|v| {
                self.ack_lock
                    .get(v)
                    .map(|x| x.load(Ordering::Relaxed) == 0)
                    .unwrap_or(true)
            })
            .next()
            .unwrap_or(self.current().version)
    }
}
@@ -0,0 +1,306 @@
use std::collections::HashSet;

use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
use garage_util::encode::nonversioned_encode;
use garage_util::error::*;

use super::*;
use crate::replication_mode::ReplicationMode;

impl LayoutHistory {
    pub fn new(replication_factor: usize) -> Self {
        let version = LayoutVersion::new(replication_factor);

        let staging = LayoutStaging {
            parameters: Lww::<LayoutParameters>::new(version.parameters),
            roles: LwwMap::new(),
        };

        LayoutHistory {
            versions: vec![version],
            old_versions: vec![],
            update_trackers: Default::default(),
            staging: Lww::raw(0, staging),
        }
    }

    // ------------------ who stores what now? ---------------

    pub fn current(&self) -> &LayoutVersion {
        self.versions.last().as_ref().unwrap()
    }

    pub fn min_stored(&self) -> u64 {
        self.versions.first().as_ref().unwrap().version
    }

    pub fn get_all_nodes(&self) -> Vec<Uuid> {
        if self.versions.len() == 1 {
            self.versions[0].all_nodes().to_vec()
        } else {
            let set = self
                .versions
                .iter()
                .flat_map(|x| x.all_nodes())
                .collect::<HashSet<_>>();
            set.into_iter().copied().collect::<Vec<_>>()
        }
    }

    pub(crate) fn get_all_nongateway_nodes(&self) -> Vec<Uuid> {
        if self.versions.len() == 1 {
            self.versions[0].nongateway_nodes().to_vec()
        } else {
            let set = self
                .versions
                .iter()
                .flat_map(|x| x.nongateway_nodes())
                .collect::<HashSet<_>>();
            set.into_iter().copied().collect::<Vec<_>>()
        }
    }

    // ---- housekeeping (all invoked by LayoutHelper) ----

    pub(crate) fn keep_current_version_only(&mut self) {
        while self.versions.len() > 1 {
            let removed = self.versions.remove(0);
            self.old_versions.push(removed);
        }
    }

    pub(crate) fn cleanup_old_versions(&mut self) {
        // If there are invalid versions before valid versions, remove them
        if self.versions.len() > 1 && self.current().check().is_ok() {
            while self.versions.len() > 1 && self.versions.first().unwrap().check().is_err() {
                let removed = self.versions.remove(0);
                info!(
                    "Layout history: pruning old invalid version {}",
                    removed.version
                );
            }
        }

        // If there are old versions that no one is reading from anymore,
        // remove them (keep them in self.old_versions).
        // ASSUMPTION: we only care about where nodes in the current layout version
        // are reading from, as we assume older nodes are being discarded.
        let current_nodes = &self.current().node_id_vec;
        let min_version = self.min_stored();
        let sync_ack_map_min = self
            .update_trackers
            .sync_ack_map
            .min_among(current_nodes, min_version);
        while self.min_stored() < sync_ack_map_min {
            assert!(self.versions.len() > 1);
            let removed = self.versions.remove(0);
            info!(
                "Layout history: moving version {} to old_versions",
                removed.version
            );
            self.old_versions.push(removed);
        }

        while self.old_versions.len() > OLD_VERSION_COUNT {
            let removed = self.old_versions.remove(0);
            info!("Layout history: removing old_version {}", removed.version);
        }
    }

    pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) {
        let min_v = self.min_stored();
        for node in nodes {
            self.update_trackers.ack_map.set_max(*node, min_v);
            self.update_trackers.sync_map.set_max(*node, min_v);
            self.update_trackers.sync_ack_map.set_max(*node, min_v);
        }
    }

    pub(crate) fn calculate_sync_map_min_with_quorum(
        &self,
        replication_mode: ReplicationMode,
        all_nongateway_nodes: &[Uuid],
    ) -> u64 {
        // This function calculates the minimum layout version from which
        // it is safe to read if we want to maintain read-after-write consistency.
        // In the general case the computation can be a bit expensive so
        // we try to optimize it in several ways.

        // If there is only one layout version, we know that's the one
        // we need to read from.
        if self.versions.len() == 1 {
            return self.current().version;
        }

        let quorum = replication_mode.write_quorum();

        let min_version = self.min_stored();
        let global_min = self
            .update_trackers
            .sync_map
            .min_among(all_nongateway_nodes, min_version);

        // If the write quorum is equal to the total number of nodes,
        // i.e. no write can succeed before it has been written to all nodes,
        // then we must in all cases wait for all nodes to complete a sync.
        // This is represented by reading from the layout with version
        // number global_min, the smallest layout version for which all nodes
        // have completed a sync.
        if quorum == self.current().replication_factor {
            return global_min;
        }

        // In the general case, we need to look at all write sets for all partitions,
        // and find a safe layout version to read for that partition. We then
        // take the minimum value among all partitions as the safe layout version
        // to read in all cases (the layout version to which all reads are directed).
        let mut current_min = self.current().version;
        let mut sets_done = HashSet::<Vec<Uuid>>::new();

        for (_, p_hash) in self.current().partitions() {
            for v in self.versions.iter() {
                if v.version == self.current().version {
                    // We don't care about whether nodes in the latest layout version
                    // have completed a sync or not, as the sync is push-only
                    // and by definition nodes in the latest layout version do not
                    // hold data that must be pushed to nodes in the latest layout
                    // version, since that's the same version (any data that's
                    // already in the latest version is assumed to have been written
                    // by an operation that ensured a quorum of writes within
                    // that version).
                    continue;
                }

                // Determine the set of nodes for partition p in layout version v.
                // Sort the node set to avoid duplicate computations.
                let mut set = v
                    .nodes_of(&p_hash, v.replication_factor)
                    .collect::<Vec<Uuid>>();
                set.sort();

                // If this set was already processed, skip it.
                if sets_done.contains(&set) {
                    continue;
                }

                // Find the value of the sync update trackers that is the
                // highest possible minimum within a quorum of nodes.
                let mut sync_values = set
                    .iter()
                    .map(|x| self.update_trackers.sync_map.get(x, min_version))
                    .collect::<Vec<_>>();
                sync_values.sort();
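                // E.g. (hypothetical numbers): for a set of 3 nodes, a
                // write quorum of 2 and sorted sync values [4, 5, 6], the
                // index below is 3 - 2 = 1 and set_min = 5, the highest
                // version that at least 2 of the 3 nodes have synced.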
                let set_min = sync_values[sync_values.len() - quorum];
                if set_min < current_min {
                    current_min = set_min;
                }
                // Unfavorable case: we already know we are at the smallest
                // possible version, so we can stop early.
                assert!(current_min >= global_min);
                if current_min == global_min {
                    return current_min;
                }

                // Add set to already processed sets
                sets_done.insert(set);
            }
        }

        current_min
    }

    pub(crate) fn calculate_trackers_hash(&self) -> Hash {
        blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
    }

    pub(crate) fn calculate_staging_hash(&self) -> Hash {
        blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
    }

    // ================== updates to layout, public interface ===================

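    /// CRDT-style merge of a remote layout history into the local one.
    /// Returns true if anything changed locally: a new version appended,
    /// update trackers advanced, or staged changes merged.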
    pub fn merge(&mut self, other: &LayoutHistory) -> bool {
        let mut changed = false;

        // Add any new versions to history
        for v2 in other.versions.iter() {
            if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) {
                // Version is already present, check consistency
                if v1 != v2 {
                    error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version);
                }
            } else if self.versions.iter().all(|v| v.version != v2.version - 1) {
                error!(
                    "Cannot receive new layout version {}, version {} is missing",
                    v2.version,
                    v2.version - 1
                );
            } else {
                self.versions.push(v2.clone());
                changed = true;
            }
        }

        // Merge trackers
        let c = self.update_trackers.merge(&other.update_trackers);
        changed = changed || c;

        // Merge staged layout changes
        if self.staging != other.staging {
            let prev_staging = self.staging.clone();
            self.staging.merge(&other.staging);
            changed = changed || self.staging != prev_staging;
        }

        changed
    }

    pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
        match version {
            None => {
                let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
                "#;
                return Err(Error::Message(error.into()));
            }
            Some(v) => {
                if v != self.current().version + 1 {
                    return Err(Error::Message("Invalid new layout version".into()));
                }
            }
        }

        // Compute the new version and add it to the history
        let (new_version, msg) = self
            .current()
            .clone()
            .calculate_next_version(self.staging.get())?;

        self.versions.push(new_version);
        self.cleanup_old_versions();

        // Reset the staged layout changes
        self.staging.update(LayoutStaging {
            parameters: self.staging.get().parameters.clone(),
            roles: LwwMap::new(),
        });

        Ok((self, msg))
    }

    pub fn revert_staged_changes(mut self) -> Result<Self, Error> {
        self.staging.update(LayoutStaging {
            parameters: Lww::new(self.current().parameters),
            roles: LwwMap::new(),
        });

        Ok(self)
    }

    pub fn check(&self) -> Result<(), String> {
        // TODO: anything more?
        self.current().check()
    }
}
@@ -0,0 +1,378 @@
use std::collections::HashMap;
use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard};
use std::time::Duration;

use tokio::sync::Notify;

use netapp::endpoint::Endpoint;
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::NodeID;

use garage_util::config::Config;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;

use super::*;
use crate::replication_mode::ReplicationMode;
use crate::rpc_helper::*;
use crate::system::*;

pub struct LayoutManager {
    node_id: Uuid,
    replication_mode: ReplicationMode,
    persist_cluster_layout: Persister<LayoutHistory>,

    layout: Arc<RwLock<LayoutHelper>>,
    pub(crate) change_notify: Arc<Notify>,

    table_sync_version: Mutex<HashMap<String, u64>>,

    pub(crate) rpc_helper: RpcHelper,
    system_endpoint: Arc<Endpoint<SystemRpc, System>>,
}

impl LayoutManager {
    pub fn new(
        config: &Config,
        node_id: NodeID,
        system_endpoint: Arc<Endpoint<SystemRpc, System>>,
        fullmesh: Arc<FullMeshPeeringStrategy>,
        replication_mode: ReplicationMode,
    ) -> Result<Arc<Self>, Error> {
        let replication_factor = replication_mode.replication_factor();

        let persist_cluster_layout: Persister<LayoutHistory> =
            Persister::new(&config.metadata_dir, "cluster_layout");

        let cluster_layout = match persist_cluster_layout.load() {
            Ok(x) => {
                if x.current().replication_factor != replication_mode.replication_factor() {
                    return Err(Error::Message(format!(
                        "Previous cluster layout has replication factor {}, which is different from the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
                        x.current().replication_factor,
                        replication_factor
                    )));
                }
                x
            }
            Err(e) => {
                info!(
                    "No valid previous cluster layout stored ({}), starting fresh.",
                    e
                );
                LayoutHistory::new(replication_factor)
            }
        };

        let mut cluster_layout =
            LayoutHelper::new(replication_mode, cluster_layout, Default::default());
        cluster_layout.update_trackers(node_id.into());

        let layout = Arc::new(RwLock::new(cluster_layout));
        let change_notify = Arc::new(Notify::new());

        let rpc_helper = RpcHelper::new(
            node_id.into(),
            fullmesh,
            layout.clone(),
            config.rpc_timeout_msec.map(Duration::from_millis),
        );

        Ok(Arc::new(Self {
            node_id: node_id.into(),
            replication_mode,
            persist_cluster_layout,
            layout,
            change_notify,
            table_sync_version: Mutex::new(HashMap::new()),
            system_endpoint,
            rpc_helper,
        }))
    }

    // ---- PUBLIC INTERFACE ----

    pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHelper> {
        self.layout.read().unwrap()
    }

    pub async fn update_cluster_layout(
        self: &Arc<Self>,
        layout: &LayoutHistory,
    ) -> Result<(), Error> {
        self.handle_advertise_cluster_layout(layout).await?;
        Ok(())
    }

    pub fn add_table(&self, table_name: &'static str) {
        let first_version = self.layout().versions.first().unwrap().version;

        self.table_sync_version
            .lock()
            .unwrap()
            .insert(table_name.to_string(), first_version);
    }

    pub fn sync_table_until(self: &Arc<Self>, table_name: &'static str, version: u64) {
        let mut table_sync_version = self.table_sync_version.lock().unwrap();
        *table_sync_version.get_mut(table_name).unwrap() = version;
        let sync_until = table_sync_version.iter().map(|(_, v)| *v).min().unwrap();
        drop(table_sync_version);

        let mut layout = self.layout.write().unwrap();
        if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) {
            info!("sync_until updated to {}", sync_until);
            self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
                layout.update_trackers.clone(),
            ));
        }
    }

    fn ack_new_version(self: &Arc<Self>) {
        let mut layout = self.layout.write().unwrap();
        if layout.ack_max_free(self.node_id) {
            self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
                layout.update_trackers.clone(),
            ));
        }
    }

    // ---- ACK LOCKING ----

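    /// Take an "ack lock" on the current layout version and return the
    /// write sets for the given position. Holding the returned WriteLock
    /// prevents this node from acknowledging a newer layout version while
    /// the write is in flight; the lock is released when it is dropped.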
    pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
        let layout = self.layout();
        let version = layout.current().version;
        let nodes = layout.storage_sets_of(position);
        layout
            .ack_lock
            .get(&version)
            .unwrap()
            .fetch_add(1, Ordering::Relaxed);
        WriteLock::new(version, self, nodes)
    }

    // ---- INTERNALS ----

    fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
        let mut layout = self.layout.write().unwrap();
        let prev_digest = layout.digest();
        let prev_layout_check = layout.check().is_ok();

        if !prev_layout_check || adv.check().is_ok() {
            if layout.update(|l| l.merge(adv)) {
                layout.update_trackers(self.node_id);
                if prev_layout_check && layout.check().is_err() {
                    panic!("Merged two correct layouts and got an incorrect layout.");
                }
                assert!(layout.digest() != prev_digest);
                return Some(layout.clone());
            }
        }

        None
    }

    fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
        let mut layout = self.layout.write().unwrap();
        let prev_digest = layout.digest();

        if layout.update_trackers != *adv {
            if layout.update(|l| l.update_trackers.merge(adv)) {
                layout.update_trackers(self.node_id);
                assert!(layout.digest() != prev_digest);
                return Some(layout.update_trackers.clone());
            }
        }

        None
    }

    async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
        let resp = self
            .rpc_helper
            .call(
                &self.system_endpoint,
                peer,
                SystemRpc::PullClusterLayout,
                RequestStrategy::with_priority(PRIO_HIGH),
            )
            .await;
        if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
            if let Err(e) = self.handle_advertise_cluster_layout(&layout).await {
                warn!("In pull_cluster_layout: {}", e);
            }
        }
    }

    async fn pull_cluster_layout_trackers(self: &Arc<Self>, peer: Uuid) {
        let resp = self
            .rpc_helper
            .call(
                &self.system_endpoint,
                peer,
                SystemRpc::PullClusterLayoutTrackers,
                RequestStrategy::with_priority(PRIO_HIGH),
            )
            .await;
        if let Ok(SystemRpc::AdvertiseClusterLayoutTrackers(trackers)) = resp {
            if let Err(e) = self
                .handle_advertise_cluster_layout_trackers(&trackers)
                .await
            {
                warn!("In pull_cluster_layout_trackers: {}", e);
            }
        }
    }

    /// Save cluster layout data to disk
    async fn save_cluster_layout(&self) -> Result<(), Error> {
        let layout = self.layout.read().unwrap().clone();
        self.persist_cluster_layout
            .save_async(&layout)
            .await
            .expect("Cannot save current cluster layout");
        Ok(())
    }

    fn broadcast_update(self: &Arc<Self>, rpc: SystemRpc) {
        tokio::spawn({
            let this = self.clone();
            async move {
                if let Err(e) = this
                    .rpc_helper
                    .broadcast(
                        &this.system_endpoint,
                        rpc,
                        RequestStrategy::with_priority(PRIO_HIGH),
                    )
                    .await
                {
                    warn!("Error while broadcasting new cluster layout: {}", e);
                }
            }
        });
    }

    // ---- RPC HANDLERS ----

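    /// Compare a peer's advertised layout digest with ours: pull the full
    /// layout if the peer knows a newer or structurally different one, or
    /// only the update trackers if those alone differ.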
    pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &RpcLayoutDigest) {
        let local = self.layout().digest();
        if remote.current_version > local.current_version
            || remote.active_versions != local.active_versions
            || remote.staging_hash != local.staging_hash
        {
            tokio::spawn({
                let this = self.clone();
                async move { this.pull_cluster_layout(from).await }
            });
        } else if remote.trackers_hash != local.trackers_hash {
            tokio::spawn({
                let this = self.clone();
                async move { this.pull_cluster_layout_trackers(from).await }
            });
        }
    }

    pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
        let layout = self.layout.read().unwrap().clone();
        SystemRpc::AdvertiseClusterLayout(layout)
    }

    pub(crate) fn handle_pull_cluster_layout_trackers(&self) -> SystemRpc {
        let layout = self.layout.read().unwrap();
        SystemRpc::AdvertiseClusterLayoutTrackers(layout.update_trackers.clone())
    }

    pub(crate) async fn handle_advertise_cluster_layout(
        self: &Arc<Self>,
        adv: &LayoutHistory,
    ) -> Result<SystemRpc, Error> {
        debug!(
            "handle_advertise_cluster_layout: {} versions, last={}, trackers={:?}",
            adv.versions.len(),
            adv.current().version,
            adv.update_trackers
        );

        if adv.current().replication_factor != self.replication_mode.replication_factor() {
            let msg = format!(
                "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
                adv.current().replication_factor,
                self.replication_mode.replication_factor()
            );
            error!("{}", msg);
            return Err(Error::Message(msg));
        }

        if let Some(new_layout) = self.merge_layout(adv) {
            debug!("handle_advertise_cluster_layout: some changes were added to the current stuff");

            self.change_notify.notify_waiters();
            self.broadcast_update(SystemRpc::AdvertiseClusterLayout(new_layout));
            self.save_cluster_layout().await?;
        }

        Ok(SystemRpc::Ok)
    }

    pub(crate) async fn handle_advertise_cluster_layout_trackers(
        self: &Arc<Self>,
        trackers: &UpdateTrackers,
    ) -> Result<SystemRpc, Error> {
        debug!("handle_advertise_cluster_layout_trackers: {:?}", trackers);

        if let Some(new_trackers) = self.merge_layout_trackers(trackers) {
            self.change_notify.notify_waiters();
            self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(new_trackers));
            self.save_cluster_layout().await?;
        }

        Ok(SystemRpc::Ok)
    }
}

// ---- ack lock ----

pub struct WriteLock<T> {
    layout_version: u64,
    layout_manager: Arc<LayoutManager>,
    value: T,
}
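// Hypothetical usage sketch (not part of this patch): a write path would do
// something like
//     let wl = layout_manager.write_sets_of(&partition_hash);
//     for set in wl.as_ref() { /* send write RPCs to every node set */ }
// and dropping `wl` decrements the ack_lock counter, which may allow the
// node to acknowledge a newer layout version (see the Drop impl below).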

impl<T> WriteLock<T> {
    fn new(version: u64, layout_manager: &Arc<LayoutManager>, value: T) -> Self {
        Self {
            layout_version: version,
            layout_manager: layout_manager.clone(),
            value,
        }
    }
}

impl<T> AsRef<T> for WriteLock<T> {
    fn as_ref(&self) -> &T {
        &self.value
    }
}

impl<T> AsMut<T> for WriteLock<T> {
    fn as_mut(&mut self) -> &mut T {
        &mut self.value
    }
}

impl<T> Drop for WriteLock<T> {
    fn drop(&mut self) {
        let layout = self.layout_manager.layout(); // acquire read lock
        if let Some(counter) = layout.ack_lock.get(&self.layout_version) {
            let prev_lock = counter.fetch_sub(1, Ordering::Relaxed);
            if prev_lock == 1 && layout.current().version > self.layout_version {
                drop(layout); // release read lock, write lock will be acquired
                self.layout_manager.ack_new_version();
            }
        } else {
            error!("Could not find ack lock counter for layout version {}. This probably indicates a bug in Garage.", self.layout_version);
        }
    }
}
@@ -0,0 +1,478 @@
use std::fmt;

use bytesize::ByteSize;

use garage_util::crdt::{AutoCrdt, Crdt};
use garage_util::data::Uuid;

mod graph_algo;
mod helper;
mod history;
mod version;

#[cfg(test)]
mod test;

pub mod manager;

// ---- re-exports ----

pub use helper::{LayoutHelper, RpcLayoutDigest, SyncLayoutDigest};
pub use manager::WriteLock;
pub use version::*;

// ---- defines: partitions ----

/// A partition id, which is stored on 16 bits
/// i.e. we have up to 2**16 partitions.
/// (in practice we have exactly 2**PARTITION_BITS partitions)
pub type Partition = u16;

// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
// Maximum value: 16
/// How many bits from the hash are used to make partitions. Higher numbers
/// mean more fairness in the presence of numerous nodes, but an exponentially
/// bigger ring. Max 16
pub const PARTITION_BITS: usize = 8;

const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
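// For instance, with PARTITION_BITS = 8 we get NB_PARTITIONS = 256, and a
// partition id is derived from the 8 most significant bits of a key's hash.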

// ---- defines: nodes ----

// Type to store compactly the id of a node in the system
// Change this to u16 the day we want to have more than 256 nodes in a cluster
pub type CompactNodeType = u8;
pub const MAX_NODE_NUMBER: usize = 256;

// ======== actual data structures for the layout data ========
// ======== that is persisted to disk ========
// some small utility impls are at the end of this file,
// but most of the code that actually computes stuff is in
// version.rs, history.rs and helper.rs

mod v08 {
    use crate::layout::CompactNodeType;
    use garage_util::crdt::LwwMap;
    use garage_util::data::{Hash, Uuid};
    use serde::{Deserialize, Serialize};

    /// The layout of the cluster, i.e. the list of roles
    /// which are assigned to each cluster node
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub struct ClusterLayout {
        pub version: u64,

        pub replication_factor: usize,
        pub roles: LwwMap<Uuid, NodeRoleV>,

        // see comments in v010::ClusterLayout
        pub node_id_vec: Vec<Uuid>,
        #[serde(with = "serde_bytes")]
        pub ring_assignation_data: Vec<CompactNodeType>,

        /// Role changes which are staged for the next version of the layout
        pub staging: LwwMap<Uuid, NodeRoleV>,
        pub staging_hash: Hash,
    }

    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
    pub struct NodeRoleV(pub Option<NodeRole>);

    /// The user-assigned roles of cluster nodes
    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
    pub struct NodeRole {
        /// Datacenter at which this entry belongs. This information is used to
        /// perform a better geodistribution
        pub zone: String,
        /// The capacity of the node
        /// If this is set to None, the node does not participate in storing data for the system
        /// and is only active as an API gateway to other nodes
        pub capacity: Option<u64>,
        /// A set of tags to recognize the node
        pub tags: Vec<String>,
    }

    impl garage_util::migrate::InitialFormat for ClusterLayout {}
}

mod v09 {
    use super::v08;
    use crate::layout::CompactNodeType;
    use garage_util::crdt::{Lww, LwwMap};
    use garage_util::data::{Hash, Uuid};
    use serde::{Deserialize, Serialize};
    pub use v08::{NodeRole, NodeRoleV};

    /// The layout of the cluster, i.e. the list of roles
    /// which are assigned to each cluster node
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub struct ClusterLayout {
        pub version: u64,

        pub replication_factor: usize,

        /// This attribute is only used to retain the previously computed partition size,
        /// to know to what extent it changes with the layout update.
        pub partition_size: u64,
        /// Parameters used to compute the assignment currently given by
        /// ring_assignment_data
        pub parameters: LayoutParameters,

        pub roles: LwwMap<Uuid, NodeRoleV>,

        // see comments in v010::ClusterLayout
        pub node_id_vec: Vec<Uuid>,
        #[serde(with = "serde_bytes")]
        pub ring_assignment_data: Vec<CompactNodeType>,

        /// Parameters to be used in the next partition assignment computation.
        pub staging_parameters: Lww<LayoutParameters>,
        /// Role changes which are staged for the next version of the layout
        pub staging_roles: LwwMap<Uuid, NodeRoleV>,
        pub staging_hash: Hash,
    }

    /// This struct is used to set the parameters to be used in the assignment computation
    /// algorithm. It is stored as a Crdt.
    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
    pub struct LayoutParameters {
        pub zone_redundancy: ZoneRedundancy,
    }

    /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
    /// of each partition on at least that number of different zones.
    /// Otherwise, copies will be stored on the maximum possible number of zones.
    #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
    pub enum ZoneRedundancy {
        AtLeast(usize),
        Maximum,
    }

    impl garage_util::migrate::Migrate for ClusterLayout {
        const VERSION_MARKER: &'static [u8] = b"G09layout";

        type Previous = v08::ClusterLayout;

        fn migrate(previous: Self::Previous) -> Self {
            use itertools::Itertools;

            // In the old layout, capacities are in an arbitrary unit,
            // but in the new layout they are in bytes.
            // Here we arbitrarily multiply everything by 1G,
            // such that 1 old capacity unit = 1GB in the new units.
            // This is totally arbitrary and won't work for most users.
            let cap_mul = 1024 * 1024 * 1024;
            let roles = multiply_all_capacities(previous.roles, cap_mul);
            let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
            let node_id_vec = previous.node_id_vec;

            // Determine partition size
            let mut tmp = previous.ring_assignation_data.clone();
            tmp.sort();
            let partition_size = tmp
                .into_iter()
                .dedup_with_count()
                .map(|(npart, node)| {
                    roles
                        .get(&node_id_vec[node as usize])
                        .and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
                        .unwrap_or(0) / npart as u64
                })
                .min()
                .unwrap_or(0);

            // By default, zone_redundancy is the maximum possible value
            let parameters = LayoutParameters {
                zone_redundancy: ZoneRedundancy::Maximum,
            };

            Self {
                version: previous.version,
                replication_factor: previous.replication_factor,
                partition_size,
                parameters,
                roles,
                node_id_vec,
                ring_assignment_data: previous.ring_assignation_data,
                staging_parameters: Lww::new(parameters),
                staging_roles,
                staging_hash: [0u8; 32].into(), // will be set in the next migration
            }
        }
    }

    fn multiply_all_capacities(
        old_roles: LwwMap<Uuid, NodeRoleV>,
        mul: u64,
    ) -> LwwMap<Uuid, NodeRoleV> {
        let mut new_roles = LwwMap::new();
        for (node, ts, role) in old_roles.items() {
            let mut role = role.clone();
            if let NodeRoleV(Some(NodeRole {
                capacity: Some(ref mut cap),
                ..
            })) = role
            {
                *cap *= mul;
            }
            new_roles.merge_raw(node, *ts, &role);
        }
        new_roles
    }
}

mod v010 {
    use super::v09;
    use crate::layout::CompactNodeType;
    use garage_util::crdt::{Lww, LwwMap};
    use garage_util::data::Uuid;
    use serde::{Deserialize, Serialize};
    use std::collections::BTreeMap;
    pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};

    /// Number of old (non-live) versions to keep, see LayoutHistory::old_versions
    pub const OLD_VERSION_COUNT: usize = 5;

    /// The history of cluster layouts, with trackers to keep a record
    /// of which nodes are up-to-date to current cluster data
    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
    pub struct LayoutHistory {
        /// The versions currently in use in the cluster
        pub versions: Vec<LayoutVersion>,
        /// At most 5 of the previous versions, not used by the garage_table
        /// module, but useful for the garage_block module to find data blocks
        /// that have not yet been moved
        pub old_versions: Vec<LayoutVersion>,

        /// Update trackers
        pub update_trackers: UpdateTrackers,

        /// Staged changes for the next version
        pub staging: Lww<LayoutStaging>,
    }

    /// A version of the layout of the cluster, i.e. the list of roles
    /// which are assigned to each cluster node
    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
    pub struct LayoutVersion {
        /// The number of this version
        pub version: u64,

        /// Roles assigned to nodes in this version
        pub roles: LwwMap<Uuid, NodeRoleV>,
        /// Parameters used to compute the assignment currently given by
        /// ring_assignment_data
        pub parameters: LayoutParameters,

        /// The number of replicas for each data partition
        pub replication_factor: usize,
        /// This attribute is only used to retain the previously computed partition size,
        /// to know to what extent it changes with the layout update.
        pub partition_size: u64,

        /// node_id_vec: a vector of node IDs with a role assigned
        /// in the system (this includes gateway nodes).
        /// The order here is different from the vec stored by `roles`, because:
        /// 1. non-gateway nodes are first so that they have lower numbers
        /// 2. nodes that don't have a role are excluded (but they need to
        ///    stay in the CRDT as tombstones)
        pub node_id_vec: Vec<Uuid>,
        /// number of non-gateway nodes, which are the first ids in node_id_vec
        pub nongateway_node_count: usize,
        /// The assignation of data partitions to nodes, the values
        /// are indices in node_id_vec
        #[serde(with = "serde_bytes")]
        pub ring_assignment_data: Vec<CompactNodeType>,
    }

    /// The staged changes for the next layout version
    #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
    pub struct LayoutStaging {
        /// Parameters to be used in the next partition assignment computation.
        pub parameters: Lww<LayoutParameters>,
        /// Role changes which are staged for the next version of the layout
        pub roles: LwwMap<Uuid, NodeRoleV>,
    }

    /// The tracker of acknowledgments and data syncs around the cluster
    #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
    pub struct UpdateTrackers {
        /// The highest layout version number each node has ack'ed
        pub ack_map: UpdateTracker,
        /// The highest layout version number each node has synced data for
        pub sync_map: UpdateTracker,
        /// The highest layout version number each node has
        /// ack'ed that all other nodes have synced data for
        pub sync_ack_map: UpdateTracker,
    }
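    // Illustration (hypothetical values): right after an update from
    // version 3 to 4, the trackers might read
    //     ack_map      = {A: 4, B: 4, C: 3}   (C still has writes under 3)
    //     sync_map     = {A: 4, B: 3, C: 3}   (only A has re-synced)
    //     sync_ack_map = {A: 3, B: 3, C: 3}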

    /// Generic update tracker struct
    #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
    pub struct UpdateTracker(pub BTreeMap<Uuid, u64>);

    impl garage_util::migrate::Migrate for LayoutHistory {
        const VERSION_MARKER: &'static [u8] = b"G010lh";

        type Previous = v09::ClusterLayout;

        fn migrate(previous: Self::Previous) -> Self {
            let nongateway_node_count = previous
                .node_id_vec
                .iter()
                .enumerate()
                .filter(|(_, uuid)| {
                    let role = previous.roles.get(uuid);
                    matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
                })
                .map(|(i, _)| i + 1)
                .max()
                .unwrap_or(0);

            let version = LayoutVersion {
                version: previous.version,
                replication_factor: previous.replication_factor,
                partition_size: previous.partition_size,
                parameters: previous.parameters,
                roles: previous.roles,
                node_id_vec: previous.node_id_vec,
                nongateway_node_count,
                ring_assignment_data: previous.ring_assignment_data,
            };
            let update_tracker = UpdateTracker(
                version
                    .nongateway_nodes()
                    .iter()
                    .copied()
                    .map(|x| (x, version.version))
                    .collect::<BTreeMap<Uuid, u64>>(),
            );
            let staging = LayoutStaging {
                parameters: previous.staging_parameters,
                roles: previous.staging_roles,
            };
            Self {
                versions: vec![version],
                old_versions: vec![],
                update_trackers: UpdateTrackers {
                    ack_map: update_tracker.clone(),
                    sync_map: update_tracker.clone(),
                    sync_ack_map: update_tracker,
                },
                staging: Lww::raw(previous.version, staging),
            }
        }
    }
}

pub use v010::*;

// ---- utility functions ----

impl AutoCrdt for LayoutParameters {
    const WARN_IF_DIFFERENT: bool = true;
}

impl AutoCrdt for NodeRoleV {
    const WARN_IF_DIFFERENT: bool = true;
}

impl Crdt for LayoutStaging {
    fn merge(&mut self, other: &LayoutStaging) {
        self.parameters.merge(&other.parameters);
        self.roles.merge(&other.roles);
    }
}

impl NodeRole {
    pub fn capacity_string(&self) -> String {
        match self.capacity {
            Some(c) => ByteSize::b(c).to_string_as(false),
            None => "gateway".to_string(),
        }
    }

    pub fn tags_string(&self) -> String {
        self.tags.join(",")
    }
}

impl fmt::Display for ZoneRedundancy {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ZoneRedundancy::Maximum => write!(f, "maximum"),
            ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
        }
    }
}

impl core::str::FromStr for ZoneRedundancy {
    type Err = &'static str;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
            x => {
                let v = x
                    .parse::<usize>()
                    .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
                Ok(ZoneRedundancy::AtLeast(v))
            }
        }
    }
}

impl UpdateTracker {
    fn merge(&mut self, other: &UpdateTracker) -> bool {
        let mut changed = false;
        for (k, v) in other.0.iter() {
            if let Some(v_mut) = self.0.get_mut(k) {
                if *v > *v_mut {
                    *v_mut = *v;
                    changed = true;
                }
            } else {
                self.0.insert(*k, *v);
                changed = true;
            }
        }
        changed
    }
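    // Note on merge: it is a pointwise max, so merging {A: 2, B: 5} with
    // {A: 3, C: 1} yields {A: 3, B: 5, C: 1}. The operation is commutative,
    // associative and idempotent, which makes UpdateTracker a CRDT.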

    /// This bumps the update tracker for a given node up to the specified value.
    /// This has potential impacts on the correctness of Garage and should only
    /// be used in very specific circumstances.
    pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
        match self.0.get_mut(&peer) {
            Some(e) if *e < value => {
                *e = value;
                true
            }
            None => {
                self.0.insert(peer, value);
                true
            }
            _ => false,
        }
    }

    pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
        storage_nodes
            .iter()
            .map(|x| self.get(x, min_version))
            .min()
            .unwrap_or(min_version)
    }

    pub fn get(&self, node: &Uuid, min_version: u64) -> u64 {
        self.0.get(node).copied().unwrap_or(min_version)
    }
}

impl UpdateTrackers {
    pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
        let c1 = self.ack_map.merge(&other.ack_map);
        let c2 = self.sync_map.merge(&other.sync_map);
        let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
        c1 || c2 || c3
    }
}
@@ -0,0 +1,157 @@
use std::cmp::min;
use std::collections::HashMap;

use garage_util::crdt::Crdt;
use garage_util::error::*;

use crate::layout::*;

// This function checks that the partition size S computed is at least better than the
// one given by a very naive algorithm. To do so, we try to run the naive algorithm
// assuming a partition size of S+1. If we succeed, it means that the optimal assignment
// was not optimal. The naive algorithm is the following:
// - we compute the max number of partitions associated to every node, capped at the
//   partition number. It gives the number of tokens of every node.
// - every zone has a number of tokens equal to the sum of the tokens of its nodes.
// - we cycle over the partitions and associate zone tokens while respecting the
//   zone redundancy constraint.
// NOTE: the naive algorithm is not optimal. Counter example:
// take nb_partition = 3; replication_factor = 5; redundancy = 4;
// number of tokens by zone: (A, 4), (B, 1), (C, 4), (D, 4), (E, 2)
// With these parameters, the naive algo fails, whereas there is a solution:
// (A,A,C,D,E), (A,B,C,D,D), (A,C,C,D,E)
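// Reading the code below: Ok(true) means the naive algorithm ran out of zone
// tokens at size S+1, i.e. the computed size passes this sanity check;
// Ok(false) means the naive algorithm placed everything at size S+1 (or there
// were no zones), i.e. the computed assignment was not optimal.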
fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> {
    let over_size = cl.partition_size + 1;
    let mut zone_token = HashMap::<String, usize>::new();

    let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?;

    if zones.is_empty() {
        return Ok(false);
    }

    for z in zones.iter() {
        zone_token.insert(z.clone(), 0);
    }
    for uuid in cl.nongateway_nodes() {
        let z = cl.expect_get_node_zone(&uuid);
        let c = cl.expect_get_node_capacity(&uuid);
        zone_token.insert(
            z.to_string(),
            zone_token[z] + min(NB_PARTITIONS, (c / over_size) as usize),
        );
    }

    // For every partition, we count the number of zones already associated and
    // the name of the last zone associated

    let mut id_zone_token = vec![0; zones.len()];
    for (z, t) in zone_token.iter() {
        id_zone_token[zone_to_id[z]] = *t;
    }

    let mut nb_token = vec![0; NB_PARTITIONS];
    let mut last_zone = vec![zones.len(); NB_PARTITIONS];

    let mut curr_zone = 0;

    let redundancy = cl.effective_zone_redundancy();

    for replic in 0..cl.replication_factor {
        for p in 0..NB_PARTITIONS {
            while id_zone_token[curr_zone] == 0
                || (last_zone[p] == curr_zone
                    && redundancy - nb_token[p] <= cl.replication_factor - replic)
            {
                curr_zone += 1;
                if curr_zone >= zones.len() {
                    return Ok(true);
                }
            }
            id_zone_token[curr_zone] -= 1;
            if last_zone[p] != curr_zone {
                nb_token[p] += 1;
                last_zone[p] = curr_zone;
            }
        }
    }

    return Ok(false);
}

fn show_msg(msg: &Message) {
    for s in msg.iter() {
        println!("{}", s);
    }
}

fn update_layout(
    cl: &mut LayoutHistory,
    node_capacity_vec: &[u64],
    node_zone_vec: &[&'static str],
    zone_redundancy: usize,
) {
    let staging = cl.staging.get_mut();

    for (i, (capacity, zone)) in node_capacity_vec
        .iter()
        .zip(node_zone_vec.iter())
        .enumerate()
    {
        let node_id = [i as u8; 32].into();

        let update = staging.roles.update_mutator(
            node_id,
            NodeRoleV(Some(NodeRole {
                zone: zone.to_string(),
                capacity: Some(*capacity),
                tags: (vec![]),
            })),
        );
        staging.roles.merge(&update);
    }
    staging.parameters.update(LayoutParameters {
        zone_redundancy: ZoneRedundancy::AtLeast(zone_redundancy),
    });
}

#[test]
fn test_assignment() {
    let mut node_capacity_vec = vec![4000, 1000, 2000];
    let mut node_zone_vec = vec!["A", "B", "C"];

    let mut cl = LayoutHistory::new(3);
    update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
    let v = cl.current().version;
    let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
    show_msg(&msg);
    assert_eq!(cl.check(), Ok(()));
    assert!(check_against_naive(cl.current()).unwrap());

    node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
    node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"];
    update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 2);
    let v = cl.current().version;
    let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
    show_msg(&msg);
    assert_eq!(cl.check(), Ok(()));
    assert!(check_against_naive(cl.current()).unwrap());

    node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
    update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
    let v = cl.current().version;
    let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
    show_msg(&msg);
    assert_eq!(cl.check(), Ok(()));
    assert!(check_against_naive(cl.current()).unwrap());

    node_capacity_vec = vec![
        4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000,
    ];
    update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 1);
    let v = cl.current().version;
    let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
    show_msg(&msg);
    assert_eq!(cl.check(), Ok(()));
    assert!(check_against_naive(cl.current()).unwrap());
}
@ -1,164 +0,0 @@ |
//! Module containing types related to computing the nodes which should receive
//! a copy of data blocks and metadata

use std::convert::TryInto;

use garage_util::data::*;

use crate::layout::ClusterLayout;

/// A partition id, stored on 16 bits,
/// i.e. we have up to 2**16 partitions.
/// (In practice we have exactly 2**PARTITION_BITS partitions.)
pub type Partition = u16;

// TODO: make this constant parametrizable in the config file.
// For deployments with many nodes it might make sense to bump it up to 10.
// Maximum value: 16.
/// How many bits of the hash are used to select a partition. Higher values mean
/// more fairness in the presence of numerous nodes, but an exponentially bigger
/// ring. Max: 16.
pub const PARTITION_BITS: usize = 8;

const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
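// With PARTITION_BITS = 8 this gives 2^8 = 256 partitions, and
// PARTITION_MASK_U16 = 0xFF00: the top 8 bits of a hash select its partition.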

/// A ring that distributes objects fairly across nodes
#[derive(Clone)]
pub struct Ring {
    /// The replication factor for this ring
    pub replication_factor: usize,

    /// The network configuration used to generate this ring
    pub layout: ClusterLayout,

    // Internal order of nodes used to make a more compact representation of the ring
    nodes: Vec<Uuid>,

    // The list of entries in the ring
    ring: Vec<RingEntry>,
}

// Compact type used to store the id of a node in the system.
// Change this to u16 the day we want to have more than 256 nodes in a cluster.
pub type CompactNodeType = u8;
pub const MAX_NODE_NUMBER: usize = 256;

// The maximum number of times an object might get replicated.
// This must be at least 3 because Garage supports 3-way replication.
// Here we use 6 so that the size of a ring entry is 8 bytes
// (2 bytes partition id, 6 bytes node numbers as u8s).
const MAX_REPLICATION: usize = 6;

/// An entry in the ring
#[derive(Clone, Debug)]
struct RingEntry {
    // The first two bytes of the first hash that goes in this partition
    // (the next bytes are zeroes)
    hash_prefix: u16,
    // The nodes that store this partition, stored as a list of positions in the
    // `nodes` field of the Ring structure.
    // Only items 0 up to ring.replication_factor - 1 are used; the others are zeros.
    nodes_buf: [CompactNodeType; MAX_REPLICATION],
}

impl Ring {
    pub(crate) fn new(layout: ClusterLayout, replication_factor: usize) -> Self {
        if replication_factor != layout.replication_factor {
            warn!("Could not build ring: replication factor does not match between local configuration and network role assignment.");
            return Self::empty(layout, replication_factor);
        }

        if layout.ring_assignment_data.len() != replication_factor * (1 << PARTITION_BITS) {
            warn!("Could not build ring: network role assignment data has invalid length");
            return Self::empty(layout, replication_factor);
        }

        let nodes = layout.node_id_vec.clone();
        let ring = (0..(1 << PARTITION_BITS))
            .map(|i| {
                let top = (i as u16) << (16 - PARTITION_BITS);
                let mut nodes_buf = [0u8; MAX_REPLICATION];
                nodes_buf[..replication_factor].copy_from_slice(
                    &layout.ring_assignment_data
                        [replication_factor * i..replication_factor * (i + 1)],
                );
                RingEntry {
                    hash_prefix: top,
                    nodes_buf,
                }
            })
            .collect::<Vec<_>>();

        Self {
            replication_factor,
            layout,
            nodes,
            ring,
        }
    }

    fn empty(layout: ClusterLayout, replication_factor: usize) -> Self {
        Self {
            replication_factor,
            layout,
            nodes: vec![],
            ring: vec![],
        }
    }

    /// Get the partition in which data with the given hash falls
    pub fn partition_of(&self, position: &Hash) -> Partition {
        let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
        top >> (16 - PARTITION_BITS)
    }
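    // Example: with PARTITION_BITS = 8, a hash starting with bytes [0x12, 0x34, ...]
    // yields top = 0x1234, so partition_of() returns 0x1234 >> 8 = 0x12.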

    /// Get the list of partitions and, for each partition, the first hash that
    /// falls in it
    pub fn partitions(&self) -> Vec<(Partition, Hash)> {
        let mut ret = vec![];

        for (i, entry) in self.ring.iter().enumerate() {
            let mut location = [0u8; 32];
            location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]);
            ret.push((i as u16, location.into()));
        }
        if !ret.is_empty() {
            assert_eq!(ret[0].1, [0u8; 32].into());
        }

        ret
    }

    /// Walk the ring to find the n servers in which data should be replicated
    pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> {
        if self.ring.len() != 1 << PARTITION_BITS {
            warn!("Ring not yet ready, read/writes will be lost!");
            return vec![];
        }

        let partition_idx = self.partition_of(position) as usize;
        let partition = &self.ring[partition_idx];

        let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
        // Check that we haven't messed up our partition table, i.e. that this
        // partition table entry indeed corresponds to the item we are storing.
        assert_eq!(
            partition.hash_prefix & PARTITION_MASK_U16,
            top & PARTITION_MASK_U16
        );

        assert!(n <= self.replication_factor);
        partition.nodes_buf[..n]
            .iter()
            .map(|i| self.nodes[*i as usize])
            .collect::<Vec<_>>()
    }
}
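// Example use: for a block or metadata hash `h`,
// `ring.get_nodes(&h, ring.replication_factor)` returns the UUIDs of all nodes
// that should store a copy of the corresponding item.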

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_ring_entry_size() {
        assert_eq!(std::mem::size_of::<RingEntry>(), 8);
    }
}
@ -1,25 +1,44 @@
use garage_rpc::ring::*;
use garage_rpc::layout::*;
use garage_util::data::*;

/// Trait to describe how a table shall be replicated
pub trait TableReplication: Send + Sync + 'static {
    type WriteSets: AsRef<Vec<Vec<Uuid>>> + AsMut<Vec<Vec<Uuid>>> + Send + Sync + 'static;

    // See examples in table_sharded.rs and table_fullcopy.rs
    // to understand the various replication methods.

    /// The entire list of all nodes that store a partition
    fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid>;

    /// Which nodes to send read requests to
    fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
    /// Responses needed to consider a read successful
    fn read_quorum(&self) -> usize;

    /// Which nodes to send writes to
    fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
    /// Which sets of nodes to send writes to
    fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
    /// Responses needed to consider a write successful in each set
    fn write_quorum(&self) -> usize;
    fn max_write_errors(&self) -> usize;

    // Accessing partitions, for Merkle tree & sync
    /// Get the partition for data with the given hash
    fn partition_of(&self, hash: &Hash) -> Partition;
    /// List of existing partitions
    fn partitions(&self) -> Vec<(Partition, Hash)>;
    /// List of partitions and nodes to sync with in the current layout
    fn sync_partitions(&self) -> SyncPartitions;
}
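// Note: a write must reach write_quorum() responses in *every* set returned by
// write_sets(). Presumably this is what keeps writes consistent while several
// layout versions are active at once: during a transition, each active version
// contributes its own set of storage nodes for the partition.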

#[derive(Debug)]
pub struct SyncPartitions {
    pub layout_version: u64,
    pub partitions: Vec<SyncPartition>,
}

#[derive(Debug)]
pub struct SyncPartition {
    pub partition: Partition,
    pub first_hash: Hash,
    pub last_hash: Hash,
    pub storage_sets: Vec<Vec<Uuid>>,
}