avoid using layout_watch in System directly
This commit is contained in:
parent
12d1dbfc6b
commit
4a9c94514f
@ -122,7 +122,7 @@ async fn bucket_info_results(
|
|||||||
.table
|
.table
|
||||||
.get(&bucket_id, &EmptyKey)
|
.get(&bucket_id, &EmptyKey)
|
||||||
.await?
|
.await?
|
||||||
.map(|x| x.filtered_values(&garage.system.layout_watch.borrow()))
|
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mpu_counters = garage
|
let mpu_counters = garage
|
||||||
@ -130,7 +130,7 @@ async fn bucket_info_results(
|
|||||||
.table
|
.table
|
||||||
.get(&bucket_id, &EmptyKey)
|
.get(&bucket_id, &EmptyKey)
|
||||||
.await?
|
.await?
|
||||||
.map(|x| x.filtered_values(&garage.system.layout_watch.borrow()))
|
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mut relevant_keys = HashMap::new();
|
let mut relevant_keys = HashMap::new();
|
||||||
|
@ -33,7 +33,7 @@ pub async fn handle_get_cluster_status(garage: &Arc<Garage>) -> Result<Response<
|
|||||||
hostname: i.status.hostname,
|
hostname: i.status.hostname,
|
||||||
})
|
})
|
||||||
.collect(),
|
.collect(),
|
||||||
layout: format_cluster_layout(&garage.system.get_cluster_layout()),
|
layout: format_cluster_layout(&garage.system.cluster_layout()),
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(json_ok_response(&res)?)
|
Ok(json_ok_response(&res)?)
|
||||||
@ -84,7 +84,7 @@ pub async fn handle_connect_cluster_nodes(
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
|
pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
|
||||||
let res = format_cluster_layout(&garage.system.get_cluster_layout());
|
let res = format_cluster_layout(&garage.system.cluster_layout());
|
||||||
|
|
||||||
Ok(json_ok_response(&res)?)
|
Ok(json_ok_response(&res)?)
|
||||||
}
|
}
|
||||||
@ -207,7 +207,7 @@ pub async fn handle_update_cluster_layout(
|
|||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
|
let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
|
||||||
|
|
||||||
let mut layout = garage.system.get_cluster_layout();
|
let mut layout = garage.system.cluster_layout().as_ref().clone();
|
||||||
|
|
||||||
let mut roles = layout.roles.clone();
|
let mut roles = layout.roles.clone();
|
||||||
roles.merge(&layout.staging_roles);
|
roles.merge(&layout.staging_roles);
|
||||||
@ -247,7 +247,7 @@ pub async fn handle_apply_cluster_layout(
|
|||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
|
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
|
||||||
|
|
||||||
let layout = garage.system.get_cluster_layout();
|
let layout = garage.system.cluster_layout().as_ref().clone();
|
||||||
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
|
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
|
||||||
|
|
||||||
garage.system.update_cluster_layout(&layout).await?;
|
garage.system.update_cluster_layout(&layout).await?;
|
||||||
@ -265,7 +265,7 @@ pub async fn handle_revert_cluster_layout(
|
|||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
|
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
|
||||||
|
|
||||||
let layout = garage.system.get_cluster_layout();
|
let layout = garage.system.cluster_layout().as_ref().clone();
|
||||||
let layout = layout.revert_staged_changes(Some(param.version))?;
|
let layout = layout.revert_staged_changes(Some(param.version))?;
|
||||||
garage.system.update_cluster_layout(&layout).await?;
|
garage.system.update_cluster_layout(&layout).await?;
|
||||||
|
|
||||||
|
@ -26,7 +26,7 @@ pub async fn handle_read_index(
|
|||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let reverse = reverse.unwrap_or(false);
|
let reverse = reverse.unwrap_or(false);
|
||||||
|
|
||||||
let layout: Arc<ClusterLayout> = garage.system.layout_watch.borrow().clone();
|
let layout: Arc<ClusterLayout> = garage.system.cluster_layout().clone();
|
||||||
|
|
||||||
let (partition_keys, more, next_start) = read_range(
|
let (partition_keys, more, next_start) = read_range(
|
||||||
&garage.k2v.counter_table.table,
|
&garage.k2v.counter_table.table,
|
||||||
|
@ -253,7 +253,7 @@ pub(crate) async fn check_quotas(
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let counters = counters
|
let counters = counters
|
||||||
.map(|x| x.filtered_values(&garage.system.layout_watch.borrow()))
|
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let (prev_cnt_obj, prev_cnt_size) = match prev_object {
|
let (prev_cnt_obj, prev_cnt_size) = match prev_object {
|
||||||
|
@ -70,7 +70,7 @@ impl AdminRpcHandler {
|
|||||||
.table
|
.table
|
||||||
.get(&bucket_id, &EmptyKey)
|
.get(&bucket_id, &EmptyKey)
|
||||||
.await?
|
.await?
|
||||||
.map(|x| x.filtered_values(&self.garage.system.layout_watch.borrow()))
|
.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mpu_counters = self
|
let mpu_counters = self
|
||||||
@ -79,7 +79,7 @@ impl AdminRpcHandler {
|
|||||||
.table
|
.table
|
||||||
.get(&bucket_id, &EmptyKey)
|
.get(&bucket_id, &EmptyKey)
|
||||||
.await?
|
.await?
|
||||||
.map(|x| x.filtered_values(&self.garage.system.layout_watch.borrow()))
|
.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mut relevant_keys = HashMap::new();
|
let mut relevant_keys = HashMap::new();
|
||||||
|
@ -126,7 +126,7 @@ impl AdminRpcHandler {
|
|||||||
opt_to_send.all_nodes = false;
|
opt_to_send.all_nodes = false;
|
||||||
|
|
||||||
let mut failures = vec![];
|
let mut failures = vec![];
|
||||||
let layout = self.garage.system.layout_watch.borrow().clone();
|
let layout = self.garage.system.cluster_layout().clone();
|
||||||
for node in layout.node_ids().iter() {
|
for node in layout.node_ids().iter() {
|
||||||
let node = (*node).into();
|
let node = (*node).into();
|
||||||
let resp = self
|
let resp = self
|
||||||
@ -163,7 +163,7 @@ impl AdminRpcHandler {
|
|||||||
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
|
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
|
||||||
if opt.all_nodes {
|
if opt.all_nodes {
|
||||||
let mut ret = String::new();
|
let mut ret = String::new();
|
||||||
let layout = self.garage.system.layout_watch.borrow().clone();
|
let layout = self.garage.system.cluster_layout().clone();
|
||||||
|
|
||||||
for node in layout.node_ids().iter() {
|
for node in layout.node_ids().iter() {
|
||||||
let mut opt = opt.clone();
|
let mut opt = opt.clone();
|
||||||
@ -275,7 +275,7 @@ impl AdminRpcHandler {
|
|||||||
let mut ret = String::new();
|
let mut ret = String::new();
|
||||||
|
|
||||||
// Gather storage node and free space statistics
|
// Gather storage node and free space statistics
|
||||||
let layout = &self.garage.system.layout_watch.borrow();
|
let layout = &self.garage.system.cluster_layout();
|
||||||
let mut node_partition_count = HashMap::<Uuid, u64>::new();
|
let mut node_partition_count = HashMap::<Uuid, u64>::new();
|
||||||
for short_id in layout.ring_assignment_data.iter() {
|
for short_id in layout.ring_assignment_data.iter() {
|
||||||
let id = layout.node_id_vec[*short_id as usize];
|
let id = layout.node_id_vec[*short_id as usize];
|
||||||
@ -440,7 +440,7 @@ impl AdminRpcHandler {
|
|||||||
) -> Result<AdminRpc, Error> {
|
) -> Result<AdminRpc, Error> {
|
||||||
if all_nodes {
|
if all_nodes {
|
||||||
let mut ret = vec![];
|
let mut ret = vec![];
|
||||||
let layout = self.garage.system.layout_watch.borrow().clone();
|
let layout = self.garage.system.cluster_layout().clone();
|
||||||
for node in layout.node_ids().iter() {
|
for node in layout.node_ids().iter() {
|
||||||
let node = (*node).into();
|
let node = (*node).into();
|
||||||
match self
|
match self
|
||||||
@ -488,7 +488,7 @@ impl AdminRpcHandler {
|
|||||||
) -> Result<AdminRpc, Error> {
|
) -> Result<AdminRpc, Error> {
|
||||||
if all_nodes {
|
if all_nodes {
|
||||||
let mut ret = vec![];
|
let mut ret = vec![];
|
||||||
let layout = self.garage.system.layout_watch.borrow().clone();
|
let layout = self.garage.system.cluster_layout().clone();
|
||||||
for node in layout.node_ids().iter() {
|
for node in layout.node_ids().iter() {
|
||||||
let node = (*node).into();
|
let node = (*node).into();
|
||||||
match self
|
match self
|
||||||
|
@ -453,7 +453,7 @@ impl<'a> BucketHelper<'a> {
|
|||||||
use garage_rpc::layout::ClusterLayout;
|
use garage_rpc::layout::ClusterLayout;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
let layout: Arc<ClusterLayout> = self.0.system.layout_watch.borrow().clone();
|
let layout: Arc<ClusterLayout> = self.0.system.cluster_layout().clone();
|
||||||
let k2vindexes = self
|
let k2vindexes = self
|
||||||
.0
|
.0
|
||||||
.k2v
|
.k2v
|
||||||
|
@ -423,8 +423,8 @@ impl System {
|
|||||||
known_nodes
|
known_nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_cluster_layout(&self) -> ClusterLayout {
|
pub fn cluster_layout(&self) -> watch::Ref<Arc<ClusterLayout>> {
|
||||||
self.layout_watch.borrow().as_ref().clone()
|
self.layout_watch.borrow()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update_cluster_layout(
|
pub async fn update_cluster_layout(
|
||||||
|
@ -27,11 +27,11 @@ impl TableReplication for TableFullReplication {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
|
fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
|
||||||
let layout = self.system.layout_watch.borrow();
|
let layout = self.system.cluster_layout();
|
||||||
layout.node_ids().to_vec()
|
layout.node_ids().to_vec()
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self) -> usize {
|
||||||
let nmembers = self.system.layout_watch.borrow().node_ids().len();
|
let nmembers = self.system.cluster_layout().node_ids().len();
|
||||||
if nmembers > self.max_faults {
|
if nmembers > self.max_faults {
|
||||||
nmembers - self.max_faults
|
nmembers - self.max_faults
|
||||||
} else {
|
} else {
|
||||||
|
@ -26,7 +26,7 @@ pub struct TableShardedReplication {
|
|||||||
|
|
||||||
impl TableReplication for TableShardedReplication {
|
impl TableReplication for TableShardedReplication {
|
||||||
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
||||||
let layout = self.system.layout_watch.borrow();
|
let layout = self.system.cluster_layout();
|
||||||
layout.nodes_of(hash, self.replication_factor)
|
layout.nodes_of(hash, self.replication_factor)
|
||||||
}
|
}
|
||||||
fn read_quorum(&self) -> usize {
|
fn read_quorum(&self) -> usize {
|
||||||
@ -34,7 +34,7 @@ impl TableReplication for TableShardedReplication {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
||||||
let layout = self.system.layout_watch.borrow();
|
let layout = self.system.cluster_layout();
|
||||||
layout.nodes_of(hash, self.replication_factor)
|
layout.nodes_of(hash, self.replication_factor)
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self) -> usize {
|
||||||
@ -45,9 +45,9 @@ impl TableReplication for TableShardedReplication {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn partition_of(&self, hash: &Hash) -> Partition {
|
fn partition_of(&self, hash: &Hash) -> Partition {
|
||||||
self.system.layout_watch.borrow().partition_of(hash)
|
self.system.cluster_layout().partition_of(hash)
|
||||||
}
|
}
|
||||||
fn partitions(&self) -> Vec<(Partition, Hash)> {
|
fn partitions(&self) -> Vec<(Partition, Hash)> {
|
||||||
self.system.layout_watch.borrow().partitions()
|
self.system.cluster_layout().partitions()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -92,7 +92,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
|
|||||||
bg.spawn_worker(SyncWorker {
|
bg.spawn_worker(SyncWorker {
|
||||||
syncer: self.clone(),
|
syncer: self.clone(),
|
||||||
layout_watch: self.system.layout_watch.clone(),
|
layout_watch: self.system.layout_watch.clone(),
|
||||||
layout: self.system.layout_watch.borrow().clone(),
|
layout: self.system.cluster_layout().clone(),
|
||||||
add_full_sync_rx,
|
add_full_sync_rx,
|
||||||
todo: vec![],
|
todo: vec![],
|
||||||
next_full_sync: Instant::now() + Duration::from_secs(20),
|
next_full_sync: Instant::now() + Duration::from_secs(20),
|
||||||
|
Loading…
Reference in New Issue
Block a user