table sync: adapt to new layout history

This commit is contained in:
Alex Auvolat 2023-11-11 12:08:32 +01:00
parent df36cf3099
commit ce89d1ddab
No known key found for this signature in database
GPG Key ID: 0E496D15096376BE
8 changed files with 172 additions and 129 deletions

View File

@ -47,11 +47,19 @@ impl LayoutHistory {
// ------------------ who stores what now? ---------------
pub fn max_ack(&self) -> u64 {
pub fn all_ack(&self) -> u64 {
self.calculate_global_min(&self.update_trackers.ack_map)
}
pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
pub fn min_stored(&self) -> u64 {
self.versions.first().as_ref().unwrap().version
}
pub fn sync_versions(&self) -> (u64, u64, u64) {
(self.current().version, self.all_ack(), self.min_stored())
}
pub fn all_nongateway_nodes(&self) -> HashSet<Uuid> {
// TODO: cache this
self.versions
.iter()
@ -71,11 +79,10 @@ impl LayoutHistory {
version.nodes_of(position, version.replication_factor)
}
pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator<Item = Vec<Uuid>> + 'a {
self.versions
.iter()
.map(|x| x.nodes_of(position, x.replication_factor))
.collect::<Vec<_>>()
.map(move |x| x.nodes_of(position, x.replication_factor))
}
// ------------------ update tracking ---------------
@ -129,7 +136,9 @@ impl LayoutHistory {
}
pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 {
let storage_nodes = self.all_storage_nodes();
// TODO: for TableFullReplication, counting gateway nodes might be
// necessary? Think about this more.
let storage_nodes = self.all_nongateway_nodes();
storage_nodes
.iter()
.map(|x| tracker.0.get(x).copied().unwrap_or(0))

View File

@ -92,6 +92,7 @@ impl LayoutManager {
persist_cluster_layout,
layout,
change_notify,
table_sync_version: Mutex::new(HashMap::new()),
system_endpoint,
rpc_helper,
}))

View File

@ -98,15 +98,13 @@ impl LayoutVersion {
}
/// Get the list of partitions and the first hash of a partition key that would fall in it
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
(0..(1 << PARTITION_BITS))
.map(|i| {
let top = (i as u16) << (16 - PARTITION_BITS);
let mut location = [0u8; 32];
location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
(i as u16, Hash::from(location))
})
.collect::<Vec<_>>()
pub fn partitions(&self) -> impl Iterator<Item = (Partition, Hash)> + '_ {
(0..(1 << PARTITION_BITS)).map(|i| {
let top = (i as u16) << (16 - PARTITION_BITS);
let mut location = [0u8; 32];
location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
(i as u16, Hash::from(location))
})
}
/// Return the n servers in which data for this hash should be replicated

View File

