Proper queueing for delayed inserts, now backed to disk
This commit is contained in:
parent
f8e528c15d
commit
83c8467e23
@ -1,17 +1,14 @@
|
|||||||
use core::ops::Bound;
|
use core::ops::Bound;
|
||||||
use std::collections::{hash_map, BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::sync::{mpsc, watch};
|
|
||||||
|
|
||||||
use garage_db as db;
|
use garage_db as db;
|
||||||
|
|
||||||
use garage_rpc::ring::Ring;
|
use garage_rpc::ring::Ring;
|
||||||
use garage_rpc::system::System;
|
use garage_rpc::system::System;
|
||||||
use garage_util::background::*;
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
@ -142,7 +139,6 @@ impl<T: CountedItem> TableSchema for CounterTable<T> {
|
|||||||
pub struct IndexCounter<T: CountedItem> {
|
pub struct IndexCounter<T: CountedItem> {
|
||||||
this_node: Uuid,
|
this_node: Uuid,
|
||||||
local_counter: db::Tree,
|
local_counter: db::Tree,
|
||||||
propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
|
|
||||||
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
|
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -152,16 +148,11 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||||||
replication: TableShardedReplication,
|
replication: TableShardedReplication,
|
||||||
db: &db::Db,
|
db: &db::Db,
|
||||||
) -> Arc<Self> {
|
) -> Arc<Self> {
|
||||||
let background = system.background.clone();
|
Arc::new(Self {
|
||||||
|
|
||||||
let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let this = Arc::new(Self {
|
|
||||||
this_node: system.id,
|
this_node: system.id,
|
||||||
local_counter: db
|
local_counter: db
|
||||||
.open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
|
.open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
|
||||||
.expect("Unable to open local counter tree"),
|
.expect("Unable to open local counter tree"),
|
||||||
propagate_tx,
|
|
||||||
table: Table::new(
|
table: Table::new(
|
||||||
CounterTable {
|
CounterTable {
|
||||||
_phantom_t: Default::default(),
|
_phantom_t: Default::default(),
|
||||||
@ -170,16 +161,7 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||||||
system,
|
system,
|
||||||
db,
|
db,
|
||||||
),
|
),
|
||||||
});
|
})
|
||||||
|
|
||||||
background.spawn_worker(IndexPropagatorWorker {
|
|
||||||
index_counter: this.clone(),
|
|
||||||
propagate_rx,
|
|
||||||
buf: HashMap::new(),
|
|
||||||
errors: 0,
|
|
||||||
});
|
|
||||||
|
|
||||||
this
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn count(
|
pub fn count(
|
||||||
@ -232,12 +214,8 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||||||
.map_err(db::TxError::Abort)?;
|
.map_err(db::TxError::Abort)?;
|
||||||
tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
|
tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
|
||||||
|
|
||||||
if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
|
let dist_entry = entry.into_counter_entry(self.this_node);
|
||||||
error!(
|
self.table.queue_insert(tx, &dist_entry)?;
|
||||||
"Could not propagate updated counter values, failed to send to channel: {}",
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -250,19 +228,6 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||||||
TS: TableSchema<E = T>,
|
TS: TableSchema<E = T>,
|
||||||
TR: TableReplication,
|
TR: TableReplication,
|
||||||
{
|
{
|
||||||
let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
|
|
||||||
self.table
|
|
||||||
.data
|
|
||||||
.update_entry_with(&entry.partition_key(), &entry.sort_key(), |ent| match ent {
|
|
||||||
Some(mut ent) => {
|
|
||||||
ent.merge(&entry);
|
|
||||||
ent
|
|
||||||
}
|
|
||||||
None => entry.clone(),
|
|
||||||
})?;
|
|
||||||
Ok(())
|
|
||||||
};
|
|
||||||
|
|
||||||
// 1. Set all old local counters to zero
|
// 1. Set all old local counters to zero
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
let mut next_start: Option<Vec<u8>> = None;
|
let mut next_start: Option<Vec<u8>> = None;
|
||||||
@ -298,7 +263,9 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||||||
.insert(&local_counter_k, &local_counter_bytes)?;
|
.insert(&local_counter_k, &local_counter_bytes)?;
|
||||||
|
|
||||||
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
||||||
save_counter_entry(counter_entry)?;
|
self.local_counter
|
||||||
|
.db()
|
||||||
|
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
|
||||||
|
|
||||||
next_start = Some(local_counter_k);
|
next_start = Some(local_counter_k);
|
||||||
}
|
}
|
||||||
@ -363,7 +330,9 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||||||
.insert(&local_counter_key, local_counter_bytes)?;
|
.insert(&local_counter_key, local_counter_bytes)?;
|
||||||
|
|
||||||
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
let counter_entry = local_counter.into_counter_entry(self.this_node);
|
||||||
save_counter_entry(counter_entry)?;
|
self.local_counter
|
||||||
|
.db()
|
||||||
|
.transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
|
||||||
|
|
||||||
next_start = Some(counted_entry_k);
|
next_start = Some(counted_entry_k);
|
||||||
}
|
}
|
||||||
@ -374,96 +343,7 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct IndexPropagatorWorker<T: CountedItem> {
|
// ----
|
||||||
index_counter: Arc<IndexCounter<T>>,
|
|
||||||
propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
|
|
||||||
|
|
||||||
buf: HashMap<Vec<u8>, CounterEntry<T>>,
|
|
||||||
errors: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: CountedItem> IndexPropagatorWorker<T> {
|
|
||||||
fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
|
|
||||||
let tree_key = self.index_counter.table.data.tree_key(&pk, &sk);
|
|
||||||
let dist_entry = counters.into_counter_entry(self.index_counter.this_node);
|
|
||||||
match self.buf.entry(tree_key) {
|
|
||||||
hash_map::Entry::Vacant(e) => {
|
|
||||||
e.insert(dist_entry);
|
|
||||||
}
|
|
||||||
hash_map::Entry::Occupied(mut e) => {
|
|
||||||
e.get_mut().merge(&dist_entry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
|
|
||||||
fn name(&self) -> String {
|
|
||||||
format!("{} counter", T::COUNTER_TABLE_NAME)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn status(&self) -> WorkerStatus {
|
|
||||||
WorkerStatus {
|
|
||||||
queue_length: Some(self.buf.len() as u64),
|
|
||||||
..Default::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
|
||||||
// This loop batches updates to counters to be sent all at once.
|
|
||||||
// They are sent once the propagate_rx channel has been emptied (or is closed).
|
|
||||||
let closed = loop {
|
|
||||||
match self.propagate_rx.try_recv() {
|
|
||||||
Ok((pk, sk, counters)) => {
|
|
||||||
self.add_ent(pk, sk, counters);
|
|
||||||
}
|
|
||||||
Err(mpsc::error::TryRecvError::Empty) => break false,
|
|
||||||
Err(mpsc::error::TryRecvError::Disconnected) => break true,
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if !self.buf.is_empty() {
|
|
||||||
let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
|
|
||||||
let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
|
|
||||||
if let Err(e) = self.index_counter.table.insert_many(entries).await {
|
|
||||||
self.errors += 1;
|
|
||||||
if self.errors >= 2 && *must_exit.borrow() {
|
|
||||||
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
|
|
||||||
return Ok(WorkerState::Done);
|
|
||||||
}
|
|
||||||
// Propagate error up to worker manager, it will log it, increment a counter,
|
|
||||||
// and sleep for a certain delay (with exponential backoff), waiting for
|
|
||||||
// things to go back to normal
|
|
||||||
return Err(e);
|
|
||||||
} else {
|
|
||||||
for k in entries_k {
|
|
||||||
self.buf.remove(&k);
|
|
||||||
}
|
|
||||||
self.errors = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(WorkerState::Busy);
|
|
||||||
} else if closed {
|
|
||||||
return Ok(WorkerState::Done);
|
|
||||||
} else {
|
|
||||||
return Ok(WorkerState::Idle);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
|
|
||||||
match self.propagate_rx.recv().await {
|
|
||||||
Some((pk, sk, counters)) => {
|
|
||||||
self.add_ent(pk, sk, counters);
|
|
||||||
WorkerState::Busy
|
|
||||||
}
|
|
||||||
None => match self.buf.is_empty() {
|
|
||||||
false => WorkerState::Busy,
|
|
||||||
true => WorkerState::Done,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
struct LocalCounterEntry<T: CountedItem> {
|
struct LocalCounterEntry<T: CountedItem> {
|
||||||
|
@ -255,34 +255,34 @@ impl TableSchema for ObjectTable {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Spawn threads that propagates deletions to version table
|
// 2. Enqueue propagation deletions to version table
|
||||||
let version_table = self.version_table.clone();
|
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||||
let old = old.cloned();
|
// Propagate deletion of old versions
|
||||||
let new = new.cloned();
|
for v in old_v.versions.iter() {
|
||||||
|
let newly_deleted = match new_v
|
||||||
self.background.spawn(async move {
|
.versions
|
||||||
if let (Some(old_v), Some(new_v)) = (old, new) {
|
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
|
||||||
// Propagate deletion of old versions
|
{
|
||||||
for v in old_v.versions.iter() {
|
Err(_) => true,
|
||||||
let newly_deleted = match new_v
|
Ok(i) => {
|
||||||
.versions
|
new_v.versions[i].state == ObjectVersionState::Aborted
|
||||||
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
|
&& v.state != ObjectVersionState::Aborted
|
||||||
{
|
}
|
||||||
Err(_) => true,
|
};
|
||||||
Ok(i) => {
|
if newly_deleted {
|
||||||
new_v.versions[i].state == ObjectVersionState::Aborted
|
let deleted_version =
|
||||||
&& v.state != ObjectVersionState::Aborted
|
Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
|
||||||
}
|
let res = self.version_table.queue_insert(tx, &deleted_version);
|
||||||
};
|
if let Err(e) = db::unabort(res)? {
|
||||||
if newly_deleted {
|
error!(
|
||||||
let deleted_version =
|
"Unable to enqueue version deletion propagation: {}. A repair will be needed.",
|
||||||
Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
|
e
|
||||||
version_table.insert(&deleted_version).await?;
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
}
|
||||||
});
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,33 +141,26 @@ impl TableSchema for VersionTable {
|
|||||||
|
|
||||||
fn updated(
|
fn updated(
|
||||||
&self,
|
&self,
|
||||||
_tx: &mut db::Transaction,
|
tx: &mut db::Transaction,
|
||||||
old: Option<&Self::E>,
|
old: Option<&Self::E>,
|
||||||
new: Option<&Self::E>,
|
new: Option<&Self::E>,
|
||||||
) -> db::TxOpResult<()> {
|
) -> db::TxOpResult<()> {
|
||||||
let block_ref_table = self.block_ref_table.clone();
|
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||||
let old = old.cloned();
|
// Propagate deletion of version blocks
|
||||||
let new = new.cloned();
|
if new_v.deleted.get() && !old_v.deleted.get() {
|
||||||
|
let deleted_block_refs = old_v.blocks.items().iter().map(|(_k, vb)| BlockRef {
|
||||||
self.background.spawn(async move {
|
block: vb.hash,
|
||||||
if let (Some(old_v), Some(new_v)) = (old, new) {
|
version: old_v.uuid,
|
||||||
// Propagate deletion of version blocks
|
deleted: true.into(),
|
||||||
if new_v.deleted.get() && !old_v.deleted.get() {
|
});
|
||||||
let deleted_block_refs = old_v
|
for block_ref in deleted_block_refs {
|
||||||
.blocks
|
let res = self.block_ref_table.queue_insert(tx, &block_ref);
|
||||||
.items()
|
if let Err(e) = db::unabort(res)? {
|
||||||
.iter()
|
error!("Unable to enqueue block ref deletion propagation: {}. A repair will be needed.", e);
|
||||||
.map(|(_k, vb)| BlockRef {
|
}
|
||||||
block: vb.hash,
|
|
||||||
version: old_v.uuid,
|
|
||||||
deleted: true.into(),
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
}
|
||||||
});
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,10 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
|
|||||||
pub(crate) merkle_tree: db::Tree,
|
pub(crate) merkle_tree: db::Tree,
|
||||||
pub(crate) merkle_todo: db::Tree,
|
pub(crate) merkle_todo: db::Tree,
|
||||||
pub(crate) merkle_todo_notify: Notify,
|
pub(crate) merkle_todo_notify: Notify,
|
||||||
|
|
||||||
|
pub(crate) insert_queue: db::Tree,
|
||||||
|
pub(crate) insert_queue_notify: Notify,
|
||||||
|
|
||||||
pub(crate) gc_todo: CountedTree,
|
pub(crate) gc_todo: CountedTree,
|
||||||
|
|
||||||
pub(crate) metrics: TableMetrics,
|
pub(crate) metrics: TableMetrics,
|
||||||
@ -53,9 +57,13 @@ where
|
|||||||
.open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
|
.open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
|
||||||
.expect("Unable to open DB Merkle TODO tree");
|
.expect("Unable to open DB Merkle TODO tree");
|
||||||
|
|
||||||
|
let insert_queue = db
|
||||||
|
.open_tree(&format!("{}:insert_queue", F::TABLE_NAME))
|
||||||
|
.expect("Unable to open insert queue DB tree");
|
||||||
|
|
||||||
let gc_todo = db
|
let gc_todo = db
|
||||||
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
|
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
|
||||||
.expect("Unable to open DB tree");
|
.expect("Unable to open GC DB tree");
|
||||||
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
|
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
|
||||||
|
|
||||||
let metrics = TableMetrics::new(
|
let metrics = TableMetrics::new(
|
||||||
@ -74,6 +82,8 @@ where
|
|||||||
merkle_tree,
|
merkle_tree,
|
||||||
merkle_todo,
|
merkle_todo,
|
||||||
merkle_todo_notify: Notify::new(),
|
merkle_todo_notify: Notify::new(),
|
||||||
|
insert_queue,
|
||||||
|
insert_queue_notify: Notify::new(),
|
||||||
gc_todo,
|
gc_todo,
|
||||||
metrics,
|
metrics,
|
||||||
})
|
})
|
||||||
@ -306,6 +316,32 @@ where
|
|||||||
Ok(removed)
|
Ok(removed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- Insert queue functions ----
|
||||||
|
|
||||||
|
pub(crate) fn queue_insert(
|
||||||
|
&self,
|
||||||
|
tx: &mut db::Transaction,
|
||||||
|
ins: &F::E,
|
||||||
|
) -> db::TxResult<(), Error> {
|
||||||
|
let tree_key = self.tree_key(ins.partition_key(), ins.sort_key());
|
||||||
|
|
||||||
|
let new_entry = match tx.get(&self.insert_queue, &tree_key)? {
|
||||||
|
Some(old_v) => {
|
||||||
|
let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
|
||||||
|
entry.merge(ins);
|
||||||
|
rmp_to_vec_all_named(&entry)
|
||||||
|
.map_err(Error::RmpEncode)
|
||||||
|
.map_err(db::TxError::Abort)?
|
||||||
|
}
|
||||||
|
None => rmp_to_vec_all_named(ins)
|
||||||
|
.map_err(Error::RmpEncode)
|
||||||
|
.map_err(db::TxError::Abort)?,
|
||||||
|
};
|
||||||
|
tx.insert(&self.insert_queue, &tree_key, new_entry)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// ---- Utility functions ----
|
// ---- Utility functions ----
|
||||||
|
|
||||||
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
|
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
|
||||||
|
@ -4,16 +4,18 @@
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate tracing;
|
extern crate tracing;
|
||||||
|
|
||||||
mod metrics;
|
|
||||||
pub mod schema;
|
pub mod schema;
|
||||||
pub mod util;
|
pub mod util;
|
||||||
|
|
||||||
pub mod data;
|
pub mod data;
|
||||||
|
pub mod replication;
|
||||||
|
pub mod table;
|
||||||
|
|
||||||
mod gc;
|
mod gc;
|
||||||
mod merkle;
|
mod merkle;
|
||||||
pub mod replication;
|
mod metrics;
|
||||||
|
mod queue;
|
||||||
mod sync;
|
mod sync;
|
||||||
pub mod table;
|
|
||||||
|
|
||||||
pub use schema::*;
|
pub use schema::*;
|
||||||
pub use table::*;
|
pub use table::*;
|
||||||
|
@ -3,6 +3,7 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::select;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use garage_db as db;
|
use garage_db as db;
|
||||||
@ -343,7 +344,10 @@ where
|
|||||||
if *must_exit.borrow() {
|
if *must_exit.borrow() {
|
||||||
return WorkerState::Done;
|
return WorkerState::Done;
|
||||||
}
|
}
|
||||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
select! {
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(60)) => (),
|
||||||
|
_ = self.0.data.merkle_todo_notify.notified() => (),
|
||||||
|
}
|
||||||
WorkerState::Busy
|
WorkerState::Busy
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
84
src/table/queue.rs
Normal file
84
src/table/queue.rs
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use tokio::select;
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
use garage_util::background::*;
|
||||||
|
use garage_util::error::Error;
|
||||||
|
|
||||||
|
use crate::replication::*;
|
||||||
|
use crate::schema::*;
|
||||||
|
use crate::table::*;
|
||||||
|
|
||||||
|
const BATCH_SIZE: usize = 100;
|
||||||
|
|
||||||
|
pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
|
||||||
|
where
|
||||||
|
F: TableSchema + 'static,
|
||||||
|
R: TableReplication + 'static;
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<F, R> Worker for InsertQueueWorker<F, R>
|
||||||
|
where
|
||||||
|
F: TableSchema + 'static,
|
||||||
|
R: TableReplication + 'static,
|
||||||
|
{
|
||||||
|
fn name(&self) -> String {
|
||||||
|
format!("{} queue", F::TABLE_NAME)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status(&self) -> WorkerStatus {
|
||||||
|
WorkerStatus {
|
||||||
|
queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
||||||
|
let mut kv_pairs = vec![];
|
||||||
|
let mut values = vec![];
|
||||||
|
|
||||||
|
for entry_kv in self.0.data.insert_queue.iter()? {
|
||||||
|
let (k, v) = entry_kv?;
|
||||||
|
|
||||||
|
values.push(self.0.data.decode_entry(&v)?);
|
||||||
|
kv_pairs.push((k, v));
|
||||||
|
|
||||||
|
if kv_pairs.len() > BATCH_SIZE {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if kv_pairs.is_empty() {
|
||||||
|
return Ok(WorkerState::Idle);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.0.insert_many(values).await?;
|
||||||
|
|
||||||
|
self.0.data.insert_queue.db().transaction(|mut tx| {
|
||||||
|
for (k, v) in kv_pairs.iter() {
|
||||||
|
if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
|
||||||
|
if &v2 == v {
|
||||||
|
tx.remove(&self.0.data.insert_queue, k)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(WorkerState::Busy)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
|
||||||
|
if *must_exit.borrow() {
|
||||||
|
return WorkerState::Done;
|
||||||
|
}
|
||||||
|
select! {
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(600)) => (),
|
||||||
|
_ = self.0.data.insert_queue_notify.notified() => (),
|
||||||
|
}
|
||||||
|
WorkerState::Busy
|
||||||
|
}
|
||||||
|
}
|
@ -25,6 +25,7 @@ use crate::crdt::Crdt;
|
|||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
use crate::gc::*;
|
use crate::gc::*;
|
||||||
use crate::merkle::*;
|
use crate::merkle::*;
|
||||||
|
use crate::queue::InsertQueueWorker;
|
||||||
use crate::replication::*;
|
use crate::replication::*;
|
||||||
use crate::schema::*;
|
use crate::schema::*;
|
||||||
use crate::sync::*;
|
use crate::sync::*;
|
||||||
@ -88,6 +89,11 @@ where
|
|||||||
endpoint,
|
endpoint,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
table
|
||||||
|
.system
|
||||||
|
.background
|
||||||
|
.spawn_worker(InsertQueueWorker(table.clone()));
|
||||||
|
|
||||||
table.endpoint.set_handler(table.clone());
|
table.endpoint.set_handler(table.clone());
|
||||||
|
|
||||||
table
|
table
|
||||||
@ -128,6 +134,11 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Insert item locally
|
||||||
|
pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> {
|
||||||
|
self.data.queue_insert(tx, e)
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
|
pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = IE> + Send + Sync,
|
I: IntoIterator<Item = IE> + Send + Sync,
|
||||||
|
Loading…
Reference in New Issue
Block a user