From 4ef84a0558c0bf6641094e762ede0c962781204d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 23 Apr 2020 18:36:12 +0000 Subject: [PATCH] Move repair to separate file --- src/admin_rpc.rs | 182 +++------------------------------------- src/main.rs | 28 +++++++ src/store/key_table.rs | 3 +- src/store/mod.rs | 1 + src/store/repair.rs | 184 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 226 insertions(+), 172 deletions(-) create mode 100644 src/store/repair.rs diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index 17ef5072..e4afc689 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use tokio::sync::watch; use crate::data::*; use crate::error::Error; @@ -12,9 +11,8 @@ use crate::table::*; use crate::rpc::rpc_client::*; use crate::rpc::rpc_server::*; -use crate::store::block_ref_table::*; use crate::store::bucket_table::*; -use crate::store::version_table::*; +use crate::store::repair::Repair; use crate::*; @@ -24,6 +22,7 @@ pub const ADMIN_RPC_PATH: &str = "_admin"; #[derive(Debug, Serialize, Deserialize)] pub enum AdminRPC { BucketOperation(BucketOperation), + KeyOperation(KeyOperation), LaunchRepair(RepairOpt), // Replies @@ -51,6 +50,7 @@ impl AdminRpcHandler { async move { match msg { AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, + AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await, AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, _ => Err(Error::BadRequest(format!("Invalid RPC"))), } @@ -154,6 +154,10 @@ impl AdminRpcHandler { } } + async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result { + Err(Error::Message(format!("Not implemented"))) + } + async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { if !opt.yes { return Err(Error::BadRequest(format!( @@ -189,12 +193,14 @@ impl AdminRpcHandler { ))) } } else { - let self2 = self.clone(); + let repair = Repair { + garage: self.garage.clone(), + }; self.garage .system .background .spawn_worker("Repair worker".into(), move |must_exit| async move { - self2.repair_worker(opt, must_exit).await + repair.repair_worker(opt, must_exit).await }) .await; Ok(AdminRPC::Ok(format!( @@ -203,170 +209,4 @@ impl AdminRpcHandler { ))) } } - - async fn repair_worker( - self: Arc, - opt: RepairOpt, - must_exit: watch::Receiver, - ) -> Result<(), Error> { - let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); - - if todo(RepairWhat::Tables) { - info!("Launching a full sync of tables"); - self.garage - .bucket_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .object_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .version_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - self.garage - .block_ref_table - .syncer - .load_full() - .unwrap() - .add_full_scan() - .await; - } - - // TODO: wait for full sync to finish before proceeding to the rest? - - if todo(RepairWhat::Versions) { - info!("Repairing the versions table"); - self.repair_versions(&must_exit).await?; - } - - if todo(RepairWhat::BlockRefs) { - info!("Repairing the block refs table"); - self.repair_block_ref(&must_exit).await?; - } - - if opt.what.is_none() { - info!("Repairing the RC"); - self.repair_rc(&must_exit).await?; - } - - if todo(RepairWhat::Blocks) { - info!("Repairing the stored blocks"); - self.garage - .block_manager - .repair_data_store(&must_exit) - .await?; - } - - Ok(()) - } - - async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - - while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? { - pos = item_key.to_vec(); - - let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; - if version.deleted { - continue; - } - let object = self - .garage - .object_table - .get(&version.bucket, &version.key) - .await?; - let version_exists = match object { - Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid), - None => { - warn!( - "Repair versions: object for version {:?} not found", - version - ); - false - } - }; - if !version_exists { - info!("Repair versions: marking version as deleted: {:?}", version); - self.garage - .version_table - .insert(&Version::new( - version.uuid, - version.bucket, - version.key, - true, - vec![], - )) - .await?; - } - - if *must_exit.borrow() { - break; - } - } - Ok(()) - } - - async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { - let mut pos = vec![]; - - while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? { - pos = item_key.to_vec(); - - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; - if block_ref.deleted { - continue; - } - let version = self - .garage - .version_table - .get(&block_ref.version, &EmptyKey) - .await?; - let ref_exists = match version { - Some(v) => !v.deleted, - None => { - warn!( - "Block ref repair: version for block ref {:?} not found", - block_ref - ); - false - } - }; - if !ref_exists { - info!( - "Repair block ref: marking block_ref as deleted: {:?}", - block_ref - ); - self.garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true, - }) - .await?; - } - - if *must_exit.borrow() { - break; - } - } - Ok(()) - } - - async fn repair_rc(&self, _must_exit: &watch::Receiver) -> Result<(), Error> { - // TODO - warn!("repair_rc: not implemented"); - Ok(()) - } } diff --git a/src/main.rs b/src/main.rs index c693b12c..cf3a4130 100644 --- a/src/main.rs +++ b/src/main.rs @@ -70,6 +70,10 @@ pub enum Command { #[structopt(name = "bucket")] Bucket(BucketOperation), + /// Key operations + #[structopt(name = "key")] + Key(KeyOperation), + /// Start repair of node data #[structopt(name = "repair")] Repair(RepairOpt), @@ -182,6 +186,27 @@ pub struct PermBucketOpt { pub bucket: String, } +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub enum KeyOperation { + /// List keys + #[structopt(name = "list")] + List, + + /// Create new key + #[structopt(name = "new")] + New, + + /// Delete key + #[structopt(name = "delete")] + Delete(KeyDeleteOpt), +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyDeleteOpt { + /// Name of the bucket to delete + bucket: String, +} + #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct RepairOpt { /// Launch repair operation on all nodes @@ -257,6 +282,9 @@ async fn main() { Command::Bucket(bo) => { cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await } + Command::Key(bo) => { + cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::KeyOperation(bo)).await + } Command::Repair(ro) => { cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro)).await } diff --git a/src/store/key_table.rs b/src/store/key_table.rs index 7476622f..6c3f96d6 100644 --- a/src/store/key_table.rs +++ b/src/store/key_table.rs @@ -30,7 +30,8 @@ impl Key { authorized_buckets: vec![], }; for b in buckets { - ret.add_bucket(b); + ret.add_bucket(b) + .expect("Duplicate AllowedBucket in Key constructor"); } ret } diff --git a/src/store/mod.rs b/src/store/mod.rs index b6a8dc46..962264c4 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -3,4 +3,5 @@ pub mod block_ref_table; pub mod bucket_table; pub mod key_table; pub mod object_table; +pub mod repair; pub mod version_table; diff --git a/src/store/repair.rs b/src/store/repair.rs new file mode 100644 index 00000000..39c57fc1 --- /dev/null +++ b/src/store/repair.rs @@ -0,0 +1,184 @@ +use std::sync::Arc; + +use tokio::sync::watch; + +use crate::error::Error; +use crate::server::Garage; +use crate::table::*; + +use crate::store::block_ref_table::*; +use crate::store::version_table::*; + +use crate::*; + +pub struct Repair { + pub garage: Arc, +} + +impl Repair { + pub async fn repair_worker( + &self, + opt: RepairOpt, + must_exit: watch::Receiver, + ) -> Result<(), Error> { + let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); + + if todo(RepairWhat::Tables) { + info!("Launching a full sync of tables"); + self.garage + .bucket_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .object_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .version_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .block_ref_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + } + + // TODO: wait for full sync to finish before proceeding to the rest? + + if todo(RepairWhat::Versions) { + info!("Repairing the versions table"); + self.repair_versions(&must_exit).await?; + } + + if todo(RepairWhat::BlockRefs) { + info!("Repairing the block refs table"); + self.repair_block_ref(&must_exit).await?; + } + + if opt.what.is_none() { + info!("Repairing the RC"); + self.repair_rc(&must_exit).await?; + } + + if todo(RepairWhat::Blocks) { + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + } + + Ok(()) + } + + async fn repair_versions(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + let mut pos = vec![]; + + while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? { + pos = item_key.to_vec(); + + let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; + if version.deleted { + continue; + } + let object = self + .garage + .object_table + .get(&version.bucket, &version.key) + .await?; + let version_exists = match object { + Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid), + None => { + warn!( + "Repair versions: object for version {:?} not found", + version + ); + false + } + }; + if !version_exists { + info!("Repair versions: marking version as deleted: {:?}", version); + self.garage + .version_table + .insert(&Version::new( + version.uuid, + version.bucket, + version.key, + true, + vec![], + )) + .await?; + } + + if *must_exit.borrow() { + break; + } + } + Ok(()) + } + + async fn repair_block_ref(&self, must_exit: &watch::Receiver) -> Result<(), Error> { + let mut pos = vec![]; + + while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? { + pos = item_key.to_vec(); + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; + if block_ref.deleted { + continue; + } + let version = self + .garage + .version_table + .get(&block_ref.version, &EmptyKey) + .await?; + let ref_exists = match version { + Some(v) => !v.deleted, + None => { + warn!( + "Block ref repair: version for block ref {:?} not found", + block_ref + ); + false + } + }; + if !ref_exists { + info!( + "Repair block ref: marking block_ref as deleted: {:?}", + block_ref + ); + self.garage + .block_ref_table + .insert(&BlockRef { + block: block_ref.block, + version: block_ref.version, + deleted: true, + }) + .await?; + } + + if *must_exit.borrow() { + break; + } + } + Ok(()) + } + + async fn repair_rc(&self, _must_exit: &watch::Receiver) -> Result<(), Error> { + // TODO + warn!("repair_rc: not implemented"); + Ok(()) + } +}