Implement GC delay for table data
This commit is contained in:
parent
74a7a550eb
commit
ad7ab31411
@ -1,3 +1,4 @@
|
|||||||
|
use std::convert::TryInto;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -422,7 +423,7 @@ impl BlockManager {
|
|||||||
|
|
||||||
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
|
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
|
||||||
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
|
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
|
||||||
let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
|
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
if now >= time_msec {
|
if now >= time_msec {
|
||||||
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
|
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
|
||||||
@ -705,13 +706,6 @@ impl BlockManagerLocked {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
|
|
||||||
assert!(bytes.as_ref().len() == 8);
|
|
||||||
let mut x8 = [0u8; 8];
|
|
||||||
x8.copy_from_slice(bytes.as_ref());
|
|
||||||
u64::from_be_bytes(x8)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Describes the state of the reference counter for a block
|
/// Describes the state of the reference counter for a block
|
||||||
#[derive(Clone, Copy, Debug)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
enum RcEntry {
|
enum RcEntry {
|
||||||
|
@ -55,7 +55,7 @@ where
|
|||||||
.expect("Unable to open DB Merkle TODO tree");
|
.expect("Unable to open DB Merkle TODO tree");
|
||||||
|
|
||||||
let gc_todo = db
|
let gc_todo = db
|
||||||
.open_tree(&format!("{}:gc_todo", name))
|
.open_tree(&format!("{}:gc_todo_v2", name))
|
||||||
.expect("Unable to open DB tree");
|
.expect("Unable to open DB tree");
|
||||||
|
|
||||||
Arc::new(Self {
|
Arc::new(Self {
|
||||||
|
108
src/table/gc.rs
108
src/table/gc.rs
@ -1,4 +1,5 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::convert::TryInto;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@ -13,6 +14,7 @@ use tokio::sync::watch;
|
|||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
use garage_util::time::*;
|
||||||
|
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
@ -24,6 +26,11 @@ use crate::schema::*;
|
|||||||
const TABLE_GC_BATCH_SIZE: usize = 1024;
|
const TABLE_GC_BATCH_SIZE: usize = 1024;
|
||||||
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
|
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
|
|
||||||
|
// GC delay for table entries: 1 day (24 hours)
|
||||||
|
// (the delay before the entry is added in the GC todo list
|
||||||
|
// and the moment the garbage collection actually happens)
|
||||||
|
const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);
|
||||||
|
|
||||||
pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
|
pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
data: Arc<TableData<F, R>>,
|
data: Arc<TableData<F, R>>,
|
||||||
@ -72,35 +79,49 @@ where
|
|||||||
async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
|
async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
match self.gc_loop_iter().await {
|
match self.gc_loop_iter().await {
|
||||||
Ok(true) => {
|
Ok(None) => {
|
||||||
// Stuff was done, loop immediately
|
// Stuff was done, loop immediately
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
Ok(false) => {
|
Ok(Some(wait_delay)) => {
|
||||||
// Nothing was done, sleep for some time (below)
|
// Nothing was done, wait specified delay.
|
||||||
|
select! {
|
||||||
|
_ = tokio::time::sleep(wait_delay).fuse() => {},
|
||||||
|
_ = must_exit.changed().fuse() => {},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("({}) Error doing GC: {}", self.data.name, e);
|
warn!("({}) Error doing GC: {}", self.data.name, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
select! {
|
|
||||||
_ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {},
|
|
||||||
_ = must_exit.changed().fuse() => {},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn gc_loop_iter(&self) -> Result<bool, Error> {
|
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
|
||||||
|
let now = now_msec();
|
||||||
|
|
||||||
let mut entries = vec![];
|
let mut entries = vec![];
|
||||||
let mut excluded = vec![];
|
let mut excluded = vec![];
|
||||||
|
|
||||||
// List entries in the GC todo list
|
// List entries in the GC todo list
|
||||||
// These entries are put there when a tombstone is inserted in the table
|
// These entries are put there when a tombstone is inserted in the table
|
||||||
// This is detected and done in data.rs in update_entry
|
// (see update_entry in data.rs)
|
||||||
for entry_kv in self.data.gc_todo.iter() {
|
for entry_kv in self.data.gc_todo.iter() {
|
||||||
let (k, vhash) = entry_kv?;
|
let (k, vhash) = entry_kv?;
|
||||||
let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
|
let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
|
||||||
|
|
||||||
|
if todo_entry.deletion_time() > now {
|
||||||
|
if entries.is_empty() && excluded.is_empty() {
|
||||||
|
// If the earliest entry in the todo list shouldn't yet be processed,
|
||||||
|
// return a duration to wait in the loop
|
||||||
|
return Ok(Some(Duration::from_millis(
|
||||||
|
todo_entry.deletion_time() - now,
|
||||||
|
)));
|
||||||
|
} else {
|
||||||
|
// Otherwise we have some entries to process, do a normal iteration.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let vhash = Hash::try_from(&vhash[..]).unwrap();
|
let vhash = Hash::try_from(&vhash[..]).unwrap();
|
||||||
|
|
||||||
// Check if the tombstone is still the current value of the entry.
|
// Check if the tombstone is still the current value of the entry.
|
||||||
@ -134,8 +155,9 @@ where
|
|||||||
// and for which they are still currently tombstones in the table.
|
// and for which they are still currently tombstones in the table.
|
||||||
|
|
||||||
if entries.is_empty() {
|
if entries.is_empty() {
|
||||||
// Nothing to do in this iteration
|
// Nothing to do in this iteration (no entries present)
|
||||||
return Ok(false);
|
// Wait for a default delay of 60 seconds
|
||||||
|
return Ok(Some(Duration::from_secs(60)));
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("({}) GC: doing {} items", self.data.name, entries.len());
|
debug!("({}) GC: doing {} items", self.data.name, entries.len());
|
||||||
@ -181,7 +203,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
if errs.is_empty() {
|
if errs.is_empty() {
|
||||||
Ok(true)
|
Ok(None)
|
||||||
} else {
|
} else {
|
||||||
Err(Error::Message(
|
Err(Error::Message(
|
||||||
errs.into_iter()
|
errs.into_iter()
|
||||||
@ -189,19 +211,20 @@ where
|
|||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.join(", "),
|
.join(", "),
|
||||||
))
|
))
|
||||||
.err_context("in try_send_and_delete:")
|
.err_context("in try_send_and_delete in table GC:")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_send_and_delete(
|
async fn try_send_and_delete(
|
||||||
&self,
|
&self,
|
||||||
nodes: Vec<Uuid>,
|
nodes: Vec<Uuid>,
|
||||||
items: Vec<GcTodoEntry>,
|
mut items: Vec<GcTodoEntry>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let n_items = items.len();
|
let n_items = items.len();
|
||||||
|
|
||||||
// Strategy: we first send all of the values to the remote nodes,
|
// Strategy: we first send all of the values to the remote nodes,
|
||||||
// to ensure that they are aware of the tombstone state.
|
// to ensure that they are aware of the tombstone state,
|
||||||
|
// and that the previous state was correctly overwritten
|
||||||
// (if they have a newer state that overrides the tombstone, that's fine).
|
// (if they have a newer state that overrides the tombstone, that's fine).
|
||||||
// Second, once everyone is at least at the tombstone state,
|
// Second, once everyone is at least at the tombstone state,
|
||||||
// we instruct everyone to delete the tombstone IF that is still their current state.
|
// we instruct everyone to delete the tombstone IF that is still their current state.
|
||||||
@ -209,13 +232,14 @@ where
|
|||||||
// tombstone in the CRDT lattice, and it will be propagated back to us at some point
|
// tombstone in the CRDT lattice, and it will be propagated back to us at some point
|
||||||
// (either just a regular update that hasn't reached us yet, or later when the
|
// (either just a regular update that hasn't reached us yet, or later when the
|
||||||
// table is synced).
|
// table is synced).
|
||||||
|
|
||||||
// Here, we store in updates all of the tombstones to send for step 1,
|
// Here, we store in updates all of the tombstones to send for step 1,
|
||||||
// and in deletes the list of keys and hashes of value for step 2.
|
// and in deletes the list of keys and hashes of value for step 2.
|
||||||
let mut updates = vec![];
|
let mut updates = vec![];
|
||||||
let mut deletes = vec![];
|
let mut deletes = vec![];
|
||||||
for item in items {
|
for item in items.iter_mut() {
|
||||||
updates.push(ByteBuf::from(item.value.unwrap()));
|
updates.push(ByteBuf::from(item.value.take().unwrap()));
|
||||||
deletes.push((ByteBuf::from(item.key), item.value_hash));
|
deletes.push((ByteBuf::from(item.key.clone()), item.value_hash));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 1: ensure everyone is at least at tombstone in CRDT lattice
|
// Step 1: ensure everyone is at least at tombstone in CRDT lattice
|
||||||
@ -250,7 +274,7 @@ where
|
|||||||
.try_call_many(
|
.try_call_many(
|
||||||
&self.endpoint,
|
&self.endpoint,
|
||||||
&nodes[..],
|
&nodes[..],
|
||||||
GcRpc::DeleteIfEqualHash(deletes.clone()),
|
GcRpc::DeleteIfEqualHash(deletes),
|
||||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||||
.with_quorum(nodes.len())
|
.with_quorum(nodes.len())
|
||||||
.with_timeout(TABLE_GC_RPC_TIMEOUT),
|
.with_timeout(TABLE_GC_RPC_TIMEOUT),
|
||||||
@ -260,24 +284,16 @@ where
|
|||||||
|
|
||||||
// GC has been successfull for all of these entries.
|
// GC has been successfull for all of these entries.
|
||||||
// We now remove them all from our local table and from the GC todo list.
|
// We now remove them all from our local table and from the GC todo list.
|
||||||
for (k, vhash) in deletes {
|
for item in items {
|
||||||
self.data
|
self.data
|
||||||
.delete_if_equal_hash(&k[..], vhash)
|
.delete_if_equal_hash(&item.key[..], item.value_hash)
|
||||||
.err_context("GC: local delete tombstones")?;
|
.err_context("GC: local delete tombstones")?;
|
||||||
self.todo_remove_if_equal(&k[..], vhash)
|
item.remove_if_equal(&self.data.gc_todo)
|
||||||
.err_context("GC: remove from todo list after successfull GC")?;
|
.err_context("GC: remove from todo list after successfull GC")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn todo_remove_if_equal(&self, key: &[u8], vhash: Hash) -> Result<(), Error> {
|
|
||||||
let _ = self
|
|
||||||
.data
|
|
||||||
.gc_todo
|
|
||||||
.compare_and_swap::<_, _, Vec<u8>>(key, Some(vhash), None)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@ -295,7 +311,6 @@ where
|
|||||||
GcRpc::DeleteIfEqualHash(items) => {
|
GcRpc::DeleteIfEqualHash(items) => {
|
||||||
for (key, vhash) in items.iter() {
|
for (key, vhash) in items.iter() {
|
||||||
self.data.delete_if_equal_hash(&key[..], *vhash)?;
|
self.data.delete_if_equal_hash(&key[..], *vhash)?;
|
||||||
self.todo_remove_if_equal(&key[..], *vhash)?;
|
|
||||||
}
|
}
|
||||||
Ok(GcRpc::Ok)
|
Ok(GcRpc::Ok)
|
||||||
}
|
}
|
||||||
@ -307,7 +322,16 @@ where
|
|||||||
/// An entry stored in the gc_todo Sled tree associated with the table
|
/// An entry stored in the gc_todo Sled tree associated with the table
|
||||||
/// Contains helper function for parsing, saving, and removing
|
/// Contains helper function for parsing, saving, and removing
|
||||||
/// such entry in Sled
|
/// such entry in Sled
|
||||||
|
///
|
||||||
|
/// Format of an entry:
|
||||||
|
/// - key = 8 bytes: timestamp of tombstone
|
||||||
|
/// (used to implement GC delay)
|
||||||
|
/// n bytes: key in the main data table
|
||||||
|
/// - value = hash of the table entry to delete (the tombstone)
|
||||||
|
/// for verification purpose, because we don't want to delete
|
||||||
|
/// things that aren't tombstones
|
||||||
pub(crate) struct GcTodoEntry {
|
pub(crate) struct GcTodoEntry {
|
||||||
|
tombstone_timestamp: u64,
|
||||||
key: Vec<u8>,
|
key: Vec<u8>,
|
||||||
value_hash: Hash,
|
value_hash: Hash,
|
||||||
value: Option<Vec<u8>>,
|
value: Option<Vec<u8>>,
|
||||||
@ -319,6 +343,7 @@ impl GcTodoEntry {
|
|||||||
/// serialized value
|
/// serialized value
|
||||||
pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self {
|
pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
tombstone_timestamp: now_msec(),
|
||||||
key,
|
key,
|
||||||
value_hash,
|
value_hash,
|
||||||
value: None,
|
value: None,
|
||||||
@ -328,7 +353,8 @@ impl GcTodoEntry {
|
|||||||
/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
|
/// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
|
||||||
pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self {
|
pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self {
|
||||||
Self {
|
Self {
|
||||||
key: sled_k.to_vec(),
|
tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()),
|
||||||
|
key: sled_k[8..].to_vec(),
|
||||||
value_hash: Hash::try_from(sled_v).unwrap(),
|
value_hash: Hash::try_from(sled_v).unwrap(),
|
||||||
value: None,
|
value: None,
|
||||||
}
|
}
|
||||||
@ -336,7 +362,7 @@ impl GcTodoEntry {
|
|||||||
|
|
||||||
/// Saves the GcTodoEntry in the gc_todo tree
|
/// Saves the GcTodoEntry in the gc_todo tree
|
||||||
pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
|
pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
|
||||||
gc_todo_tree.insert(&self.key[..], self.value_hash.as_slice())?;
|
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -347,10 +373,22 @@ impl GcTodoEntry {
|
|||||||
/// what we have to do is still the same
|
/// what we have to do is still the same
|
||||||
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
|
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
|
||||||
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
|
let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
|
||||||
&self.key[..],
|
&self.todo_table_key()[..],
|
||||||
Some(self.value_hash),
|
Some(self.value_hash),
|
||||||
None,
|
None,
|
||||||
)?;
|
)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn todo_table_key(&self) -> Vec<u8> {
|
||||||
|
[
|
||||||
|
&u64::to_be_bytes(self.tombstone_timestamp)[..],
|
||||||
|
&self.key[..],
|
||||||
|
]
|
||||||
|
.concat()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deletion_time(&self) -> u64 {
|
||||||
|
self.tombstone_timestamp + TABLE_GC_DELAY.as_millis() as u64
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user