Table range deletion
This commit is contained in:
parent
6ce14e2c9e
commit
867646093b
@ -45,20 +45,21 @@ impl TableSchema for BlockRefTable {
|
|||||||
type S = UUID;
|
type S = UUID;
|
||||||
type E = BlockRef;
|
type E = BlockRef;
|
||||||
|
|
||||||
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
|
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
||||||
let was_before = old.map(|x| !x.deleted).unwrap_or(false);
|
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
|
||||||
let is_after = !new.deleted;
|
let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
|
||||||
|
let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
|
||||||
if is_after && !was_before {
|
if is_after && !was_before {
|
||||||
if let Err(e) = self.block_manager.block_incref(&new.block) {
|
if let Err(e) = self.block_manager.block_incref(block) {
|
||||||
eprintln!("Failed to incref block {:?}: {}", &new.block, e);
|
eprintln!("Failed to incref block {:?}: {}", block, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if was_before && !is_after {
|
if was_before && !is_after {
|
||||||
if let Err(e) = self
|
if let Err(e) = self
|
||||||
.block_manager
|
.block_manager
|
||||||
.block_decref(&new.block, &self.background)
|
.block_decref(block, &self.background)
|
||||||
{
|
{
|
||||||
eprintln!("Failed to decref block {:?}: {}", &new.block, e);
|
eprintln!("Failed to decref block {:?}: {}", block, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -97,13 +97,13 @@ impl TableSchema for ObjectTable {
|
|||||||
type S = String;
|
type S = String;
|
||||||
type E = Object;
|
type E = Object;
|
||||||
|
|
||||||
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
|
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
||||||
let version_table = self.version_table.clone();
|
let version_table = self.version_table.clone();
|
||||||
self.background.spawn(async move {
|
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||||
// Propagate deletion of old versions
|
// Propagate deletion of old versions
|
||||||
if let Some(old_v) = old {
|
self.background.spawn(async move {
|
||||||
for v in old_v.versions.iter() {
|
for v in old_v.versions.iter() {
|
||||||
if new
|
if new_v
|
||||||
.versions
|
.versions
|
||||||
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
|
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
|
||||||
.is_err()
|
.is_err()
|
||||||
@ -118,8 +118,8 @@ impl TableSchema for ObjectTable {
|
|||||||
version_table.insert(&deleted_version).await?;
|
version_table.insert(&deleted_version).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
Ok(())
|
||||||
Ok(())
|
});
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
19
src/table.rs
19
src/table.rs
@ -119,7 +119,7 @@ pub trait TableSchema: Send + Sync {
|
|||||||
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
type E: Entry<Self::P, Self::S>;
|
type E: Entry<Self::P, Self::S>;
|
||||||
|
|
||||||
async fn updated(&self, old: Option<Self::E>, new: Self::E);
|
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>);
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F: TableSchema + 'static> Table<F> {
|
impl<F: TableSchema + 'static> Table<F> {
|
||||||
@ -370,10 +370,10 @@ impl<F: TableSchema + 'static> Table<F> {
|
|||||||
.map_err(Error::RMPEncode)
|
.map_err(Error::RMPEncode)
|
||||||
.map_err(sled::ConflictableTransactionError::Abort)?;
|
.map_err(sled::ConflictableTransactionError::Abort)?;
|
||||||
db.insert(tree_key.clone(), new_bytes)?;
|
db.insert(tree_key.clone(), new_bytes)?;
|
||||||
Ok((old_entry, new_entry))
|
Ok((old_entry, Some(new_entry)))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
if old_entry.as_ref() != Some(&new_entry) {
|
if old_entry != new_entry {
|
||||||
self.instance.updated(old_entry, new_entry).await;
|
self.instance.updated(old_entry, new_entry).await;
|
||||||
|
|
||||||
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
|
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
|
||||||
@ -385,7 +385,18 @@ impl<F: TableSchema + 'static> Table<F> {
|
|||||||
|
|
||||||
pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
|
pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
|
||||||
eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
|
eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
|
||||||
// TODO
|
let mut count = 0;
|
||||||
|
while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
|
||||||
|
if key.as_ref() < begin.as_slice() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if let Some(old_val) = self.store.remove(&key)? {
|
||||||
|
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
|
||||||
|
self.instance.updated(Some(old_entry), None).await;
|
||||||
|
count += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
eprintln!("({}) {} entries deleted", self.name, count);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,6 +346,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let n_checksums = checksums.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
|
||||||
|
eprintln!("({}) Checksum comparison RPC: {} different out of {}", self.table.name, ret.len(), n_checksums);
|
||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,12 +63,12 @@ impl TableSchema for VersionTable {
|
|||||||
type S = EmptySortKey;
|
type S = EmptySortKey;
|
||||||
type E = Version;
|
type E = Version;
|
||||||
|
|
||||||
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
|
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
||||||
let block_ref_table = self.block_ref_table.clone();
|
let block_ref_table = self.block_ref_table.clone();
|
||||||
self.background.spawn(async move {
|
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||||
// Propagate deletion of version blocks
|
// Propagate deletion of version blocks
|
||||||
if let Some(old_v) = old {
|
self.background.spawn(async move {
|
||||||
if new.deleted && !old_v.deleted {
|
if new_v.deleted && !old_v.deleted {
|
||||||
let deleted_block_refs = old_v
|
let deleted_block_refs = old_v
|
||||||
.blocks
|
.blocks
|
||||||
.iter()
|
.iter()
|
||||||
@ -80,8 +80,8 @@ impl TableSchema for VersionTable {
|
|||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
|
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
|
||||||
}
|
}
|
||||||
}
|
Ok(())
|
||||||
Ok(())
|
});
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user