Compare commits
3 Commits
main
...
k2v-watch-
Author | SHA1 | Date |
---|---|---|
Alex Auvolat | 32715d462e | 1 year ago |
Alex Auvolat | b337895fce | 1 year ago |
Alex Auvolat | 49b5d18554 | 1 year ago |
@ -0,0 +1,107 @@ |
||||
use std::sync::Arc; |
||||
|
||||
use garage_db as db; |
||||
|
||||
use garage_table::crdt::*; |
||||
use garage_table::*; |
||||
|
||||
use crate::k2v::sub::*; |
||||
|
||||
mod v08 { |
||||
use crate::k2v::causality::K2VNodeId; |
||||
pub use crate::k2v::item_table::v08::{DvvsValue, K2VItem, K2VItemPartition}; |
||||
use garage_util::crdt; |
||||
use serde::{Deserialize, Serialize}; |
||||
|
||||
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] |
||||
pub struct K2VHistoryEntry { |
||||
// Partition key: the partition key of ins_item
|
||||
|
||||
/// The inserted item
|
||||
pub ins_item: K2VItem, |
||||
|
||||
/// Sort key: the node ID and its local counter
|
||||
pub node_counter: K2VHistorySortKey, |
||||
|
||||
/// The value of the node's local counter before this entry was updated
|
||||
pub prev_counter: u64, |
||||
/// The timesamp of the update (!= counter, counters are incremented
|
||||
/// by one, timestamps are real clock timestamps)
|
||||
pub timestamp: u64, |
||||
|
||||
/// Mark this history entry for deletion
|
||||
pub deleted: crdt::Bool, |
||||
} |
||||
|
||||
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] |
||||
pub struct K2VHistorySortKey { |
||||
pub node: K2VNodeId, |
||||
pub counter: u64, |
||||
} |
||||
|
||||
impl garage_util::migrate::InitialFormat for K2VHistoryEntry { |
||||
const VERSION_MARKER: &'static [u8] = b"Gk2vhe08"; |
||||
} |
||||
} |
||||
|
||||
pub use v08::*; |
||||
|
||||
impl Crdt for K2VHistoryEntry { |
||||
fn merge(&mut self, other: &Self) { |
||||
self.ins_item.merge(&other.ins_item); |
||||
self.deleted.merge(&other.deleted); |
||||
} |
||||
} |
||||
|
||||
impl SortKey for K2VHistorySortKey { |
||||
type B<'a> = [u8; 16]; |
||||
|
||||
fn sort_key(&self) -> [u8; 16] { |
||||
let mut ret = [0u8; 16]; |
||||
ret[0..8].copy_from_slice(&u64::to_be_bytes(self.node)); |
||||
ret[8..16].copy_from_slice(&u64::to_be_bytes(self.counter)); |
||||
ret |
||||
} |
||||
} |
||||
|
||||
impl Entry<K2VItemPartition, K2VHistorySortKey> for K2VHistoryEntry { |
||||
fn partition_key(&self) -> &K2VItemPartition { |
||||
&self.ins_item.partition |
||||
} |
||||
fn sort_key(&self) -> &K2VHistorySortKey { |
||||
&self.node_counter |
||||
} |
||||
fn is_tombstone(&self) -> bool { |
||||
self.deleted.get() |
||||
} |
||||
} |
||||
|
||||
pub struct K2VHistoryTable { |
||||
pub(crate) subscriptions: Arc<SubscriptionManager>, |
||||
} |
||||
|
||||
impl TableSchema for K2VHistoryTable { |
||||
const TABLE_NAME: &'static str = "k2v_history"; |
||||
|
||||
type P = K2VItemPartition; |
||||
type S = K2VHistorySortKey; |
||||
type E = K2VHistoryEntry; |
||||
type Filter = DeletedFilter; |
||||
|
||||
fn updated( |
||||
&self, |
||||
_tx: &mut db::Transaction, |
||||
_old: Option<&Self::E>, |
||||
new: Option<&Self::E>, |
||||
) -> db::TxOpResult<()> { |
||||
if let Some(new_ent) = new { |
||||
self.subscriptions.notify_range(new_ent); |
||||
} |
||||
|
||||
Ok(()) |
||||
} |
||||
|
||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { |
||||
filter.apply(entry.deleted.get()) |
||||
} |
||||
} |
@ -1,6 +1,8 @@ |
||||
pub mod causality; |
||||
|
||||
pub mod history_table; |
||||
pub mod item_table; |
||||
|
||||
pub mod poll; |
||||
pub(crate) mod sub; |
||||
|
||||
pub mod rpc; |
||||
|
@ -1,50 +0,0 @@ |
||||
use std::collections::HashMap; |
||||
use std::sync::Mutex; |
||||
|
||||
use serde::{Deserialize, Serialize}; |
||||
use tokio::sync::broadcast; |
||||
|
||||
use crate::k2v::item_table::*; |
||||
|
||||
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] |
||||
pub struct PollKey { |
||||
pub partition: K2VItemPartition, |
||||
pub sort_key: String, |
||||
} |
||||
|
||||
#[derive(Default)] |
||||
pub struct SubscriptionManager { |
||||
subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>, |
||||
} |
||||
|
||||
impl SubscriptionManager { |
||||
pub fn new() -> Self { |
||||
Self::default() |
||||
} |
||||
|
||||
pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { |
||||
let mut subs = self.subscriptions.lock().unwrap(); |
||||
if let Some(s) = subs.get(key) { |
||||
s.subscribe() |
||||
} else { |
||||
let (tx, rx) = broadcast::channel(8); |
||||
subs.insert(key.clone(), tx); |
||||
rx |
||||
} |
||||
} |
||||
|
||||
pub fn notify(&self, item: &K2VItem) { |
||||
let key = PollKey { |
||||
partition: item.partition.clone(), |
||||
sort_key: item.sort_key.clone(), |
||||
}; |
||||
let mut subs = self.subscriptions.lock().unwrap(); |
||||
if let Some(s) = subs.get(&key) { |
||||
if s.send(item.clone()).is_err() { |
||||
// no more subscribers, remove channel from here
|
||||
// (we will re-create it later if we need to subscribe again)
|
||||
subs.remove(&key); |
||||
} |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,115 @@ |
||||
use std::collections::HashMap; |
||||
use std::sync::Mutex; |
||||
|
||||
use serde::{Deserialize, Serialize}; |
||||
use tokio::sync::broadcast; |
||||
|
||||
use crate::k2v::history_table::*; |
||||
use crate::k2v::item_table::*; |
||||
|
||||
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] |
||||
pub struct PollKey { |
||||
pub partition: K2VItemPartition, |
||||
pub sort_key: String, |
||||
} |
||||
|
||||
#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] |
||||
pub struct PollRange { |
||||
pub partition: K2VItemPartition, |
||||
pub prefix: Option<String>, |
||||
pub start: Option<String>, |
||||
pub end: Option<String>, |
||||
} |
||||
|
||||
#[derive(Default)] |
||||
pub struct SubscriptionManager { |
||||
item_subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>, |
||||
range_subscriptions: Mutex<HashMap<PollRange, broadcast::Sender<K2VHistoryEntry>>>, |
||||
} |
||||
|
||||
impl SubscriptionManager { |
||||
pub fn new() -> Self { |
||||
Self::default() |
||||
} |
||||
|
||||
// ---- simple item polling ----
|
||||
|
||||
pub fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { |
||||
let mut subs = self.item_subscriptions.lock().unwrap(); |
||||
if let Some(s) = subs.get(key) { |
||||
s.subscribe() |
||||
} else { |
||||
let (tx, rx) = broadcast::channel(8); |
||||
subs.insert(key.clone(), tx); |
||||
rx |
||||
} |
||||
} |
||||
|
||||
pub fn notify_item(&self, item: &K2VItem) { |
||||
let key = PollKey { |
||||
partition: item.partition.clone(), |
||||
sort_key: item.sort_key.clone(), |
||||
}; |
||||
let mut subs = self.item_subscriptions.lock().unwrap(); |
||||
if let Some(s) = subs.get(&key) { |
||||
if s.send(item.clone()).is_err() { |
||||
// no more subscribers, remove channel from here
|
||||
// (we will re-create it later if we need to subscribe again)
|
||||
subs.remove(&key); |
||||
} |
||||
} |
||||
} |
||||
|
||||
// ---- range polling ----
|
||||
|
||||
pub fn subscribe_range(&self, key: &PollRange) -> broadcast::Receiver<K2VHistoryEntry> { |
||||
let mut subs = self.range_subscriptions.lock().unwrap(); |
||||
if let Some(s) = subs.get(key) { |
||||
s.subscribe() |
||||
} else { |
||||
let (tx, rx) = broadcast::channel(8); |
||||
subs.insert(key.clone(), tx); |
||||
rx |
||||
} |
||||
} |
||||
|
||||
pub fn notify_range(&self, entry: &K2VHistoryEntry) { |
||||
let mut subs = self.range_subscriptions.lock().unwrap(); |
||||
let mut dead_subs = vec![]; |
||||
|
||||
for (sub, chan) in subs.iter() { |
||||
if sub.matches(&entry) { |
||||
if chan.send(entry.clone()).is_err() { |
||||
dead_subs.push(sub.clone()); |
||||
} |
||||
} else if chan.receiver_count() == 0 { |
||||
dead_subs.push(sub.clone()); |
||||
} |
||||
} |
||||
|
||||
for sub in dead_subs.iter() { |
||||
subs.remove(sub); |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl PollRange { |
||||
fn matches(&self, entry: &K2VHistoryEntry) -> bool { |
||||
entry.ins_item.partition == self.partition |
||||
&& self |
||||
.prefix |
||||
.as_ref() |
||||
.map(|x| entry.ins_item.sort_key.starts_with(x)) |
||||
.unwrap_or(true) |
||||
&& self |
||||
.start |
||||
.as_ref() |
||||
.map(|x| entry.ins_item.sort_key >= *x) |
||||
.unwrap_or(true) |
||||
&& self |
||||
.end |
||||
.as_ref() |
||||
.map(|x| entry.ins_item.sort_key < *x) |
||||
.unwrap_or(true) |
||||
} |
||||
} |
Loading…
Reference in new issue