Small refactor of tables internals
This commit is contained in:
parent
d1279e04f3
commit
f8e528c15d
@ -251,13 +251,9 @@ impl<T: CountedItem> IndexCounter<T> {
|
|||||||
TR: TableReplication,
|
TR: TableReplication,
|
||||||
{
|
{
|
||||||
let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
|
let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
|
||||||
let entry_k = self
|
|
||||||
.table
|
|
||||||
.data
|
|
||||||
.tree_key(entry.partition_key(), entry.sort_key());
|
|
||||||
self.table
|
self.table
|
||||||
.data
|
.data
|
||||||
.update_entry_with(&entry_k, |ent| match ent {
|
.update_entry_with(&entry.partition_key(), &entry.sort_key(), |ent| match ent {
|
||||||
Some(mut ent) => {
|
Some(mut ent) => {
|
||||||
ent.merge(&entry);
|
ent.merge(&entry);
|
||||||
ent
|
ent
|
||||||
|
@ -273,14 +273,9 @@ impl K2VRpcHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
|
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
|
||||||
let tree_key = self
|
|
||||||
.item_table
|
|
||||||
.data
|
|
||||||
.tree_key(&item.partition, &item.sort_key);
|
|
||||||
|
|
||||||
self.item_table
|
self.item_table
|
||||||
.data
|
.data
|
||||||
.update_entry_with(&tree_key[..], |ent| {
|
.update_entry_with(&item.partition, &item.sort_key, |ent| {
|
||||||
let mut ent = ent.unwrap_or_else(|| {
|
let mut ent = ent.unwrap_or_else(|| {
|
||||||
K2VItem::new(
|
K2VItem::new(
|
||||||
item.partition.bucket_id,
|
item.partition.bucket_id,
|
||||||
|
@ -173,9 +173,8 @@ where
|
|||||||
|
|
||||||
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
|
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
|
||||||
let update = self.decode_entry(update_bytes)?;
|
let update = self.decode_entry(update_bytes)?;
|
||||||
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
|
|
||||||
|
|
||||||
self.update_entry_with(&tree_key[..], |ent| match ent {
|
self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
|
||||||
Some(mut ent) => {
|
Some(mut ent) => {
|
||||||
ent.merge(&update);
|
ent.merge(&update);
|
||||||
ent
|
ent
|
||||||
@ -187,11 +186,14 @@ where
|
|||||||
|
|
||||||
pub fn update_entry_with(
|
pub fn update_entry_with(
|
||||||
&self,
|
&self,
|
||||||
tree_key: &[u8],
|
partition_key: &F::P,
|
||||||
|
sort_key: &F::S,
|
||||||
f: impl Fn(Option<F::E>) -> F::E,
|
f: impl Fn(Option<F::E>) -> F::E,
|
||||||
) -> Result<Option<F::E>, Error> {
|
) -> Result<Option<F::E>, Error> {
|
||||||
|
let tree_key = self.tree_key(partition_key, sort_key);
|
||||||
|
|
||||||
let changed = self.store.db().transaction(|mut tx| {
|
let changed = self.store.db().transaction(|mut tx| {
|
||||||
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
|
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
|
||||||
Some(old_bytes) => {
|
Some(old_bytes) => {
|
||||||
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
|
||||||
let new_entry = f(Some(old_entry.clone()));
|
let new_entry = f(Some(old_entry.clone()));
|
||||||
@ -200,23 +202,23 @@ where
|
|||||||
None => (None, None, f(None)),
|
None => (None, None, f(None)),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Scenario 1: the value changed, so of course there is a change
|
// Changed can be true in two scenarios
|
||||||
let value_changed = Some(&new_entry) != old_entry.as_ref();
|
// Scenario 1: the actual represented value changed,
|
||||||
|
// so of course the messagepack encoding changed as well
|
||||||
// Scenario 2: the value didn't change but due to a migration in the
|
// Scenario 2: the value didn't change but due to a migration in the
|
||||||
// data format, the messagepack encoding changed. In this case
|
// data format, the messagepack encoding changed. In this case,
|
||||||
// we have to write the migrated value in the table and update
|
// we also have to write the migrated value in the table and update
|
||||||
// the associated Merkle tree entry.
|
// the associated Merkle tree entry.
|
||||||
let new_bytes = rmp_to_vec_all_named(&new_entry)
|
let new_bytes = rmp_to_vec_all_named(&new_entry)
|
||||||
.map_err(Error::RmpEncode)
|
.map_err(Error::RmpEncode)
|
||||||
.map_err(db::TxError::Abort)?;
|
.map_err(db::TxError::Abort)?;
|
||||||
let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
|
let changed = Some(&new_bytes[..]) != old_bytes.as_deref();
|
||||||
drop(old_bytes);
|
drop(old_bytes);
|
||||||
|
|
||||||
if value_changed || encoding_changed {
|
if changed {
|
||||||
let new_bytes_hash = blake2sum(&new_bytes[..]);
|
let new_bytes_hash = blake2sum(&new_bytes);
|
||||||
tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
|
tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?;
|
||||||
tx.insert(&self.store, tree_key, new_bytes)?;
|
tx.insert(&self.store, &tree_key, new_bytes)?;
|
||||||
|
|
||||||
self.instance
|
self.instance
|
||||||
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
|
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
|
||||||
@ -242,7 +244,7 @@ where
|
|||||||
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
|
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
|
||||||
let nodes = self.replication.write_nodes(&pk_hash);
|
let nodes = self.replication.write_nodes(&pk_hash);
|
||||||
if nodes.first() == Some(&self.system.id) {
|
if nodes.first() == Some(&self.system.id) {
|
||||||
GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?;
|
GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -258,10 +260,11 @@ where
|
|||||||
.db()
|
.db()
|
||||||
.transaction(|mut tx| match tx.get(&self.store, k)? {
|
.transaction(|mut tx| match tx.get(&self.store, k)? {
|
||||||
Some(cur_v) if cur_v == v => {
|
Some(cur_v) if cur_v == v => {
|
||||||
|
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
|
||||||
|
|
||||||
tx.remove(&self.store, k)?;
|
tx.remove(&self.store, k)?;
|
||||||
tx.insert(&self.merkle_todo, k, vec![])?;
|
tx.insert(&self.merkle_todo, k, vec![])?;
|
||||||
|
|
||||||
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
|
|
||||||
self.instance.updated(&mut tx, Some(&old_entry), None)?;
|
self.instance.updated(&mut tx, Some(&old_entry), None)?;
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
@ -285,10 +288,11 @@ where
|
|||||||
.db()
|
.db()
|
||||||
.transaction(|mut tx| match tx.get(&self.store, k)? {
|
.transaction(|mut tx| match tx.get(&self.store, k)? {
|
||||||
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
|
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
|
||||||
|
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
|
||||||
|
|
||||||
tx.remove(&self.store, k)?;
|
tx.remove(&self.store, k)?;
|
||||||
tx.insert(&self.merkle_todo, k, vec![])?;
|
tx.insert(&self.merkle_todo, k, vec![])?;
|
||||||
|
|
||||||
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
|
|
||||||
self.instance.updated(&mut tx, Some(&old_entry), None)?;
|
self.instance.updated(&mut tx, Some(&old_entry), None)?;
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user