modifications in several files to :
- have consistent error return types - store the zone redundancy in a Lww - print the error and message in the CLI (TODO: for the server Api, should msg be returned in the body response?)
This commit is contained in:
parent
829f815a89
commit
ceac3713d6
@ -162,7 +162,12 @@ pub async fn handle_apply_cluster_layout(
|
|||||||
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
|
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
|
||||||
|
|
||||||
let layout = garage.system.get_cluster_layout();
|
let layout = garage.system.get_cluster_layout();
|
||||||
let layout = layout.apply_staged_changes(Some(param.version))?;
|
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
|
||||||
|
//TODO : how to display msg ? Should it be in the Body Response ?
|
||||||
|
for s in msg.iter() {
|
||||||
|
println!("{}", s);
|
||||||
|
}
|
||||||
|
|
||||||
garage.system.update_cluster_layout(&layout).await?;
|
garage.system.update_cluster_layout(&layout).await?;
|
||||||
|
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
|
@ -188,19 +188,23 @@ pub async fn cmd_show_layout(
|
|||||||
|
|
||||||
// this will print the stats of what partitions
|
// this will print the stats of what partitions
|
||||||
// will move around when we apply
|
// will move around when we apply
|
||||||
if layout.calculate_partition_assignation() {
|
match layout.calculate_partition_assignation() {
|
||||||
println!("To enact the staged role changes, type:");
|
Ok(msg) => {
|
||||||
println!();
|
for line in msg.iter() {
|
||||||
println!(" garage layout apply --version {}", layout.version + 1);
|
println!("{}", line);
|
||||||
println!();
|
}
|
||||||
println!(
|
println!("To enact the staged role changes, type:");
|
||||||
"You can also revert all proposed changes with: garage layout revert --version {}",
|
println!();
|
||||||
layout.version + 1
|
println!(" garage layout apply --version {}", layout.version + 1);
|
||||||
);
|
println!();
|
||||||
} else {
|
println!(
|
||||||
println!("Not enough nodes have an assigned role to maintain enough copies of data.");
|
"You can also revert all proposed changes with: garage layout revert --version {}",
|
||||||
println!("This new layout cannot yet be applied.");
|
layout.version + 1)},
|
||||||
}
|
Err(Error::Message(s)) => {
|
||||||
|
println!("Error while trying to compute the assignation: {}", s);
|
||||||
|
println!("This new layout cannot yet be applied.");},
|
||||||
|
_ => { println!("Unknown Error"); },
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -213,7 +217,10 @@ pub async fn cmd_apply_layout(
|
|||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let layout = fetch_layout(rpc_cli, rpc_host).await?;
|
let layout = fetch_layout(rpc_cli, rpc_host).await?;
|
||||||
|
|
||||||
let layout = layout.apply_staged_changes(apply_opt.version)?;
|
let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?;
|
||||||
|
for line in msg.iter() {
|
||||||
|
println!("{}", line);
|
||||||
|
}
|
||||||
|
|
||||||
send_layout(rpc_cli, rpc_host, layout).await?;
|
send_layout(rpc_cli, rpc_host, layout).await?;
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ use itertools::Itertools;
|
|||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
|
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap, Lww};
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
|
||||||
@ -27,12 +27,10 @@ pub struct ClusterLayout {
|
|||||||
pub version: u64,
|
pub version: u64,
|
||||||
|
|
||||||
pub replication_factor: usize,
|
pub replication_factor: usize,
|
||||||
#[serde(default="default_one")]
|
|
||||||
pub zone_redundancy: usize,
|
|
||||||
|
|
||||||
//This attribute is only used to retain the previously computed partition size,
|
//This attribute is only used to retain the previously computed partition size,
|
||||||
//to know to what extent does it change with the layout update.
|
//to know to what extent does it change with the layout update.
|
||||||
#[serde(default="default_zero")]
|
#[serde(default="default_partition_size")]
|
||||||
pub partition_size: u32,
|
pub partition_size: u32,
|
||||||
|
|
||||||
pub roles: LwwMap<Uuid, NodeRoleV>,
|
pub roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
@ -51,17 +49,31 @@ pub struct ClusterLayout {
|
|||||||
pub ring_assignation_data: Vec<CompactNodeType>,
|
pub ring_assignation_data: Vec<CompactNodeType>,
|
||||||
|
|
||||||
/// Role changes which are staged for the next version of the layout
|
/// Role changes which are staged for the next version of the layout
|
||||||
|
#[serde(default="default_layout_parameters")]
|
||||||
|
pub parameters: Lww<LayoutParameters>,
|
||||||
pub staging: LwwMap<Uuid, NodeRoleV>,
|
pub staging: LwwMap<Uuid, NodeRoleV>,
|
||||||
pub staging_hash: Hash,
|
pub staging_hash: Hash,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_one() -> usize{
|
fn default_partition_size() -> u32{
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
fn default_zero() -> u32{
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_layout_parameters() -> Lww<LayoutParameters>{
|
||||||
|
Lww::<LayoutParameters>::new(LayoutParameters{ zone_redundancy: 1})
|
||||||
|
}
|
||||||
|
|
||||||
|
///This struct is used to set the parameters to be used in the assignation computation
|
||||||
|
///algorithm. It is stored as a Crdt.
|
||||||
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct LayoutParameters {
|
||||||
|
pub zone_redundancy:usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AutoCrdt for LayoutParameters {
|
||||||
|
const WARN_IF_DIFFERENT: bool = true;
|
||||||
|
}
|
||||||
|
|
||||||
const NB_PARTITIONS : usize = 1usize << PARTITION_BITS;
|
const NB_PARTITIONS : usize = 1usize << PARTITION_BITS;
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
@ -108,18 +120,24 @@ impl NodeRole {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ClusterLayout {
|
impl ClusterLayout {
|
||||||
pub fn new(replication_factor: usize, zone_redundancy: usize) -> Self {
|
pub fn new(replication_factor: usize) -> Self {
|
||||||
|
|
||||||
|
//We set the default zone redundancy to be equal to the replication factor,
|
||||||
|
//i.e. as strict as possible.
|
||||||
|
let default_parameters = Lww::<LayoutParameters>::new(
|
||||||
|
LayoutParameters{ zone_redundancy: replication_factor});
|
||||||
|
|
||||||
let empty_lwwmap = LwwMap::new();
|
let empty_lwwmap = LwwMap::new();
|
||||||
let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
|
let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
|
||||||
|
|
||||||
ClusterLayout {
|
ClusterLayout {
|
||||||
version: 0,
|
version: 0,
|
||||||
replication_factor,
|
replication_factor,
|
||||||
zone_redundancy,
|
|
||||||
partition_size: 0,
|
partition_size: 0,
|
||||||
roles: LwwMap::new(),
|
roles: LwwMap::new(),
|
||||||
node_id_vec: Vec::new(),
|
node_id_vec: Vec::new(),
|
||||||
ring_assignation_data: Vec::new(),
|
ring_assignation_data: Vec::new(),
|
||||||
|
parameters: default_parameters,
|
||||||
staging: empty_lwwmap,
|
staging: empty_lwwmap,
|
||||||
staging_hash: empty_lwwmap_hash,
|
staging_hash: empty_lwwmap_hash,
|
||||||
}
|
}
|
||||||
@ -132,6 +150,7 @@ impl ClusterLayout {
|
|||||||
true
|
true
|
||||||
}
|
}
|
||||||
Ordering::Equal => {
|
Ordering::Equal => {
|
||||||
|
self.parameters.merge(&other.parameters);
|
||||||
self.staging.merge(&other.staging);
|
self.staging.merge(&other.staging);
|
||||||
|
|
||||||
let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
||||||
@ -145,7 +164,7 @@ impl ClusterLayout {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
|
pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self,Message), Error> {
|
||||||
match version {
|
match version {
|
||||||
None => {
|
None => {
|
||||||
let error = r#"
|
let error = r#"
|
||||||
@ -164,16 +183,14 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
|||||||
self.roles.merge(&self.staging);
|
self.roles.merge(&self.staging);
|
||||||
self.roles.retain(|(_, _, v)| v.0.is_some());
|
self.roles.retain(|(_, _, v)| v.0.is_some());
|
||||||
|
|
||||||
if !self.calculate_partition_assignation() {
|
let msg = self.calculate_partition_assignation()?;
|
||||||
return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
|
|
||||||
}
|
|
||||||
|
|
||||||
self.staging.clear();
|
self.staging.clear();
|
||||||
self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
||||||
|
|
||||||
self.version += 1;
|
self.version += 1;
|
||||||
|
|
||||||
Ok(self)
|
Ok((self,msg))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
|
pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
|
||||||
@ -231,24 +248,24 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
|||||||
}
|
}
|
||||||
|
|
||||||
///Given a node uuids, this function returns the label of its zone
|
///Given a node uuids, this function returns the label of its zone
|
||||||
pub fn get_node_zone(&self, uuid : &Uuid) -> Result<String,String> {
|
pub fn get_node_zone(&self, uuid : &Uuid) -> Result<String,Error> {
|
||||||
match self.node_role(uuid) {
|
match self.node_role(uuid) {
|
||||||
Some(role) => return Ok(role.zone.clone()),
|
Some(role) => return Ok(role.zone.clone()),
|
||||||
_ => return Err("The Uuid does not correspond to a node present in the cluster.".to_string())
|
_ => return Err(Error::Message("The Uuid does not correspond to a node present in the cluster.".into()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///Given a node uuids, this function returns its capacity or fails if it does not have any
|
///Given a node uuids, this function returns its capacity or fails if it does not have any
|
||||||
pub fn get_node_capacity(&self, uuid : &Uuid) -> Result<u32,String> {
|
pub fn get_node_capacity(&self, uuid : &Uuid) -> Result<u32,Error> {
|
||||||
match self.node_role(uuid) {
|
match self.node_role(uuid) {
|
||||||
Some(NodeRole{capacity : Some(cap), zone: _, tags: _}) => return Ok(*cap),
|
Some(NodeRole{capacity : Some(cap), zone: _, tags: _}) => return Ok(*cap),
|
||||||
_ => return Err("The Uuid does not correspond to a node present in the \
|
_ => return Err(Error::Message("The Uuid does not correspond to a node present in the \
|
||||||
cluster or this node does not have a positive capacity.".to_string())
|
cluster or this node does not have a positive capacity.".into()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///Returns the sum of capacities of non gateway nodes in the cluster
|
///Returns the sum of capacities of non gateway nodes in the cluster
|
||||||
pub fn get_total_capacity(&self) -> Result<u32,String> {
|
pub fn get_total_capacity(&self) -> Result<u32,Error> {
|
||||||
let mut total_capacity = 0;
|
let mut total_capacity = 0;
|
||||||
for uuid in self.useful_nodes().iter() {
|
for uuid in self.useful_nodes().iter() {
|
||||||
total_capacity += self.get_node_capacity(uuid)?;
|
total_capacity += self.get_node_capacity(uuid)?;
|
||||||
@ -311,7 +328,8 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
|||||||
let zones_of_p = nodes_of_p.iter()
|
let zones_of_p = nodes_of_p.iter()
|
||||||
.map(|n| self.get_node_zone(&self.node_id_vec[*n as usize])
|
.map(|n| self.get_node_zone(&self.node_id_vec[*n as usize])
|
||||||
.expect("Zone not found."));
|
.expect("Zone not found."));
|
||||||
if zones_of_p.unique().count() < self.zone_redundancy {
|
let redundancy = self.parameters.get().zone_redundancy;
|
||||||
|
if zones_of_p.unique().count() < redundancy {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -354,7 +372,7 @@ impl ClusterLayout {
|
|||||||
/// Among such optimal assignation, it minimizes the distance to
|
/// Among such optimal assignation, it minimizes the distance to
|
||||||
/// the former assignation (if any) to minimize the amount of
|
/// the former assignation (if any) to minimize the amount of
|
||||||
/// data to be moved.
|
/// data to be moved.
|
||||||
pub fn calculate_partition_assignation(&mut self, replication:usize, redundancy:usize) -> Result<Message,String> {
|
pub fn calculate_partition_assignation(&mut self) -> Result<Message,Error> {
|
||||||
//The nodes might have been updated, some might have been deleted.
|
//The nodes might have been updated, some might have been deleted.
|
||||||
//So we need to first update the list of nodes and retrieve the
|
//So we need to first update the list of nodes and retrieve the
|
||||||
//assignation.
|
//assignation.
|
||||||
@ -362,12 +380,12 @@ impl ClusterLayout {
|
|||||||
//We update the node ids, since the node list might have changed with the staged
|
//We update the node ids, since the node list might have changed with the staged
|
||||||
//changes in the layout. We retrieve the old_assignation reframed with the new ids
|
//changes in the layout. We retrieve the old_assignation reframed with the new ids
|
||||||
let old_assignation_opt = self.update_node_id_vec()?;
|
let old_assignation_opt = self.update_node_id_vec()?;
|
||||||
self.replication_factor = replication;
|
|
||||||
self.zone_redundancy = redundancy;
|
let redundancy = self.parameters.get().zone_redundancy;
|
||||||
|
|
||||||
let mut msg = Message::new();
|
let mut msg = Message::new();
|
||||||
msg.push(format!("Computation of a new cluster layout where partitions are \
|
msg.push(format!("Computation of a new cluster layout where partitions are \
|
||||||
replicated {} times on at least {} distinct zones.", replication, redundancy));
|
replicated {} times on at least {} distinct zones.", self.replication_factor, redundancy));
|
||||||
|
|
||||||
//We generate for once numerical ids for the zone, to use them as indices in the
|
//We generate for once numerical ids for the zone, to use them as indices in the
|
||||||
//flow graphs.
|
//flow graphs.
|
||||||
@ -381,6 +399,7 @@ impl ClusterLayout {
|
|||||||
//In this case, integer rounding plays a marginal role in the percentages of
|
//In this case, integer rounding plays a marginal role in the percentages of
|
||||||
//optimality.
|
//optimality.
|
||||||
let partition_size = self.compute_optimal_partition_size(&zone_to_id)?;
|
let partition_size = self.compute_optimal_partition_size(&zone_to_id)?;
|
||||||
|
|
||||||
if old_assignation_opt != None {
|
if old_assignation_opt != None {
|
||||||
msg.push(format!("Given the replication and redundancy constraint, the \
|
msg.push(format!("Given the replication and redundancy constraint, the \
|
||||||
optimal size of a partition is {}. In the previous layout, it used to \
|
optimal size of a partition is {}. In the previous layout, it used to \
|
||||||
@ -392,6 +411,12 @@ impl ClusterLayout {
|
|||||||
}
|
}
|
||||||
self.partition_size = partition_size;
|
self.partition_size = partition_size;
|
||||||
|
|
||||||
|
if partition_size < 100 {
|
||||||
|
msg.push("WARNING: The partition size is low (< 100), you might consider to \
|
||||||
|
give the nodes capacities in a smaller unit (e.g. Mb instead of Gb) to \
|
||||||
|
achieve a more tailored use of your storage ressources.".into());
|
||||||
|
}
|
||||||
|
|
||||||
//We compute a first flow/assignment that is heuristically close to the previous
|
//We compute a first flow/assignment that is heuristically close to the previous
|
||||||
//assignment
|
//assignment
|
||||||
let mut gflow = self.compute_candidate_assignment( &zone_to_id, &old_assignation_opt)?;
|
let mut gflow = self.compute_candidate_assignment( &zone_to_id, &old_assignation_opt)?;
|
||||||
@ -413,7 +438,7 @@ impl ClusterLayout {
|
|||||||
/// None if the node is not present anymore.
|
/// None if the node is not present anymore.
|
||||||
/// We work with the assumption that only this function and calculate_new_assignation
|
/// We work with the assumption that only this function and calculate_new_assignation
|
||||||
/// do modify assignation_ring and node_id_vec.
|
/// do modify assignation_ring and node_id_vec.
|
||||||
fn update_node_id_vec(&mut self) -> Result< Option< Vec<Vec<usize> > > ,String> {
|
fn update_node_id_vec(&mut self) -> Result< Option< Vec<Vec<usize> > > ,Error> {
|
||||||
// (1) We compute the new node list
|
// (1) We compute the new node list
|
||||||
//Non gateway nodes should be coded on 8bits, hence they must be first in the list
|
//Non gateway nodes should be coded on 8bits, hence they must be first in the list
|
||||||
//We build the new node ids
|
//We build the new node ids
|
||||||
@ -423,8 +448,8 @@ impl ClusterLayout {
|
|||||||
.map(|(k, _, _)| *k).collect();
|
.map(|(k, _, _)| *k).collect();
|
||||||
|
|
||||||
if new_non_gateway_nodes.len() > MAX_NODE_NUMBER {
|
if new_non_gateway_nodes.len() > MAX_NODE_NUMBER {
|
||||||
return Err(format!("There are more than {} non-gateway nodes in the new \
|
return Err(Error::Message(format!("There are more than {} non-gateway nodes in the new \
|
||||||
layout. This is not allowed.", MAX_NODE_NUMBER).to_string());
|
layout. This is not allowed.", MAX_NODE_NUMBER).into() ));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut new_gateway_nodes: Vec<Uuid> = self.roles.items().iter()
|
let mut new_gateway_nodes: Vec<Uuid> = self.roles.items().iter()
|
||||||
@ -449,8 +474,8 @@ impl ClusterLayout {
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
if self.ring_assignation_data.len() != nb_partitions * self.replication_factor {
|
if self.ring_assignation_data.len() != nb_partitions * self.replication_factor {
|
||||||
return Err("The old assignation does not have a size corresponding to \
|
return Err(Error::Message("The old assignation does not have a size corresponding to \
|
||||||
the old replication factor or the number of partitions.".to_string());
|
the old replication factor or the number of partitions.".into()));
|
||||||
}
|
}
|
||||||
|
|
||||||
//We build a translation table between the uuid and new ids
|
//We build a translation table between the uuid and new ids
|
||||||
@ -482,14 +507,14 @@ impl ClusterLayout {
|
|||||||
|
|
||||||
///This function generates ids for the zone of the nodes appearing in
|
///This function generates ids for the zone of the nodes appearing in
|
||||||
///self.node_id_vec.
|
///self.node_id_vec.
|
||||||
fn generate_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>),String>{
|
fn generate_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>),Error>{
|
||||||
let mut id_to_zone = Vec::<String>::new();
|
let mut id_to_zone = Vec::<String>::new();
|
||||||
let mut zone_to_id = HashMap::<String,usize>::new();
|
let mut zone_to_id = HashMap::<String,usize>::new();
|
||||||
|
|
||||||
for uuid in self.node_id_vec.iter() {
|
for uuid in self.node_id_vec.iter() {
|
||||||
if self.roles.get(uuid) == None {
|
if self.roles.get(uuid) == None {
|
||||||
return Err("The uuid was not found in the node roles (this should \
|
return Err(Error::Message("The uuid was not found in the node roles (this should \
|
||||||
not happen, it might be a critical error).".to_string());
|
not happen, it might be a critical error).".into()));
|
||||||
}
|
}
|
||||||
match self.node_role(&uuid) {
|
match self.node_role(&uuid) {
|
||||||
Some(r) => if !zone_to_id.contains_key(&r.zone) && r.capacity != None {
|
Some(r) => if !zone_to_id.contains_key(&r.zone) && r.capacity != None {
|
||||||
@ -504,14 +529,14 @@ impl ClusterLayout {
|
|||||||
|
|
||||||
///This function computes by dichotomy the largest realizable partition size, given
|
///This function computes by dichotomy the largest realizable partition size, given
|
||||||
///the layout.
|
///the layout.
|
||||||
fn compute_optimal_partition_size(&self, zone_to_id: &HashMap<String, usize>) -> Result<u32,String>{
|
fn compute_optimal_partition_size(&self, zone_to_id: &HashMap<String, usize>) -> Result<u32,Error>{
|
||||||
let nb_partitions = 1usize << PARTITION_BITS;
|
let nb_partitions = 1usize << PARTITION_BITS;
|
||||||
let empty_set = HashSet::<(usize,usize)>::new();
|
let empty_set = HashSet::<(usize,usize)>::new();
|
||||||
let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
|
let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
|
||||||
g.compute_maximal_flow()?;
|
g.compute_maximal_flow()?;
|
||||||
if g.get_flow_value()? < (nb_partitions*self.replication_factor).try_into().unwrap() {
|
if g.get_flow_value()? < (nb_partitions*self.replication_factor).try_into().unwrap() {
|
||||||
return Err("The storage capacity of he cluster is to small. It is \
|
return Err(Error::Message("The storage capacity of he cluster is to small. It is \
|
||||||
impossible to store partitions of size 1.".to_string());
|
impossible to store partitions of size 1.".into()));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut s_down = 1;
|
let mut s_down = 1;
|
||||||
@ -545,14 +570,15 @@ impl ClusterLayout {
|
|||||||
return vertices;
|
return vertices;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generate_flow_graph(&self, size: u32, zone_to_id: &HashMap<String, usize>, exclude_assoc : &HashSet<(usize,usize)>) -> Result<Graph<FlowEdge>, String> {
|
fn generate_flow_graph(&self, size: u32, zone_to_id: &HashMap<String, usize>, exclude_assoc : &HashSet<(usize,usize)>) -> Result<Graph<FlowEdge>, Error> {
|
||||||
let vertices = ClusterLayout::generate_graph_vertices(zone_to_id.len(),
|
let vertices = ClusterLayout::generate_graph_vertices(zone_to_id.len(),
|
||||||
self.useful_nodes().len());
|
self.useful_nodes().len());
|
||||||
let mut g= Graph::<FlowEdge>::new(&vertices);
|
let mut g= Graph::<FlowEdge>::new(&vertices);
|
||||||
let nb_zones = zone_to_id.len();
|
let nb_zones = zone_to_id.len();
|
||||||
|
let redundancy = self.parameters.get().zone_redundancy;
|
||||||
for p in 0..NB_PARTITIONS {
|
for p in 0..NB_PARTITIONS {
|
||||||
g.add_edge(Vertex::Source, Vertex::Pup(p), self.zone_redundancy as u32)?;
|
g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u32)?;
|
||||||
g.add_edge(Vertex::Source, Vertex::Pdown(p), (self.replication_factor - self.zone_redundancy) as u32)?;
|
g.add_edge(Vertex::Source, Vertex::Pdown(p), (self.replication_factor - redundancy) as u32)?;
|
||||||
for z in 0..nb_zones {
|
for z in 0..nb_zones {
|
||||||
g.add_edge(Vertex::Pup(p) , Vertex::PZ(p,z) , 1)?;
|
g.add_edge(Vertex::Pup(p) , Vertex::PZ(p,z) , 1)?;
|
||||||
g.add_edge(Vertex::Pdown(p) , Vertex::PZ(p,z) ,
|
g.add_edge(Vertex::Pdown(p) , Vertex::PZ(p,z) ,
|
||||||
@ -574,7 +600,7 @@ impl ClusterLayout {
|
|||||||
|
|
||||||
|
|
||||||
fn compute_candidate_assignment(&self, zone_to_id: &HashMap<String, usize>,
|
fn compute_candidate_assignment(&self, zone_to_id: &HashMap<String, usize>,
|
||||||
old_assoc_opt : &Option<Vec< Vec<usize> >>) -> Result<Graph<FlowEdge>, String > {
|
old_assoc_opt : &Option<Vec< Vec<usize> >>) -> Result<Graph<FlowEdge>, Error > {
|
||||||
|
|
||||||
//We list the edges that are not used in the old association
|
//We list the edges that are not used in the old association
|
||||||
let mut exclude_edge = HashSet::<(usize,usize)>::new();
|
let mut exclude_edge = HashSet::<(usize,usize)>::new();
|
||||||
@ -601,7 +627,7 @@ impl ClusterLayout {
|
|||||||
return Ok(g);
|
return Ok(g);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn minimize_rebalance_load(&self, gflow: &mut Graph<FlowEdge>, zone_to_id: &HashMap<String, usize>, old_assoc : &Vec< Vec<usize> >) -> Result<(), String > {
|
fn minimize_rebalance_load(&self, gflow: &mut Graph<FlowEdge>, zone_to_id: &HashMap<String, usize>, old_assoc : &Vec< Vec<usize> >) -> Result<(), Error > {
|
||||||
let mut cost = CostFunction::new();
|
let mut cost = CostFunction::new();
|
||||||
for p in 0..NB_PARTITIONS {
|
for p in 0..NB_PARTITIONS {
|
||||||
for n in old_assoc[p].iter() {
|
for n in old_assoc[p].iter() {
|
||||||
@ -616,7 +642,7 @@ impl ClusterLayout {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_ring_from_flow(&mut self, nb_zones : usize, gflow: &Graph<FlowEdge> ) -> Result<(), String>{
|
fn update_ring_from_flow(&mut self, nb_zones : usize, gflow: &Graph<FlowEdge> ) -> Result<(), Error>{
|
||||||
self.ring_assignation_data = Vec::<CompactNodeType>::new();
|
self.ring_assignation_data = Vec::<CompactNodeType>::new();
|
||||||
for p in 0..NB_PARTITIONS {
|
for p in 0..NB_PARTITIONS {
|
||||||
for z in 0..nb_zones {
|
for z in 0..nb_zones {
|
||||||
@ -631,8 +657,8 @@ impl ClusterLayout {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if self.ring_assignation_data.len() != NB_PARTITIONS*self.replication_factor {
|
if self.ring_assignation_data.len() != NB_PARTITIONS*self.replication_factor {
|
||||||
return Err("Critical Error : the association ring we produced does not \
|
return Err(Error::Message("Critical Error : the association ring we produced does not \
|
||||||
have the right size.".to_string());
|
have the right size.".into()));
|
||||||
}
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
@ -643,7 +669,7 @@ impl ClusterLayout {
|
|||||||
fn output_stat(&self , gflow : &Graph<FlowEdge>,
|
fn output_stat(&self , gflow : &Graph<FlowEdge>,
|
||||||
old_assoc_opt : &Option< Vec<Vec<usize>> >,
|
old_assoc_opt : &Option< Vec<Vec<usize>> >,
|
||||||
zone_to_id: &HashMap<String, usize>,
|
zone_to_id: &HashMap<String, usize>,
|
||||||
id_to_zone : &Vec<String>) -> Result<Message, String>{
|
id_to_zone : &Vec<String>) -> Result<Message, Error>{
|
||||||
let mut msg = Message::new();
|
let mut msg = Message::new();
|
||||||
|
|
||||||
let nb_partitions = 1usize << PARTITION_BITS;
|
let nb_partitions = 1usize << PARTITION_BITS;
|
||||||
|
@ -196,7 +196,6 @@ impl System {
|
|||||||
network_key: NetworkKey,
|
network_key: NetworkKey,
|
||||||
background: Arc<BackgroundRunner>,
|
background: Arc<BackgroundRunner>,
|
||||||
replication_factor: usize,
|
replication_factor: usize,
|
||||||
zone_redundancy: usize,
|
|
||||||
config: &Config,
|
config: &Config,
|
||||||
) -> Result<Arc<Self>, Error> {
|
) -> Result<Arc<Self>, Error> {
|
||||||
let node_key =
|
let node_key =
|
||||||
@ -226,7 +225,7 @@ impl System {
|
|||||||
"No valid previous cluster layout stored ({}), starting fresh.",
|
"No valid previous cluster layout stored ({}), starting fresh.",
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
ClusterLayout::new(replication_factor, zone_redundancy)
|
ClusterLayout::new(replication_factor)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user