@ -442,7 +442,7 @@ impl System {
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count();
let partitions = layout.current().partitions();
let partitions = layout.current().partitions().collect::<Vec<_>>();
let partitions_n_up = partitions
.iter()
.map(|(_, h)| {

View File

@ -1,3 +1,4 @@
use std::iter::FromIterator;
use std::sync::Arc;
use garage_rpc::layout::*;
@ -6,10 +7,17 @@ use garage_util::data::*;
use crate::replication::*;
// TODO: find a way to track layout changes for this as well
// The hard thing is that this data is stored also on gateway nodes,
// whereas sharded data is stored only on non-Gateway nodes (storage nodes)
// Also we want to be more tolerant to failures of gateways so we don't
// want to do too much holding back of data when progress of gateway
// nodes is not reported in the layout history's ack/sync/sync_ack maps.
/// Full replication schema: all nodes store everything
/// Writes are disseminated in an epidemic manner in the network
/// Advantage: do all reads locally, extremely fast
/// Inconvenient: only suitable to reasonably small tables
/// Inconvenient: if some writes fail, nodes will read outdated data
#[derive(Clone)]
pub struct TableFullReplication {
/// The membership manager of this node
@ -44,7 +52,18 @@ impl TableReplication for TableFullReplication {
fn partition_of(&self, _hash: &Hash) -> Partition {
0u16
}
fn partitions(&self) -> Vec<(Partition, Hash)> {
vec![(0u16, [0u8; 32].into())]
fn sync_partitions(&self) -> SyncPartitions {
let layout = self.system.cluster_layout();
let layout_version = layout.current().version;
SyncPartitions {
layout_version,
partitions: vec![SyncPartition {
partition: 0u16,
first_hash: [0u8; 32].into(),
last_hash: [0xff; 32].into(),
storage_nodes: Vec::from_iter(layout.current().node_ids().to_vec()),
}],
}
}
}

View File

@ -20,6 +20,21 @@ pub trait TableReplication: Send + Sync + 'static {
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
/// List of existing partitions
fn partitions(&self) -> Vec<(Partition, Hash)>;
/// List of partitions and nodes to sync with in current layout
fn sync_partitions(&self) -> SyncPartitions;
}
#[derive(Debug)]
pub struct SyncPartitions {
pub layout_version: u64,
pub partitions: Vec<SyncPartition>,
}
#[derive(Debug)]
pub struct SyncPartition {
pub partition: Partition,
pub first_hash: Hash,
pub last_hash: Hash,
pub storage_nodes: Vec<Uuid>,
}

View File

@ -51,7 +51,42 @@ impl TableReplication for TableShardedReplication {
fn partition_of(&self, hash: &Hash) -> Partition {
self.system.cluster_layout().current().partition_of(hash)
}
fn partitions(&self) -> Vec<(Partition, Hash)> {
self.system.cluster_layout().current().partitions()
fn sync_partitions(&self) -> SyncPartitions {
let layout = self.system.cluster_layout();
let layout_version = layout.all_ack();
let mut partitions = layout
.current()
.partitions()
.map(|(partition, first_hash)| {
let mut storage_nodes = layout
.write_sets_of(&first_hash)
.map(|x| x.into_iter())
.flatten()
.collect::<Vec<_>>();
storage_nodes.sort();
storage_nodes.dedup();
SyncPartition {
partition,
first_hash,
last_hash: [0u8; 32].into(), // filled in just after
storage_nodes,
}
})
.collect::<Vec<_>>();
for i in 0..partitions.len() {
partitions[i].last_hash = if i + 1 < partitions.len() {
partitions[i + 1].first_hash
} else {
[0xFFu8; 32].into()
};
}
SyncPartitions {
layout_version,
partitions,
}
}
}

View File

@ -6,7 +6,7 @@ use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::select;
@ -52,16 +52,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
#[derive(Debug, Clone)]
struct TodoPartition {
partition: Partition,
begin: Hash,
end: Hash,
// Are we a node that stores this partition or not?
retain: bool,
}
impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
pub(crate) fn new(
system: Arc<System>,
@ -92,9 +82,9 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker {
syncer: self.clone(),
layout_notify: self.system.layout_notify(),
layout_version: self.system.cluster_layout().current().version,
layout_versions: self.system.cluster_layout().sync_versions(),
add_full_sync_rx,
todo: vec![],
todo: None,
next_full_sync: Instant::now() + Duration::from_secs(20),
});
}
@ -112,31 +102,26 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
async fn sync_partition(
self: &Arc<Self>,
partition: &TodoPartition,
partition: &SyncPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
if partition.retain {
let my_id = self.system.id;
let nodes = self
.data
.replication
.write_nodes(&partition.begin)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
let my_id = self.system.id;
let retain = partition.storage_nodes.contains(&my_id);
if retain {
debug!(
"({}) Syncing {:?} with {:?}...",
F::TABLE_NAME,
partition,
nodes
partition.storage_nodes
);
let mut sync_futures = nodes
let mut sync_futures = partition
.storage_nodes
.iter()
.filter(|node| **node != my_id)
.map(|node| {
self.clone()
.do_sync_with(partition.clone(), *node, must_exit.clone())
.do_sync_with(&partition, *node, must_exit.clone())
})
.collect::<FuturesUnordered<_>>();
@ -147,14 +132,14 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
warn!("({}) Sync error: {}", F::TABLE_NAME, e);
}
}
if n_errors > self.data.replication.max_write_errors() {
if n_errors > 0 {
return Err(Error::Message(format!(
"Sync failed with too many nodes (should have been: {:?}).",
nodes
"Sync failed with {} nodes.",
n_errors
)));
}
} else {
self.offload_partition(&partition.begin, &partition.end, must_exit)
self.offload_partition(&partition.first_hash, &partition.last_hash, must_exit)
.await?;
}
@ -285,7 +270,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
partition: &SyncPartition,
who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
@ -492,76 +477,23 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
layout_notify: Arc<Notify>,
layout_version: u64,
layout_versions: (u64, u64, u64),
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
todo: Vec<TodoPartition>,
next_full_sync: Instant,
todo: Option<SyncPartitions>,
}
impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
let system = &self.syncer.system;
let data = &self.syncer.data;
let my_id = system.id;
self.todo.clear();
let partitions = data.replication.partitions();
for i in 0..partitions.len() {
let begin = partitions[i].1;
let end = if i + 1 < partitions.len() {
partitions[i + 1].1
} else {
[0xFFu8; 32].into()
};
let nodes = data.replication.write_nodes(&begin);
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
match data.store.range(begin..end) {
Ok(mut iter) => {
if iter.next().is_none() {
continue;
}
}
Err(e) => {
warn!("DB error in add_full_sync: {}", e);
continue;
}
}
}
self.todo.push(TodoPartition {
partition: partitions[i].0,
begin,
end,
retain,
});
}
let mut partitions = self.syncer.data.replication.sync_partitions();
partitions.partitions.shuffle(&mut thread_rng());
self.todo = Some(partitions);
self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
fn pop_task(&mut self) -> Option<TodoPartition> {
if self.todo.is_empty() {
return None;
}
let i = rand::thread_rng().gen_range(0..self.todo.len());
if i == self.todo.len() - 1 {
self.todo.pop()
} else {
let replacement = self.todo.pop().unwrap();
let ret = std::mem::replace(&mut self.todo[i], replacement);
Some(ret)
}
}
}
#[async_trait]
@ -572,18 +504,46 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.todo.len() as u64),
queue_length: Some(self.todo.as_ref().map(|x| x.partitions.len()).unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
if let Some(partition) = self.pop_task() {
self.syncer.sync_partition(&partition, must_exit).await?;
Ok(WorkerState::Busy)
} else {
Ok(WorkerState::Idle)
if let Some(todo) = &mut self.todo {
let partition = todo.partitions.pop().unwrap();
// process partition
if let Err(e) = self.syncer.sync_partition(&partition, must_exit).await {
error!(
"{}: Failed to sync partition {:?}: {}",
F::TABLE_NAME,
partition,
e
);
// if error, put partition back at the other side of the queue,
// so that other partitions will be tried in the meantime
todo.partitions.insert(0, partition);
// TODO: returning an error here will cause the background job worker
// to delay this task for some time, but maybe we don't want to
// delay it if there are lots of failures from nodes that are gone
// (we also don't want zero delays as that will cause lots of useless retries)
return Err(e);
}
// done
if !todo.partitions.is_empty() {
return Ok(WorkerState::Busy);
}
self.syncer
.system
.layout_manager
.sync_table_until(F::TABLE_NAME, todo.layout_version);
}
self.todo = None;
Ok(WorkerState::Idle)
}
async fn wait_for_work(&mut self) -> WorkerState {
@ -594,10 +554,16 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
}
},
_ = self.layout_notify.notified() => {
let new_version = self.syncer.system.cluster_layout().current().version;
if new_version > self.layout_version {
self.layout_version = new_version;
debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME);
let layout_versions = self.syncer.system.cluster_layout().sync_versions();
if layout_versions != self.layout_versions {
self.layout_versions = layout_versions;
debug!(
"({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list",
F::TABLE_NAME,
layout_versions.0,
layout_versions.1,
layout_versions.2
);
self.add_full_sync();
}
},
@ -605,9 +571,9 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
self.add_full_sync();
}
}
match self.todo.is_empty() {
false => WorkerState::Busy,
true => WorkerState::Idle,
match self.todo.is_some() {
true => WorkerState::Busy,
false => WorkerState::Idle,
}
}
}