WIP: garage_rpc: store layout version history

Alex Auvolat, 2023-11-08 17:49:06 +01:00
parent 4a9c94514f
commit fe9af1dcaa
GPG Key ID: 0E496D15096376BE (no known key found for this signature in database)
7 changed files with 550 additions and 345 deletions

src/rpc/layout/history.rs (new file, +170)

@@ -0,0 +1,170 @@
use std::cmp::Ordering;
use std::sync::Arc;
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
use super::schema::*;
use super::*;
impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self {
let version = LayoutVersion::new(replication_factor);
let staging_parameters = Lww::<LayoutParameters>::new(version.parameters);
let empty_lwwmap = LwwMap::new();
let mut ret = LayoutHistory {
versions: vec![version].into_boxed_slice().into(),
update_trackers: Default::default(),
staging_parameters,
staging_roles: empty_lwwmap,
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
ret
}
pub fn current(&self) -> &LayoutVersion {
self.versions.last().as_ref().unwrap()
}
pub(crate) fn calculate_staging_hash(&self) -> Hash {
let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
}
// ================== updates to layout, public interface ===================
pub fn merge(&mut self, other: &LayoutHistory) -> bool {
let mut changed = false;
// Merge staged layout changes
match other.current().version.cmp(&self.current().version) {
Ordering::Greater => {
self.staging_parameters = other.staging_parameters.clone();
self.staging_roles = other.staging_roles.clone();
self.staging_hash = other.staging_hash;
changed = true;
}
Ordering::Equal => {
self.staging_parameters.merge(&other.staging_parameters);
self.staging_roles.merge(&other.staging_roles);
let new_staging_hash = self.calculate_staging_hash();
if new_staging_hash != self.staging_hash {
changed = true;
}
self.staging_hash = new_staging_hash;
}
Ordering::Less => (),
}
// Add any new versions to history
let mut versions = self.versions.to_vec();
for v2 in other.versions.iter() {
if let Some(v1) = versions.iter().find(|v| v.version == v2.version) {
if v1 != v2 {
error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version);
}
} else if versions.iter().all(|v| v.version != v2.version - 1) {
error!(
"Cannot receive new layout version {}, version {} is missing",
v2.version,
v2.version - 1
);
} else {
versions.push(v2.clone());
changed = true;
}
}
self.versions = Arc::from(versions.into_boxed_slice());
// Merge trackers
self.update_trackers.merge(&other.update_trackers);
changed
}
pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
match version {
None => {
let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
return Err(Error::Message(error.into()));
}
Some(v) => {
if v != self.current().version + 1 {
return Err(Error::Message("Invalid new layout version".into()));
}
}
}
let mut new_version = self.current().clone();
new_version.version += 1;
new_version.roles.merge(&self.staging_roles);
new_version.roles.retain(|(_, _, v)| v.0.is_some());
new_version.parameters = *self.staging_parameters.get();
self.staging_roles.clear();
self.staging_hash = self.calculate_staging_hash();
let msg = new_version.calculate_partition_assignment()?;
let mut versions = self.versions.to_vec();
versions.push(new_version);
self.versions = Arc::from(versions.into_boxed_slice());
Ok((self, msg))
}
pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
match version {
None => {
let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
return Err(Error::Message(error.into()));
}
Some(v) => {
if v != self.current().version + 1 {
return Err(Error::Message("Invalid new layout version".into()));
}
}
}
self.staging_roles.clear();
self.staging_parameters.update(self.current().parameters);
self.staging_hash = self.calculate_staging_hash();
// TODO this is stupid, we should have a separate version counter/LWW
// for the staging params
let mut new_version = self.current().clone();
new_version.version += 1;
let mut versions = self.versions.to_vec();
versions.push(new_version);
self.versions = Arc::from(versions.into_boxed_slice());
Ok(self)
}
pub fn check(&self) -> Result<(), String> {
// Check that the hash of the staging data is correct
let staging_hash = self.calculate_staging_hash();
if staging_hash != self.staging_hash {
return Err("staging_hash is incorrect".into());
}
// TODO: anything more?
self.current().check()
}
}
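The merge logic above is the CRDT-style core of the new history type: staged changes are replaced wholesale by those of a peer whose current version is newer, merged field-by-field when the versions are equal, and ignored when the peer is behind; new layout versions are appended only if their immediate predecessor is already known. A minimal sketch of the no-op case, written as a test could be inside this module (not part of the commit):

// Sketch only, not from the commit: two freshly created histories with the
// same replication factor carry the same current version and identical
// staging data, so merging them changes nothing and is idempotent.
#[test]
fn merge_of_identical_histories_is_a_noop() {
    let mut a = LayoutHistory::new(3);
    let b = LayoutHistory::new(3);

    assert_eq!(a.current().version, b.current().version);
    assert!(!a.merge(&b)); // same version, same staging: no change reported
    assert!(!a.merge(&b)); // merging again is still a no-op
    // The stored staging hash still matches the staged data after merging.
    assert!(a.staging_hash == a.calculate_staging_hash());
}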

src/rpc/layout/mod.rs (new file, +32)

@@ -0,0 +1,32 @@
mod history;
mod schema;
mod tracker;
mod version;
pub use history::*;
pub use schema::*;
pub use version::*;
// ---- defines: partitions ----
/// A partition id, which is stored on 16 bits
/// i.e. we have up to 2**16 partitions.
/// (in practice we have exactly 2**PARTITION_BITS partitions)
pub type Partition = u16;
// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
// Maximum value : 16
/// How many bits from the hash are used to make partitions. Higher numbers mean more fairness in
/// presence of numerous nodes, but exponentially bigger ring. Max 16
pub const PARTITION_BITS: usize = 8;
const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
// ---- defines: nodes ----
// Type to store compactly the id of a node in the system
// Change this to u16 the day we want to have more than 256 nodes in a cluster
pub type CompactNodeType = u8;
pub const MAX_NODE_NUMBER: usize = 256;
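Since the constants above fully determine the partition space, a tiny standalone example can make them concrete. The `partition_of` helper below is hypothetical (the real hash-to-partition mapping lives elsewhere in garage_rpc); it only shows that keeping the top PARTITION_BITS bits of a key's hash selects one of the 2**PARTITION_BITS partitions:

// Standalone sketch, not part of the commit. `partition_of` is a hypothetical
// helper: it keeps the top PARTITION_BITS bits of the first two hash bytes,
// which yields a value in 0..NB_PARTITIONS.
const PARTITION_BITS: usize = 8;
const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;

type Partition = u16;

fn partition_of(hash_prefix: [u8; 2]) -> Partition {
    u16::from_be_bytes(hash_prefix) >> (16 - PARTITION_BITS)
}

fn main() {
    assert_eq!(NB_PARTITIONS, 256);
    assert_eq!(partition_of([0x00, 0x00]), 0);
    assert_eq!(partition_of([0xff, 0xff]), (NB_PARTITIONS - 1) as u16);
    println!("{} partitions of the key space", NB_PARTITIONS);
}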

src/rpc/layout/schema.rs (new file, +286)

@@ -0,0 +1,286 @@
mod v08 {
use crate::layout::CompactNodeType;
use garage_util::crdt::LwwMap;
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
/// The layout of the cluster, i.e. the list of roles
/// which are assigned to each cluster node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClusterLayout {
pub version: u64,
pub replication_factor: usize,
pub roles: LwwMap<Uuid, NodeRoleV>,
/// node_id_vec: a vector of node IDs with a role assigned
/// in the system (this includes gateway nodes).
/// The order here is different than the vec stored by `roles`, because:
/// 1. non-gateway nodes are first so that they have lower numbers
/// 2. nodes that don't have a role are excluded (but they need to
/// stay in the CRDT as tombstones)
pub node_id_vec: Vec<Uuid>,
/// the assignation of data partitions to nodes; the values
/// are indices in node_id_vec
#[serde(with = "serde_bytes")]
pub ring_assignation_data: Vec<CompactNodeType>,
/// Role changes which are staged for the next version of the layout
pub staging: LwwMap<Uuid, NodeRoleV>,
pub staging_hash: Hash,
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>);
/// The user-assigned roles of cluster nodes
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRole {
/// Datacenter at which this entry belong. This information is used to
/// perform a better geodistribution
pub zone: String,
/// The capacity of the node
/// If this is set to None, the node does not participate in storing data for the system
/// and is only active as an API gateway to other nodes
pub capacity: Option<u64>,
/// A set of tags to recognize the node
pub tags: Vec<String>,
}
impl garage_util::migrate::InitialFormat for ClusterLayout {}
}
mod v09 {
use super::v08;
use crate::layout::CompactNodeType;
use garage_util::crdt::{Lww, LwwMap};
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
pub use v08::{NodeRole, NodeRoleV};
/// The layout of the cluster, i.e. the list of roles
/// which are assigned to each cluster node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ClusterLayout {
pub version: u64,
pub replication_factor: usize,
/// This attribute is only used to retain the previously computed partition size,
/// to know to what extent it changes with the layout update.
pub partition_size: u64,
/// Parameters used to compute the assignment currently given by
/// ring_assignment_data
pub parameters: LayoutParameters,
pub roles: LwwMap<Uuid, NodeRoleV>,
/// see comment in v08::ClusterLayout
pub node_id_vec: Vec<Uuid>,
/// see comment in v08::ClusterLayout
#[serde(with = "serde_bytes")]
pub ring_assignment_data: Vec<CompactNodeType>,
/// Parameters to be used in the next partition assignment computation.
pub staging_parameters: Lww<LayoutParameters>,
/// Role changes which are staged for the next version of the layout
pub staging_roles: LwwMap<Uuid, NodeRoleV>,
pub staging_hash: Hash,
}
/// This struct is used to set the parameters to be used in the assignment computation
/// algorithm. It is stored as a Crdt.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct LayoutParameters {
pub zone_redundancy: ZoneRedundancy,
}
/// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
/// of each partition on at least that number of different zones.
/// Otherwise, copies will be stored on the maximum possible number of zones.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ZoneRedundancy {
AtLeast(usize),
Maximum,
}
impl garage_util::migrate::Migrate for ClusterLayout {
const VERSION_MARKER: &'static [u8] = b"G09layout";
type Previous = v08::ClusterLayout;
fn migrate(previous: Self::Previous) -> Self {
use itertools::Itertools;
// In the old layout, capacities are in an arbitrary unit,
// but in the new layout they are in bytes.
// Here we arbitrarily multiply everything by 1G,
// such that 1 old capacity unit = 1GB in the new units.
// This is totally arbitrary and won't work for most users.
let cap_mul = 1024 * 1024 * 1024;
let roles = multiply_all_capacities(previous.roles, cap_mul);
let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
let node_id_vec = previous.node_id_vec;
// Determine partition size
let mut tmp = previous.ring_assignation_data.clone();
tmp.sort();
let partition_size = tmp
.into_iter()
.dedup_with_count()
.map(|(npart, node)| {
roles
.get(&node_id_vec[node as usize])
.and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
.unwrap_or(0) / npart as u64
})
.min()
.unwrap_or(0);
// By default, zone_redundancy is maximum possible value
let parameters = LayoutParameters {
zone_redundancy: ZoneRedundancy::Maximum,
};
Self {
version: previous.version,
replication_factor: previous.replication_factor,
partition_size,
parameters,
roles,
node_id_vec,
ring_assignment_data: previous.ring_assignation_data,
staging_parameters: Lww::new(parameters),
staging_roles,
staging_hash: [0u8; 32].into(), // will be set in the next migration
}
}
}
fn multiply_all_capacities(
old_roles: LwwMap<Uuid, NodeRoleV>,
mul: u64,
) -> LwwMap<Uuid, NodeRoleV> {
let mut new_roles = LwwMap::new();
for (node, ts, role) in old_roles.items() {
let mut role = role.clone();
if let NodeRoleV(Some(NodeRole {
capacity: Some(ref mut cap),
..
})) = role
{
*cap *= mul;
}
new_roles.merge_raw(node, *ts, &role);
}
new_roles
}
}
mod v010 {
use super::v09;
use crate::layout::CompactNodeType;
use garage_util::crdt::{Lww, LwwMap};
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
/// The layout of the cluster, i.e. the list of roles
/// which are assigned to each cluster node
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LayoutVersion {
pub version: u64,
pub replication_factor: usize,
/// This attribute is only used to retain the previously computed partition size,
/// to know to what extent it changes with the layout update.
pub partition_size: u64,
/// Parameters used to compute the assignment currently given by
/// ring_assignment_data
pub parameters: LayoutParameters,
pub roles: LwwMap<Uuid, NodeRoleV>,
/// see comment in v08::ClusterLayout
pub node_id_vec: Vec<Uuid>,
/// see comment in v08::ClusterLayout
#[serde(with = "serde_bytes")]
pub ring_assignment_data: Vec<CompactNodeType>,
}
/// The history of cluster layouts
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LayoutHistory {
/// The versions currently in use in the cluster
pub versions: Arc<[LayoutVersion]>,
/// Update trackers
pub update_trackers: UpdateTrackers,
/// Parameters to be used in the next partition assignment computation.
pub staging_parameters: Lww<LayoutParameters>,
/// Role changes which are staged for the next version of the layout
pub staging_roles: LwwMap<Uuid, NodeRoleV>,
/// Hash of the serialized staging_parameters + staging_roles
pub staging_hash: Hash,
}
/// The tracker of acknowledgments and data syncs around the cluster
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct UpdateTrackers {
/// The highest layout version number each node has ack'ed
pub ack_map: UpdateTracker,
/// The highest layout version number each node has synced data for
pub sync_map: UpdateTracker,
/// The highest layout version number each node has
/// ack'ed that all other nodes have synced data for
pub sync_ack_map: UpdateTracker,
}
/// A single update tracker: maps each node id to the latest layout version number it has reported
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct UpdateTracker(pub HashMap<Uuid, u64>);
impl garage_util::migrate::Migrate for LayoutHistory {
const VERSION_MARKER: &'static [u8] = b"G010lh";
type Previous = v09::ClusterLayout;
fn migrate(previous: Self::Previous) -> Self {
let version = LayoutVersion {
version: previous.version,
replication_factor: previous.replication_factor,
partition_size: previous.partition_size,
parameters: previous.parameters,
roles: previous.roles,
node_id_vec: previous.node_id_vec,
ring_assignment_data: previous.ring_assignment_data,
};
let update_tracker = UpdateTracker(
version
.nongateway_nodes()
.iter()
.map(|x| (*x, version.version))
.collect::<HashMap<Uuid, u64>>(),
);
let mut ret = Self {
versions: Arc::from(vec![version].into_boxed_slice()),
update_trackers: UpdateTrackers {
ack_map: update_tracker.clone(),
sync_map: update_tracker.clone(),
sync_ack_map: update_tracker.clone(),
},
staging_parameters: previous.staging_parameters,
staging_roles: previous.staging_roles,
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
ret
}
}
}
pub use v010::*;
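The v08 -> v09 migration above converts unit-less capacities to bytes (1 old unit = 1 GiB, i.e. 1024³ bytes) and re-derives the partition size as the minimum, over nodes, of capacity divided by the number of partitions that node holds. A standalone sketch of that arithmetic with made-up numbers (not Garage code):

// Illustrative only: the node list and partition counts below are invented.
fn main() {
    let cap_mul: u64 = 1024 * 1024 * 1024; // 1 old capacity unit == 1 GiB

    // (old unit-less capacity, number of partitions assigned to that node)
    let nodes = [(1u64, 86u64), (2, 170), (1, 86)];

    let partition_size = nodes
        .iter()
        .map(|(cap, npart)| cap * cap_mul / npart)
        .min()
        .unwrap_or(0);

    // Roughly 12.5 MB per partition with these made-up numbers.
    println!("partition_size = {} bytes", partition_size);
}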

src/rpc/layout/tracker.rs (new file, +21)

@@ -0,0 +1,21 @@
use super::*;
impl UpdateTracker {
fn merge(&mut self, other: &UpdateTracker) {
for (k, v) in other.0.iter() {
if let Some(v_mut) = self.0.get_mut(k) {
*v_mut = std::cmp::max(*v_mut, *v);
} else {
self.0.insert(*k, *v);
}
}
}
}
impl UpdateTrackers {
pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
self.ack_map.merge(&other.ack_map);
self.sync_map.merge(&other.sync_map);
self.sync_ack_map.merge(&other.sync_ack_map);
}
}
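The tracker merge above is a pointwise maximum over per-node version numbers, which is what makes the update trackers CRDTs: merging is commutative, associative and idempotent. A self-contained sketch of the same operation on a plain HashMap (node ids shortened to strings, not Garage code):

// Standalone illustration, not Garage code.
use std::collections::HashMap;

fn merge(mine: &mut HashMap<&'static str, u64>, other: &HashMap<&'static str, u64>) {
    for (k, v) in other {
        let entry = mine.entry(*k).or_insert(*v);
        *entry = (*entry).max(*v);
    }
}

fn main() {
    let mut a = HashMap::from([("node1", 3u64), ("node2", 5)]);
    let b = HashMap::from([("node2", 4u64), ("node3", 6)]);
    merge(&mut a, &b);
    // Each node ends up at the highest version either side had seen.
    assert_eq!(a, HashMap::from([("node1", 3), ("node2", 5), ("node3", 6)]));
}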


@@ -1,4 +1,3 @@
-use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::fmt;
@@ -6,227 +5,20 @@ use std::fmt;
 use bytesize::ByteSize;
 use itertools::Itertools;
-use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap};
+use garage_util::crdt::{AutoCrdt, LwwMap};
 use garage_util::data::*;
-use garage_util::encode::nonversioned_encode;
 use garage_util::error::*;
 use crate::graph_algo::*;
 use std::convert::TryInto;
-// ---- defines: partitions ----
-/// A partition id, which is stored on 16 bits
-/// i.e. we have up to 2**16 partitions.
-/// (in practice we have exactly 2**PARTITION_BITS partitions)
-pub type Partition = u16;
-// TODO: make this constant parametrizable in the config file
-// For deployments with many nodes it might make sense to bump
-// it up to 10.
-// Maximum value : 16
-/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in
-/// presence of numerous nodes, but exponentially bigger ring. Max 16
-pub const PARTITION_BITS: usize = 8;
-const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
-// ---- defines: nodes ----
-// Type to store compactly the id of a node in the system
-// Change this to u16 the day we want to have more than 256 nodes in a cluster
-pub type CompactNodeType = u8;
-pub const MAX_NODE_NUMBER: usize = 256;
-// ---- defines: other ----
+use super::schema::*;
+use super::*;
 // The Message type will be used to collect information on the algorithm.
 pub type Message = Vec<String>;
-mod v08 {
-use super::CompactNodeType;
-use garage_util::crdt::LwwMap;
-use garage_util::data::{Hash, Uuid};
-use serde::{Deserialize, Serialize};
-/// The layout of the cluster, i.e. the list of roles
-/// which are assigned to each cluster node
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct ClusterLayout {
-pub version: u64,
-pub replication_factor: usize,
-pub roles: LwwMap<Uuid, NodeRoleV>,
-/// node_id_vec: a vector of node IDs with a role assigned
-/// in the system (this includes gateway nodes).
-/// The order here is different than the vec stored by `roles`, because:
-/// 1. non-gateway nodes are first so that they have lower numbers
-/// 2. nodes that don't have a role are excluded (but they need to
-/// stay in the CRDT as tombstones)
-pub node_id_vec: Vec<Uuid>,
-/// the assignation of data partitions to node, the values
-/// are indices in node_id_vec
-#[serde(with = "serde_bytes")]
-pub ring_assignation_data: Vec<CompactNodeType>,
-/// Role changes which are staged for the next version of the layout
-pub staging: LwwMap<Uuid, NodeRoleV>,
-pub staging_hash: Hash,
-}
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct NodeRoleV(pub Option<NodeRole>);
-/// The user-assigned roles of cluster nodes
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct NodeRole {
-/// Datacenter at which this entry belong. This information is used to
-/// perform a better geodistribution
-pub zone: String,
-/// The capacity of the node
-/// If this is set to None, the node does not participate in storing data for the system
-/// and is only active as an API gateway to other nodes
-pub capacity: Option<u64>,
-/// A set of tags to recognize the node
-pub tags: Vec<String>,
-}
-impl garage_util::migrate::InitialFormat for ClusterLayout {}
-}
-mod v09 {
-use super::v08;
-use super::CompactNodeType;
-use garage_util::crdt::{Lww, LwwMap};
-use garage_util::data::{Hash, Uuid};
-use serde::{Deserialize, Serialize};
-pub use v08::{NodeRole, NodeRoleV};
-/// The layout of the cluster, i.e. the list of roles
-/// which are assigned to each cluster node
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct ClusterLayout {
-pub version: u64,
-pub replication_factor: usize,
-/// This attribute is only used to retain the previously computed partition size,
-/// to know to what extent does it change with the layout update.
-pub partition_size: u64,
-/// Parameters used to compute the assignment currently given by
-/// ring_assignment_data
-pub parameters: LayoutParameters,
-pub roles: LwwMap<Uuid, NodeRoleV>,
-/// see comment in v08::ClusterLayout
-pub node_id_vec: Vec<Uuid>,
-/// see comment in v08::ClusterLayout
-#[serde(with = "serde_bytes")]
-pub ring_assignment_data: Vec<CompactNodeType>,
-/// Parameters to be used in the next partition assignment computation.
-pub staging_parameters: Lww<LayoutParameters>,
-/// Role changes which are staged for the next version of the layout
-pub staging_roles: LwwMap<Uuid, NodeRoleV>,
-pub staging_hash: Hash,
-}
-/// This struct is used to set the parameters to be used in the assignment computation
-/// algorithm. It is stored as a Crdt.
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct LayoutParameters {
-pub zone_redundancy: ZoneRedundancy,
-}
-/// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
-/// of each partition on at least that number of different zones.
-/// Otherwise, copies will be stored on the maximum possible number of zones.
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
-pub enum ZoneRedundancy {
-AtLeast(usize),
-Maximum,
-}
-impl garage_util::migrate::Migrate for ClusterLayout {
-const VERSION_MARKER: &'static [u8] = b"G09layout";
-type Previous = v08::ClusterLayout;
-fn migrate(previous: Self::Previous) -> Self {
-use itertools::Itertools;
-// In the old layout, capacities are in an arbitrary unit,
-// but in the new layout they are in bytes.
-// Here we arbitrarily multiply everything by 1G,
-// such that 1 old capacity unit = 1GB in the new units.
-// This is totally arbitrary and won't work for most users.
-let cap_mul = 1024 * 1024 * 1024;
-let roles = multiply_all_capacities(previous.roles, cap_mul);
-let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
-let node_id_vec = previous.node_id_vec;
-// Determine partition size
-let mut tmp = previous.ring_assignation_data.clone();
-tmp.sort();
-let partition_size = tmp
-.into_iter()
-.dedup_with_count()
-.map(|(npart, node)| {
-roles
-.get(&node_id_vec[node as usize])
-.and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
-.unwrap_or(0) / npart as u64
-})
-.min()
-.unwrap_or(0);
-// By default, zone_redundancy is maximum possible value
-let parameters = LayoutParameters {
-zone_redundancy: ZoneRedundancy::Maximum,
-};
-let mut res = Self {
-version: previous.version,
-replication_factor: previous.replication_factor,
-partition_size,
-parameters,
-roles,
-node_id_vec,
-ring_assignment_data: previous.ring_assignation_data,
-staging_parameters: Lww::new(parameters),
-staging_roles,
-staging_hash: [0u8; 32].into(),
-};
-res.staging_hash = res.calculate_staging_hash();
-res
-}
-}
-fn multiply_all_capacities(
-old_roles: LwwMap<Uuid, NodeRoleV>,
-mul: u64,
-) -> LwwMap<Uuid, NodeRoleV> {
-let mut new_roles = LwwMap::new();
-for (node, ts, role) in old_roles.items() {
-let mut role = role.clone();
-if let NodeRoleV(Some(NodeRole {
-capacity: Some(ref mut cap),
-..
-})) = role
-{
-*cap *= mul;
-}
-new_roles.merge_raw(node, *ts, &role);
-}
-new_roles
-}
-}
-pub use v09::*;
 impl AutoCrdt for LayoutParameters {
 const WARN_IF_DIFFERENT: bool = true;
 }
@@ -272,19 +64,15 @@ impl core::str::FromStr for ZoneRedundancy {
 }
 }
-// Implementation of the ClusterLayout methods unrelated to the assignment algorithm.
-impl ClusterLayout {
+impl LayoutVersion {
 pub fn new(replication_factor: usize) -> Self {
 // We set the default zone redundancy to be Maximum, meaning that the maximum
 // possible value will be used depending on the cluster topology
 let parameters = LayoutParameters {
 zone_redundancy: ZoneRedundancy::Maximum,
 };
-let staging_parameters = Lww::<LayoutParameters>::new(parameters);
-let empty_lwwmap = LwwMap::new();
-let mut ret = ClusterLayout {
+LayoutVersion {
 version: 0,
 replication_factor,
 partition_size: 0,
@@ -292,12 +80,7 @@ impl ClusterLayout {
 node_id_vec: Vec::new(),
 ring_assignment_data: Vec::new(),
 parameters,
-staging_parameters,
-staging_roles: empty_lwwmap,
-staging_hash: [0u8; 32].into(),
-};
-ret.staging_hash = ret.calculate_staging_hash();
-ret
+}
 }
 // ===================== accessors ======================
@@ -399,7 +182,7 @@ impl ClusterLayout {
 // ===================== internal information extractors ======================
 /// Returns the uuids of the non_gateway nodes in self.node_id_vec.
-fn nongateway_nodes(&self) -> Vec<Uuid> {
+pub(crate) fn nongateway_nodes(&self) -> Vec<Uuid> {
 let mut result = Vec::<Uuid>::new();
 for uuid in self.node_id_vec.iter() {
 match self.node_role(uuid) {
@@ -446,99 +229,10 @@ impl ClusterLayout {
 }
 }
-fn calculate_staging_hash(&self) -> Hash {
-let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
-blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
-}
-// ================== updates to layout, public interface ===================
-pub fn merge(&mut self, other: &ClusterLayout) -> bool {
-match other.version.cmp(&self.version) {
-Ordering::Greater => {
-*self = other.clone();
-true
-}
-Ordering::Equal => {
-self.staging_parameters.merge(&other.staging_parameters);
-self.staging_roles.merge(&other.staging_roles);
-let new_staging_hash = self.calculate_staging_hash();
-let changed = new_staging_hash != self.staging_hash;
-self.staging_hash = new_staging_hash;
-changed
-}
-Ordering::Less => false,
-}
-}
-pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
-match version {
-None => {
-let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
-"#;
-return Err(Error::Message(error.into()));
-}
-Some(v) => {
-if v != self.version + 1 {
-return Err(Error::Message("Invalid new layout version".into()));
-}
-}
-}
-self.roles.merge(&self.staging_roles);
-self.roles.retain(|(_, _, v)| v.0.is_some());
-self.parameters = *self.staging_parameters.get();
-self.staging_roles.clear();
-self.staging_hash = self.calculate_staging_hash();
-let msg = self.calculate_partition_assignment()?;
-self.version += 1;
-Ok((self, msg))
-}
-pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
-match version {
-None => {
-let error = r#"
-Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
-To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
-"#;
-return Err(Error::Message(error.into()));
-}
-Some(v) => {
-if v != self.version + 1 {
-return Err(Error::Message("Invalid new layout version".into()));
-}
-}
-}
-self.staging_roles.clear();
-self.staging_parameters.update(self.parameters);
-self.staging_hash = self.calculate_staging_hash();
-self.version += 1;
-Ok(self)
-}
 /// Check a cluster layout for internal consistency
 /// (assignment, roles, parameters, partition size)
 /// returns true if consistent, false if error
 pub fn check(&self) -> Result<(), String> {
-// Check that the hash of the staging data is correct
-let staging_hash = self.calculate_staging_hash();
-if staging_hash != self.staging_hash {
-return Err("staging_hash is incorrect".into());
-}
 // Check that node_id_vec contains the correct list of nodes
 let mut expected_nodes = self
 .roles
@@ -654,7 +348,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
 /// data to be moved.
 /// Staged role changes must be merged with nodes roles before calling this function,
 /// hence it must only be called from apply_staged_changes() and hence is not public.
-fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
+pub(crate) fn calculate_partition_assignment(&mut self) -> Result<Message, Error> {
 // We update the node ids, since the node role list might have changed with the
 // changes in the layout. We retrieve the old_assignment reframed with new ids
 let old_assignment_opt = self.update_node_id_vec()?;
@@ -911,7 +605,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
 zone_redundancy: usize,
 ) -> Result<Graph<FlowEdge>, Error> {
 let vertices =
-ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
+LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
 let mut g = Graph::<FlowEdge>::new(&vertices);
 let nb_zones = zone_to_id.len();
 for p in 0..NB_PARTITIONS {
@@ -1214,7 +908,7 @@ mod tests {
 // number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
 // With these parameters, the naive algo fails, whereas there is a solution:
 // (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
-fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
+fn check_against_naive(cl: &LayoutVersion) -> Result<bool, Error> {
 let over_size = cl.partition_size + 1;
 let mut zone_token = HashMap::<String, usize>::new();
@@ -1280,7 +974,7 @@ mod tests {
 }
 fn update_layout(
-cl: &mut ClusterLayout,
+cl: &mut LayoutVersion,
 node_id_vec: &Vec<u8>,
 node_capacity_vec: &Vec<u64>,
 node_zone_vec: &Vec<String>,
@@ -1316,7 +1010,7 @@ mod tests {
 .map(|x| x.to_string())
 .collect();
-let mut cl = ClusterLayout::new(3);
+let mut cl = LayoutVersion::new(3);
 update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3);
 let v = cl.version;
 let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();


@@ -26,7 +26,7 @@ use garage_util::data::*;
 use garage_util::error::Error;
 use garage_util::metrics::RecordDuration;
-use crate::layout::ClusterLayout;
+use crate::layout::LayoutHistory;
 use crate::metrics::RpcMetrics;
 // Default RPC timeout = 5 minutes
@@ -91,7 +91,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
 struct RpcHelperInner {
 our_node_id: Uuid,
 fullmesh: Arc<FullMeshPeeringStrategy>,
-layout_watch: watch::Receiver<Arc<ClusterLayout>>,
+layout_watch: watch::Receiver<Arc<LayoutHistory>>,
 metrics: RpcMetrics,
 rpc_timeout: Duration,
 }
@@ -100,7 +100,7 @@ impl RpcHelper {
 pub(crate) fn new(
 our_node_id: Uuid,
 fullmesh: Arc<FullMeshPeeringStrategy>,
-layout_watch: watch::Receiver<Arc<ClusterLayout>>,
+layout_watch: watch::Receiver<Arc<LayoutHistory>>,
 rpc_timeout: Option<Duration>,
 ) -> Self {
 let metrics = RpcMetrics::new();
@@ -392,8 +392,8 @@ impl RpcHelper {
 pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
 // Retrieve some status variables that we will use to sort requests
 let peer_list = self.0.fullmesh.get_peer_list();
-let layout: Arc<ClusterLayout> = self.0.layout_watch.borrow().clone();
-let our_zone = match layout.node_role(&self.0.our_node_id) {
+let layout: Arc<LayoutHistory> = self.0.layout_watch.borrow().clone();
+let our_zone = match layout.current().node_role(&self.0.our_node_id) {
 Some(pc) => &pc.zone,
 None => "",
 };
@@ -407,7 +407,7 @@ impl RpcHelper {
 let mut nodes = nodes
 .iter()
 .map(|to| {
-let peer_zone = match layout.node_role(to) {
+let peer_zone = match layout.current().node_role(to) {
 Some(pc) => &pc.zone,
 None => "",
 };

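The rpc_helper changes above show the access pattern the rest of this commit converts callers to: clone the Arc<LayoutHistory> out of the watch channel, then go through .current() for anything that concerns the newest layout version. A sketch of a helper as it could look inside garage_rpc (zone_of is hypothetical, not part of the commit, and it assumes the tokio watch channel used elsewhere in this crate):

// Sketch only, assuming it lives inside garage_rpc next to the types above.
use std::sync::Arc;

use tokio::sync::watch;

use garage_util::data::Uuid;

use crate::layout::LayoutHistory;

// Hypothetical helper: resolve the zone of a node according to the newest
// layout version, defaulting to the empty string like rpc_helper does.
fn zone_of(layout_watch: &watch::Receiver<Arc<LayoutHistory>>, node: &Uuid) -> String {
    let layout: Arc<LayoutHistory> = layout_watch.borrow().clone();
    match layout.current().node_role(node) {
        Some(role) => role.zone.clone(),
        None => String::new(),
    }
}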

@@ -64,7 +64,7 @@ pub enum SystemRpc {
 /// Exchanged with every node on a regular basis.
 AdvertiseStatus(NodeStatus),
 /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
-AdvertiseClusterLayout(ClusterLayout),
+AdvertiseClusterLayout(LayoutHistory),
 /// Get known nodes states
 GetKnownNodes,
 /// Return known nodes
@@ -84,7 +84,7 @@ pub struct System {
 /// The id of this node
 pub id: Uuid,
-persist_cluster_layout: Persister<ClusterLayout>,
+persist_cluster_layout: Persister<LayoutHistory>,
 persist_peer_list: Persister<PeerList>,
 local_status: ArcSwap<NodeStatus>,
@@ -112,8 +112,8 @@ pub struct System {
 replication_factor: usize,
 /// The layout
-pub layout_watch: watch::Receiver<Arc<ClusterLayout>>,
-update_layout: Mutex<watch::Sender<Arc<ClusterLayout>>>,
+pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
 /// Path to metadata directory
 pub metadata_dir: PathBuf,
@@ -256,16 +256,16 @@ impl System {
 hex::encode(&node_key.public_key()[..8])
 );
-let persist_cluster_layout: Persister<ClusterLayout> =
+let persist_cluster_layout: Persister<LayoutHistory> =
 Persister::new(&config.metadata_dir, "cluster_layout");
 let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
 let cluster_layout = match persist_cluster_layout.load() {
 Ok(x) => {
-if x.replication_factor != replication_factor {
+if x.current().replication_factor != replication_factor {
 return Err(Error::Message(format!(
 "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
-x.replication_factor,
+x.current().replication_factor,
 replication_factor
 )));
 }
@@ -276,7 +276,7 @@ impl System {
 "No valid previous cluster layout stored ({}), starting fresh.",
 e
 );
-ClusterLayout::new(replication_factor)
+LayoutHistory::new(replication_factor)
 }
 };
@@ -423,13 +423,13 @@ impl System {
 known_nodes
 }
-pub fn cluster_layout(&self) -> watch::Ref<Arc<ClusterLayout>> {
+pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
 self.layout_watch.borrow()
 }
 pub async fn update_cluster_layout(
 self: &Arc<Self>,
-layout: &ClusterLayout,
+layout: &LayoutHistory,
 ) -> Result<(), Error> {
 self.handle_advertise_cluster_layout(layout).await?;
 Ok(())
@@ -475,7 +475,9 @@ impl System {
 .collect::<HashMap<Uuid, _>>();
 let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+// TODO: not only layout.current()
 let storage_nodes = layout
+.current()
 .roles
 .items()
 .iter()
@@ -486,11 +488,11 @@ impl System {
 .filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
 .count();
-let partitions = layout.partitions();
+let partitions = layout.current().partitions();
 let partitions_n_up = partitions
 .iter()
 .map(|(_, h)| {
-let pn = layout.nodes_of(h, layout.replication_factor);
+let pn = layout.current().nodes_of(h, replication_factor);
 pn.iter()
 .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
 .count()
@@ -581,7 +583,7 @@ impl System {
 /// Save network configuration to disc
 async fn save_cluster_layout(&self) -> Result<(), Error> {
-let layout: Arc<ClusterLayout> = self.layout_watch.borrow().clone();
+let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
 self.persist_cluster_layout
 .save_async(&layout)
 .await
@@ -593,7 +595,7 @@ impl System {
 let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
 let layout = self.layout_watch.borrow();
-new_si.cluster_layout_version = layout.version;
+new_si.cluster_layout_version = layout.current().version;
 new_si.cluster_layout_staging_hash = layout.staging_hash;
 new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@@ -648,12 +650,12 @@ impl System {
 async fn handle_advertise_cluster_layout(
 self: &Arc<Self>,
-adv: &ClusterLayout,
+adv: &LayoutHistory,
 ) -> Result<SystemRpc, Error> {
-if adv.replication_factor != self.replication_factor {
+if adv.current().replication_factor != self.replication_factor {
 let msg = format!(
 "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
-adv.replication_factor,
+adv.current().replication_factor,
 self.replication_factor
 );
 error!("{}", msg);
@@ -662,7 +664,7 @@ impl System {
 let update_layout = self.update_layout.lock().await;
 // TODO: don't clone each time an AdvertiseClusterLayout is received
-let mut layout: ClusterLayout = self.layout_watch.borrow().as_ref().clone();
+let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
 let prev_layout_check = layout.check().is_ok();
 if layout.merge(adv) {
@@ -724,7 +726,7 @@ impl System {
 while !*stop_signal.borrow() {
 let not_configured = self.layout_watch.borrow().check().is_err();
 let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
-let expected_n_nodes = self.layout_watch.borrow().num_nodes();
+let expected_n_nodes = self.layout_watch.borrow().current().num_nodes();
 let bad_peers = self
 .fullmesh
 .get_peer_list()
@@ -863,13 +865,13 @@ impl EndpointHandler<SystemRpc> for System {
 }
 impl NodeStatus {
-fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+fn initial(replication_factor: usize, layout: &LayoutHistory) -> Self {
 NodeStatus {
 hostname: gethostname::gethostname()
 .into_string()
 .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
 replication_factor,
-cluster_layout_version: layout.version,
+cluster_layout_version: layout.current().version,
 cluster_layout_staging_hash: layout.staging_hash,
 meta_disk_avail: None,
 data_disk_avail: None,