From 65070f3c05775f6692f4a16b6a304991d0510301 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 15 Oct 2021 11:05:09 +0200 Subject: [PATCH] Improvements to CLI and various fixes for netapp version --- Cargo.lock | 279 ++-------------------------- src/api/Cargo.toml | 2 +- src/api/error.rs | 8 +- src/garage/admin_rpc.rs | 46 ++--- src/garage/cli.rs | 235 ++++++++++++----------- src/garage/main.rs | 3 +- src/garage/server.rs | 8 +- src/model/block.rs | 30 ++- src/model/garage.rs | 6 + src/rpc/Cargo.toml | 4 +- src/rpc/consul.rs | 30 ++- src/rpc/ring.rs | 36 +--- src/rpc/rpc_helper.rs | 53 +++--- src/rpc/system.rs | 223 +++++++++++++++------- src/table/gc.rs | 29 ++- src/table/replication/fullcopy.rs | 7 +- src/table/replication/parameters.rs | 5 +- src/table/replication/sharded.rs | 5 +- src/table/sync.rs | 36 ++-- src/table/table.rs | 35 ++-- src/util/config.rs | 24 +-- src/util/data.rs | 12 ++ src/util/error.rs | 84 +++++---- src/web/Cargo.toml | 2 +- src/web/error.rs | 4 +- 25 files changed, 528 insertions(+), 678 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ea61847..adb068bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,12 +83,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "bumpalo" -version = "3.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538" - [[package]] name = "byteorder" version = "1.4.3" @@ -393,9 +387,9 @@ dependencies = [ "futures-util", "garage_api", "garage_model", - "garage_rpc 0.3.0", + "garage_rpc", "garage_table", - "garage_util 0.3.0", + "garage_util", "garage_web", "git-version", "hex", @@ -425,7 +419,7 @@ dependencies = [ "futures-util", "garage_model", "garage_table", - "garage_util 0.3.0", + "garage_util", "hex", "hmac", "http", @@ -451,9 +445,9 @@ dependencies = [ "async-trait", "futures", "futures-util", - "garage_rpc 0.3.0", + "garage_rpc", "garage_table", - "garage_util 0.3.0", + "garage_util", "hex", "log", "netapp", @@ -465,33 +459,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "garage_rpc" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48c182633cebe4abed9594afb14770fc45402513765d38a4b19659ae0ccb2a2f" -dependencies = [ - "arc-swap", - "bytes 1.1.0", - "futures", - "futures-util", - "garage_util 0.2.1", - "gethostname", - "hex", - "http", - "hyper", - "hyper-rustls", - "log", - "rmp-serde 0.15.5", - "rustls", - "serde", - "serde_json", - "tokio", - "tokio-rustls", - "tokio-stream", - "webpki", -] - [[package]] name = "garage_rpc" version = "0.3.0" @@ -501,8 +468,7 @@ dependencies = [ "bytes 1.1.0", "futures", "futures-util", - "garage_rpc 0.2.1", - "garage_util 0.3.0", + "garage_util", "gethostname", "hex", "hyper", @@ -525,8 +491,8 @@ dependencies = [ "bytes 1.1.0", "futures", "futures-util", - "garage_rpc 0.3.0", - "garage_util 0.3.0", + "garage_rpc", + "garage_util", "hexdump", "log", "rand", @@ -537,33 +503,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "garage_util" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aef76d3779e406a16fdcaffe8d86b8ae2943a549d2b33f2c20930838764464c0" -dependencies = [ - "blake2", - "chrono", - "err-derive 0.3.0", - "futures", - "hex", - "http", - "hyper", - "log", - "rand", - "rmp-serde 0.15.5", - "rustls", - "serde", - "serde_json", - "sha2", - "sled", - "tokio", - "toml", - "webpki", - "xxhash-rust", -] - [[package]] name = "garage_util" version = "0.3.0" @@ -597,7 +536,7 @@ dependencies = [ "garage_api", "garage_model", "garage_table", - "garage_util 0.3.0", + "garage_util", "http", "hyper", "idna", @@ -658,31 +597,6 @@ dependencies = [ "syn", ] -[[package]] -name = "h2" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c06815895acec637cd6ed6e9662c935b866d20a106f8361892893a7d9234964" -dependencies = [ - "bytes 1.1.0", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - -[[package]] -name = "hashbrown" -version = "0.11.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" - [[package]] name = "heck" version = "0.3.3" @@ -792,7 +706,6 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", "http", "http-body", "httparse", @@ -806,21 +719,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.22.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" -dependencies = [ - "futures-util", - "hyper", - "log", - "rustls", - "tokio", - "tokio-rustls", - "webpki", -] - [[package]] name = "idna" version = "0.2.3" @@ -832,16 +730,6 @@ dependencies = [ "unicode-normalization", ] -[[package]] -name = "indexmap" -version = "1.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" -dependencies = [ - "autocfg", - "hashbrown", -] - [[package]] name = "instant" version = "0.1.11" @@ -863,19 +751,11 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" -[[package]] -name = "js-sys" -version = "0.3.55" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" -dependencies = [ - "wasm-bindgen", -] - [[package]] name = "kuska-handshake" version = "0.2.0" -source = "git+https://github.com/Alexis211/handshake?branch=tokio1.0#61bf144643b177797b2d16b9b2ffcfb648face00" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e33da4b69f23c2ece0b3e729d079cebdc2c0206e493e42f510f500ad81c631d5" dependencies = [ "futures", "hex", @@ -994,7 +874,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.3.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp#cfa64bc745969cfc3684a70b45d71128f8335460" +source = "git+https://git.deuxfleurs.fr/lx/netapp#de981aace0e47a1fa65b38212ac21d91e52f7c15" dependencies = [ "arc-swap", "async-trait", @@ -1263,21 +1143,6 @@ version = "0.6.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" -[[package]] -name = "ring" -version = "0.16.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" -dependencies = [ - "cc", - "libc", - "once_cell", - "spin", - "untrusted", - "web-sys", - "winapi", -] - [[package]] name = "rmp" version = "0.8.10" @@ -1319,19 +1184,6 @@ dependencies = [ "xmlparser", ] -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64", - "log", - "ring", - "sct", - "webpki", -] - [[package]] name = "rustversion" version = "1.0.5" @@ -1359,16 +1211,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "serde" version = "1.0.130" @@ -1469,12 +1311,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - [[package]] name = "structopt" version = "0.3.23" @@ -1621,17 +1457,6 @@ dependencies = [ "syn", ] -[[package]] -name = "tokio-rustls" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" -dependencies = [ - "rustls", - "tokio", - "webpki", -] - [[package]] name = "tokio-stream" version = "0.1.7" @@ -1738,12 +1563,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" -[[package]] -name = "untrusted" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" - [[package]] name = "url" version = "2.2.2" @@ -1789,80 +1608,6 @@ version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" -[[package]] -name = "wasm-bindgen" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" -dependencies = [ - "cfg-if", - "wasm-bindgen-macro", -] - -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" -dependencies = [ - "bumpalo", - "lazy_static", - "log", - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-macro" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" -dependencies = [ - "quote", - "wasm-bindgen-macro-support", -] - -[[package]] -name = "wasm-bindgen-macro-support" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-backend", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-shared" -version = "0.2.78" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" - -[[package]] -name = "web-sys" -version = "0.3.55" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - -[[package]] -name = "webpki" -version = "0.21.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "winapi" version = "0.3.9" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 8b5118cd..f346c6cc 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -35,7 +35,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi http = "0.2" httpdate = "0.3" http-range = "0.1" -hyper = "0.14" +hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } percent-encoding = "2.1.0" roxmltree = "0.14" serde = { version = "1.0", features = ["derive"] } diff --git a/src/api/error.rs b/src/api/error.rs index 7d97366e..35fa404f 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -82,7 +82,9 @@ impl Error { match self { Error::NotFound => StatusCode::NOT_FOUND, Error::Forbidden(_) => StatusCode::FORBIDDEN, - Error::InternalError(GarageError::Rpc(_)) => StatusCode::SERVICE_UNAVAILABLE, + Error::InternalError( + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_), + ) => StatusCode::SERVICE_UNAVAILABLE, Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => { StatusCode::INTERNAL_SERVER_ERROR } @@ -95,7 +97,9 @@ impl Error { Error::NotFound => "NoSuchKey", Error::Forbidden(_) => "AccessDenied", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", - Error::InternalError(GarageError::Rpc(_)) => "ServiceUnavailable", + Error::InternalError( + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_), + ) => "ServiceUnavailable", Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError", _ => "InvalidRequest", } diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index b9e57c40..339d5bdb 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -31,15 +31,14 @@ pub enum AdminRpc { // Replies Ok(String), - Error(String), BucketList(Vec), BucketInfo(Bucket), KeyList(Vec<(String, String)>), KeyInfo(Key), } -impl Message for AdminRpc { - type Response = AdminRpc; +impl Rpc for AdminRpc { + type Response = Result; } pub struct AdminRpcHandler { @@ -341,17 +340,20 @@ impl AdminRpcHandler { let mut failures = vec![]; let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { - let node = NodeID::from_slice(node.as_slice()).unwrap(); - if self + let node = (*node).into(); + let resp = self .endpoint .call( &node, &AdminRpc::LaunchRepair(opt_to_send.clone()), PRIO_NORMAL, ) - .await - .is_err() - { + .await; + let is_err = match resp { + Ok(Ok(_)) => false, + _ => true, + }; + if is_err { failures.push(node); } } @@ -386,17 +388,17 @@ impl AdminRpcHandler { let ring = self.garage.system.ring.borrow().clone(); for node in ring.config.members.keys() { - let node = NodeID::from_slice(node.as_slice()).unwrap(); - let mut opt = opt.clone(); opt.all_nodes = false; writeln!(&mut ret, "\n======================").unwrap(); writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); + + let node_id = (*node).into(); match self .endpoint - .call(&node, &AdminRpc::Stats(opt), PRIO_NORMAL) - .await + .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL) + .await? { Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), @@ -486,9 +488,16 @@ impl AdminRpcHandler { .unwrap(); writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); } +} - async fn handle_rpc(self: &Arc, msg: &AdminRpc) -> Result { - match msg { +#[async_trait] +impl EndpointHandler for AdminRpcHandler { + async fn handle( + self: &Arc, + message: &AdminRpc, + _from: NodeID, + ) -> Result { + match message { AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, @@ -497,12 +506,3 @@ impl AdminRpcHandler { } } } - -#[async_trait] -impl EndpointHandler for AdminRpcHandler { - async fn handle(self: &Arc, message: &AdminRpc, _from: NodeID) -> AdminRpc { - self.handle_rpc(message) - .await - .unwrap_or_else(|e| AdminRpc::Error(format!("{}", e))) - } -} diff --git a/src/garage/cli.rs b/src/garage/cli.rs index 91ec5ab2..940a5a85 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -1,5 +1,4 @@ -//use std::cmp::max; -//use std::collections::HashSet; +use std::collections::HashSet; use std::path::PathBuf; use serde::{Deserialize, Serialize}; @@ -7,7 +6,7 @@ use structopt::StructOpt; use garage_util::data::Uuid; use garage_util::error::Error; -//use garage_util::time::*; +use garage_util::time::*; use garage_rpc::ring::*; use garage_rpc::system::*; @@ -58,6 +57,10 @@ pub struct ServerOpt { #[derive(StructOpt, Debug)] pub enum NodeOperation { + /// Connect to Garage node that is currently isolated from the system + #[structopt(name = "connect")] + Connect(ConnectNodeOpt), + /// Configure Garage node #[structopt(name = "configure")] Configure(ConfigureNodeOpt), @@ -67,6 +70,13 @@ pub enum NodeOperation { Remove(RemoveNodeOpt), } +#[derive(StructOpt, Debug)] +pub struct ConnectNodeOpt { + /// Node public key and address, in the format: + /// `@:` + node: String, +} + #[derive(StructOpt, Debug)] pub struct ConfigureNodeOpt { /// Node to configure (prefix of hexadecimal node id) @@ -303,6 +313,9 @@ pub async fn cli_cmd( ) -> Result<(), Error> { match cmd { Command::Status => cmd_status(system_rpc_endpoint, rpc_host).await, + Command::Node(NodeOperation::Connect(connect_opt)) => { + cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await + } Command::Node(NodeOperation::Configure(configure_opt)) => { cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await } @@ -326,142 +339,96 @@ pub async fn cli_cmd( pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await? + .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; let config = match rpc_cli .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await? + .await?? { SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - println!("STATUS:"); - for node in status { - println!("{:?}", node); - } - println!("CONFIG: (v{})", config.version); - for (id, node) in config.members { - println!("{} {:?}", hex::encode(id.as_slice()), node); - } - - /* TODO - let (hostname_len, addr_len, tag_len, zone_len) = status - .iter() - .map(|(id, addr, _)| (addr, config.members.get(&adv.id))) - .map(|(addr, cfg)| { - ( - 8, - addr.to_string().len(), - cfg.map(|c| c.tag.len()).unwrap_or(0), - cfg.map(|c| c.zone.len()).unwrap_or(0), - ) - }) - .fold((0, 0, 0, 0), |(h, a, t, z), (mh, ma, mt, mz)| { - (max(h, mh), max(a, ma), max(t, mt), max(z, mz)) - }); - println!("Healthy nodes:"); - for (id, addr, _) in status.iter().filter(|(id, addr, is_up)| is_up) { + let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()]; + for adv in status.iter().filter(|adv| adv.is_up) { if let Some(cfg) = config.members.get(&adv.id) { - println!( - "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}", - id = id, - host = "", - addr = addr, + healthy_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, tag = cfg.tag, zone = cfg.zone, capacity = cfg.capacity_string(), - h_pad = " ".repeat(hostname_len - adv.state_info.hostname.len()), - a_pad = " ".repeat(addr_len - adv.addr.to_string().len()), - t_pad = " ".repeat(tag_len - cfg.tag.len()), - z_pad = " ".repeat(zone_len - cfg.zone.len()), - ); + )); } else { - println!( - "{id:?}\t{h}{h_pad}\t{addr}{a_pad}\tUNCONFIGURED/REMOVED", - id = id, - h = "", - addr = addr, - h_pad = " ".repeat(hostname_len - "".len()), - a_pad = " ".repeat(addr_len - addr.to_string().len()), - ); + healthy_nodes.push(format!( + "{id:?}\t{h}\t{addr}\tUNCONFIGURED/REMOVED", + id = adv.id, + h = adv.status.hostname, + addr = adv.addr, + )); } } + format_table(healthy_nodes); - let status_keys = status.iter().map(|(id, _, _)| id).collect::>(); - let failure_case_1 = status.iter().any(|(_, _, is_up)| !is_up); + let status_keys = status.iter().map(|adv| adv.id).collect::>(); + let failure_case_1 = status.iter().any(|adv| !adv.is_up); let failure_case_2 = config .members .iter() .any(|(id, _)| !status_keys.contains(id)); if failure_case_1 || failure_case_2 { println!("\nFailed nodes:"); - for (id, addr) in status.iter().filter(|(_, _, is_up)| !is_up) { - if let Some(cfg) = config.members.get(&id) { - println!( - "{id:?}\t{host}{h_pad}\t{addr}{a_pad}\t[{tag}]{t_pad}\t{zone}{z_pad}\t{capacity}\tlast seen: {last_seen}s ago", - id=id, - host="", - addr=addr, - tag=cfg.tag, - zone=cfg.zone, - capacity=cfg.capacity_string(), - last_seen=(now_msec() - 0) / 1000, - h_pad=" ".repeat(hostname_len - "".len()), - a_pad=" ".repeat(addr_len - addr.to_string().len()), - t_pad=" ".repeat(tag_len - cfg.tag.len()), - z_pad=" ".repeat(zone_len - cfg.zone.len()), - ); + let mut failed_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()]; + for adv in status.iter().filter(|adv| !adv.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + failed_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}s ago", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, + tag = cfg.tag, + zone = cfg.zone, + capacity = cfg.capacity_string(), + last_seen = (now_msec() - 0) / 1000, + )); } } - let (tag_len, zone_len) = config - .members - .iter() - .filter(|(&id, _)| !status.iter().any(|(xid, _, _)| xid == id)) - .map(|(_, cfg)| (cfg.tag.len(), cfg.zone.len())) - .fold((0, 0), |(t, z), (mt, mz)| (max(t, mt), max(z, mz))); - for (id, cfg) in config.members.iter() { - if !status.iter().any(|(xid, _, _)| xid == *id) { - println!( - "{id:?}\t{tag}{t_pad}\t{zone}{z_pad}\t{capacity}\tnever seen", + if !status.iter().any(|adv| adv.id == *id) { + failed_nodes.push(format!( + "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen", id = id, tag = cfg.tag, zone = cfg.zone, capacity = cfg.capacity_string(), - t_pad = " ".repeat(tag_len - cfg.tag.len()), - z_pad = " ".repeat(zone_len - cfg.zone.len()), - ); + )); } } + format_table(failed_nodes); } - */ Ok(()) } -pub fn find_matching_node( - cand: impl std::iter::Iterator, - pattern: &str, -) -> Result { - let mut candidates = vec![]; - for c in cand { - if hex::encode(&c).starts_with(&pattern) { - candidates.push(c); +pub async fn cmd_connect( + rpc_cli: &Endpoint, + rpc_host: NodeID, + args: ConnectNodeOpt, +) -> Result<(), Error> { + match rpc_cli.call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL).await?? { + SystemRpc::Ok => { + println!("Success."); + Ok(()) + } + r => { + Err(Error::BadRpc(format!("Unexpected response: {:?}", r))) } - } - if candidates.len() != 1 { - Err(Error::Message(format!( - "{} nodes match '{}'", - candidates.len(), - pattern, - ))) - } else { - Ok(candidates[0]) } } @@ -472,22 +439,17 @@ pub async fn cmd_configure( ) -> Result<(), Error> { let status = match rpc_cli .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await? + .await?? { SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let added_node = find_matching_node( - status - .iter() - .map(|(id, _, _)| Uuid::try_from(id.as_ref()).unwrap()), - &args.node_id, - )?; + let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; let mut config = match rpc_cli .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await? + .await?? { SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -544,7 +506,7 @@ pub async fn cmd_configure( rpc_cli .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await?; + .await??; Ok(()) } @@ -555,7 +517,7 @@ pub async fn cmd_remove( ) -> Result<(), Error> { let mut config = match rpc_cli .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await? + .await?? { SystemRpc::AdvertiseConfig(cfg) => cfg, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), @@ -575,7 +537,7 @@ pub async fn cmd_remove( rpc_cli .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await?; + .await??; Ok(()) } @@ -584,7 +546,7 @@ pub async fn cmd_admin( rpc_host: NodeID, args: AdminRpc, ) -> Result<(), Error> { - match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await? { + match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { AdminRpc::Ok(msg) => { println!("{}", msg); } @@ -613,6 +575,8 @@ pub async fn cmd_admin( Ok(()) } +// --- Utility functions ---- + fn print_key_info(key: &Key) { println!("Key name: {}", key.name.get()); println!("Key ID: {}", key.key_id); @@ -640,3 +604,54 @@ fn print_bucket_info(bucket: &Bucket) { } }; } + +fn format_table(data: Vec) { + let data = data + .iter() + .map(|s| s.split('\t').collect::>()) + .collect::>(); + + let columns = data.iter().map(|row| row.len()).fold(0, std::cmp::max); + let mut column_size = vec![0; columns]; + + let mut out = String::new(); + + for row in data.iter() { + for (i, col) in row.iter().enumerate() { + column_size[i] = std::cmp::max(column_size[i], col.chars().count()); + } + } + + for row in data.iter() { + for (col, col_len) in row[..row.len() - 1].iter().zip(column_size.iter()) { + out.push_str(col); + (0..col_len - col.chars().count() + 2).for_each(|_| out.push(' ')); + } + out.push_str(&row[row.len() - 1]); + out.push('\n'); + } + + print!("{}", out); +} + +pub fn find_matching_node( + cand: impl std::iter::Iterator, + pattern: &str, +) -> Result { + let mut candidates = vec![]; + for c in cand { + if hex::encode(&c).starts_with(&pattern) { + candidates.push(c); + } + } + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} nodes match '{}'", + candidates.len(), + pattern, + ))) + } else { + Ok(candidates[0]) + } +} + diff --git a/src/garage/main.rs b/src/garage/main.rs index 7fe791b8..543860ca 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -9,8 +9,6 @@ mod cli; mod repair; mod server; -use std::net::SocketAddr; - use structopt::StructOpt; use netapp::util::parse_peer_addr; @@ -43,6 +41,7 @@ struct Opt { #[tokio::main] async fn main() { pretty_env_logger::init(); + sodiumoxide::init().expect("Unable to init sodiumoxide"); let opt = Opt::from_args(); diff --git a/src/garage/server.rs b/src/garage/server.rs index 0edf3e2d..cd92d157 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -71,8 +71,14 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { // Remove RPC handlers for system to break reference cycles garage.system.netapp.drop_all_handlers(); - // Await for last parts to end + // Await for netapp RPC system to end run_system.await?; + + // Break last reference cycles so that stuff can terminate properly + garage.break_reference_cycles(); + drop(garage); + + // Await for all background tasks to end await_background_done.await?; info!("Cleaning up..."); diff --git a/src/model/block.rs b/src/model/block.rs index 5574b7f6..a1dcf776 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -38,7 +38,6 @@ const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { Ok, - Error(String), /// Message to ask for a block of data, by hash GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new @@ -61,8 +60,8 @@ pub struct PutBlockMessage { pub data: Vec, } -impl Message for BlockRpc { - type Response = BlockRpc; +impl Rpc for BlockRpc { + type Response = Result; } /// The block manager, handling block exchange between nodes, and block storage on local node @@ -117,15 +116,6 @@ impl BlockManager { block_manager } - async fn handle_rpc(self: Arc, msg: &BlockRpc) -> Result { - match msg { - BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, - BlockRpc::GetBlock(h) => self.read_block(h).await, - BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), - _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), - } - } - pub fn spawn_background_worker(self: Arc) { // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this // launches only one worker with current value of BACKGROUND_WORKERS @@ -532,11 +522,17 @@ impl BlockManager { #[async_trait] impl EndpointHandler for BlockManager { - async fn handle(self: &Arc, message: &BlockRpc, _from: NodeID) -> BlockRpc { - self.clone() - .handle_rpc(message) - .await - .unwrap_or_else(|e| BlockRpc::Error(format!("{}", e))) + async fn handle( + self: &Arc, + message: &BlockRpc, + _from: NodeID, + ) -> Result { + match message { + BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + BlockRpc::GetBlock(h) => self.read_block(h).await, + BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), + _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), + } } } diff --git a/src/model/garage.rs b/src/model/garage.rs index d4ea6f55..482c4df7 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -61,6 +61,7 @@ impl Garage { background.clone(), replication_mode.replication_factor(), config.rpc_bind_addr, + config.rpc_public_addr, config.bootstrap_peers.clone(), config.consul_host.clone(), config.consul_service_name.clone(), @@ -162,4 +163,9 @@ impl Garage { garage } + + /// Use this for shutdown + pub fn break_reference_cycles(&self) { + self.block_manager.garage.swap(None); + } } diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 1100c737..7886dadc 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -15,8 +15,6 @@ path = "lib.rs" [dependencies] garage_util = { version = "0.3.0", path = "../util" } -garage_rpc_021 = { package = "garage_rpc", version = "0.2.1" } - arc-swap = "1.0" bytes = "1.0" gethostname = "0.2" @@ -36,5 +34,5 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -hyper = "0.14" +hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index 63051a6b..fca4f517 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use hyper::client::Client; @@ -5,20 +6,24 @@ use hyper::StatusCode; use hyper::{Body, Method, Request}; use serde::Deserialize; +use netapp::NodeID; + use garage_util::error::Error; -#[derive(Deserialize, Clone)] +#[derive(Deserialize, Clone, Debug)] struct ConsulEntry { #[serde(alias = "Address")] address: String, #[serde(alias = "ServicePort")] service_port: u16, + #[serde(alias = "NodeMeta")] + node_meta: HashMap, } pub async fn get_consul_nodes( consul_host: &str, consul_service_name: &str, -) -> Result, Error> { +) -> Result, Error> { let url = format!( "http://{}/v1/catalog/service/{}", consul_host, consul_service_name @@ -40,11 +45,22 @@ pub async fn get_consul_nodes( let mut ret = vec![]; for ent in entries { - let ip = ent - .address - .parse::() - .map_err(|e| Error::Message(format!("Could not parse IP address: {}", e)))?; - ret.push(SocketAddr::new(ip, ent.service_port)); + let ip = ent.address.parse::().ok(); + let pubkey = ent + .node_meta + .get("pubkey") + .map(|k| hex::decode(&k).ok()) + .flatten() + .map(|k| NodeID::from_slice(&k[..])) + .flatten(); + if let (Some(ip), Some(pubkey)) = (ip, pubkey) { + ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); + } else { + warn!( + "Could not process node spec from Consul: {:?} (invalid IP or public key)", + ent + ); + } } debug!("Got nodes from Consul: {:?}", ret); diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 7cbab762..3cb0d233 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -3,8 +3,6 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryInto; -use netapp::NodeID; - use serde::{Deserialize, Serialize}; use garage_util::data::*; @@ -40,31 +38,6 @@ impl NetworkConfig { version: 0, } } - - pub(crate) fn migrate_from_021(old: garage_rpc_021::ring::NetworkConfig) -> Self { - let members = old - .members - .into_iter() - .map(|(id, conf)| { - ( - Hash::try_from(id.as_slice()).unwrap(), - NetworkConfigEntry { - zone: conf.datacenter, - capacity: if conf.capacity == 0 { - None - } else { - Some(conf.capacity) - }, - tag: conf.tag, - }, - ) - }) - .collect(); - Self { - members, - version: old.version, - } - } } /// The overall configuration of one (possibly remote) node @@ -100,7 +73,7 @@ pub struct Ring { pub config: NetworkConfig, // Internal order of nodes used to make a more compact representation of the ring - nodes: Vec, + nodes: Vec, // The list of entries in the ring ring: Vec, @@ -262,11 +235,6 @@ impl Ring { }) .collect::>(); - let nodes = nodes - .iter() - .map(|id| NodeID::from_slice(id.as_slice()).unwrap()) - .collect::>(); - Self { replication_factor, config, @@ -298,7 +266,7 @@ impl Ring { } /// Walk the ring to find the n servers in which data should be replicated - pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec { + pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); return vec![]; diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index c9458ee6..9f735ab4 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -8,13 +8,14 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use tokio::select; -pub use netapp::endpoint::{Endpoint, EndpointHandler, Message}; +pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::proto::*; pub use netapp::{NetApp, NodeID}; use garage_util::background::BackgroundRunner; -use garage_util::error::{Error, RpcError}; +use garage_util::error::Error; +use garage_util::data::Uuid; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); @@ -66,46 +67,47 @@ pub struct RpcHelper { } impl RpcHelper { - pub async fn call( + pub async fn call( &self, endpoint: &Endpoint, - to: NodeID, + to: Uuid, msg: M, strat: RequestStrategy, - ) -> Result + ) -> Result where - M: Message, + M: Rpc>, H: EndpointHandler, { self.call_arc(endpoint, to, Arc::new(msg), strat).await } - pub async fn call_arc( + pub async fn call_arc( &self, endpoint: &Endpoint, - to: NodeID, + to: Uuid, msg: Arc, strat: RequestStrategy, - ) -> Result + ) -> Result where - M: Message, + M: Rpc>, H: EndpointHandler, { + let node_id = to.into(); select! { - res = endpoint.call(&to, &msg, strat.rs_priority) => Ok(res?), - _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Rpc(RpcError::Timeout)), + res = endpoint.call(&node_id, &msg, strat.rs_priority) => Ok(res??), + _ = tokio::time::sleep(strat.rs_timeout) => Err(Error::Timeout), } } - pub async fn call_many( + pub async fn call_many( &self, endpoint: &Endpoint, - to: &[NodeID], + to: &[Uuid], msg: M, strat: RequestStrategy, - ) -> Vec<(NodeID, Result)> + ) -> Vec<(Uuid, Result)> where - M: Message, + M: Rpc>, H: EndpointHandler, { let msg = Arc::new(msg); @@ -120,37 +122,38 @@ impl RpcHelper { .collect::>() } - pub async fn broadcast( + pub async fn broadcast( &self, endpoint: &Endpoint, msg: M, strat: RequestStrategy, - ) -> Vec<(NodeID, Result)> + ) -> Vec<(Uuid, Result)> where - M: Message, + M: Rpc>, H: EndpointHandler, { let to = self .fullmesh .get_peer_list() .iter() - .map(|p| p.id) + .map(|p| p.id.into()) .collect::>(); self.call_many(endpoint, &to[..], msg, strat).await } /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if /// strategy could not be respected due to too many errors - pub async fn try_call_many( + pub async fn try_call_many( &self, endpoint: &Arc>, - to: &[NodeID], + to: &[Uuid], msg: M, strategy: RequestStrategy, - ) -> Result, Error> + ) -> Result, Error> where - M: Message + 'static, + M: Rpc> + 'static, H: EndpointHandler + 'static, + S: Send, { let msg = Arc::new(msg); let mut resp_stream = to @@ -200,7 +203,7 @@ impl RpcHelper { Ok(results) } else { let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); - Err(Error::from(RpcError::TooManyErrors(errors))) + Err(Error::TooManyErrors(errors)) } } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 7ccec945..886811b1 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -1,8 +1,9 @@ //! Module containing structs related to membership management +use std::collections::HashMap; use std::io::{Read, Write}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use arc_swap::ArcSwap; @@ -14,21 +15,24 @@ use sodiumoxide::crypto::sign::ed25519; use tokio::sync::watch; use tokio::sync::Mutex; -use netapp::endpoint::{Endpoint, EndpointHandler, Message}; +use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::proto::*; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; +use netapp::util::parse_and_resolve_peer_addr; use garage_util::background::BackgroundRunner; +use garage_util::data::Uuid; use garage_util::error::Error; use garage_util::persister::Persister; -//use garage_util::time::*; +use garage_util::time::*; -//use crate::consul::get_consul_nodes; +use crate::consul::get_consul_nodes; use crate::ring::*; -use crate::rpc_helper::{RequestStrategy, RpcHelper}; +use crate::rpc_helper::*; const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); +const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); /// RPC endpoint used for calls related to membership @@ -39,33 +43,34 @@ pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; pub enum SystemRpc { /// Response to successfull advertisements Ok, - /// Error response - Error(String), + /// Request to connect to a specific node (in @: format) + Connect(String), /// Ask other node its config. Answered with AdvertiseConfig PullConfig, /// Advertise Garage status. Answered with another AdvertiseStatus. /// Exchanged with every node on a regular basis. - AdvertiseStatus(StateInfo), + AdvertiseStatus(NodeStatus), /// Advertisement of nodes config. Sent spontanously or in response to PullConfig AdvertiseConfig(NetworkConfig), /// Get known nodes states GetKnownNodes, /// Return known nodes - ReturnKnownNodes(Vec<(NodeID, SocketAddr, bool)>), + ReturnKnownNodes(Vec), } -impl Message for SystemRpc { - type Response = SystemRpc; +impl Rpc for SystemRpc { + type Response = Result; } /// This node's membership manager pub struct System { /// The id of this node - pub id: NodeID, + pub id: Uuid, persist_config: Persister, - state_info: ArcSwap, + local_status: ArcSwap, + node_status: RwLock>, pub netapp: Arc, fullmesh: Arc, @@ -88,7 +93,7 @@ pub struct System { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct StateInfo { +pub struct NodeStatus { /// Hostname of the node pub hostname: String, /// Replication factor configured on the node @@ -97,26 +102,34 @@ pub struct StateInfo { pub config_version: u64, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KnownNodeInfo { + pub id: Uuid, + pub addr: SocketAddr, + pub is_up: bool, + pub status: NodeStatus, +} + fn gen_node_key(metadata_dir: &Path) -> Result { - let mut id_file = metadata_dir.to_path_buf(); - id_file.push("node_id"); - if id_file.as_path().exists() { - let mut f = std::fs::File::open(id_file.as_path())?; + let mut key_file = metadata_dir.to_path_buf(); + key_file.push("node_key"); + if key_file.as_path().exists() { + let mut f = std::fs::File::open(key_file.as_path())?; let mut d = vec![]; f.read_to_end(&mut d)?; if d.len() != 64 { - return Err(Error::Message("Corrupt node_id file".to_string())); + return Err(Error::Message("Corrupt node_key file".to_string())); } let mut key = [0u8; 64]; key.copy_from_slice(&d[..]); Ok(NodeKey::from_slice(&key[..]).unwrap()) } else { - let (key, _) = ed25519::gen_keypair(); + let (_, key) = ed25519::gen_keypair(); - let mut f = std::fs::File::create(id_file.as_path())?; + let mut f = std::fs::File::create(key_file.as_path())?; f.write_all(&key[..])?; - Ok(NodeKey::from_slice(&key[..]).unwrap()) + Ok(key) } } @@ -128,6 +141,7 @@ impl System { background: Arc, replication_factor: usize, rpc_listen_addr: SocketAddr, + rpc_public_address: Option, bootstrap_peers: Vec<(NodeID, SocketAddr)>, consul_host: Option, consul_service_name: Option, @@ -140,25 +154,15 @@ impl System { let net_config = match persist_config.load() { Ok(x) => x, Err(e) => { - match Persister::::new( - &metadata_dir, - "network_config", - ) - .load() - { - Ok(old_config) => NetworkConfig::migrate_from_021(old_config), - Err(e2) => { - info!( - "No valid previous network configuration stored ({}, {}), starting fresh.", - e, e2 - ); - NetworkConfig::new() - } - } + info!( + "No valid previous network configuration stored ({}), starting fresh.", + e + ); + NetworkConfig::new() } }; - let state_info = StateInfo { + let local_status = NodeStatus { hostname: gethostname::gethostname() .into_string() .unwrap_or_else(|_| "".to_string()), @@ -169,15 +173,26 @@ impl System { let ring = Ring::new(net_config, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); + if let Some(addr) = rpc_public_address { + println!("{}@{}", hex::encode(&node_key.public_key()), addr); + } else { + println!("{}", hex::encode(&node_key.public_key())); + } + let netapp = NetApp::new(network_key, node_key); - let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers.clone()); + let fullmesh = FullMeshPeeringStrategy::new( + netapp.clone(), + bootstrap_peers.clone(), + rpc_public_address, + ); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); let sys = Arc::new(System { - id: netapp.id.clone(), + id: netapp.id.into(), persist_config, - state_info: ArcSwap::new(Arc::new(state_info)), + local_status: ArcSwap::new(Arc::new(local_status)), + node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), fullmesh: fullmesh.clone(), rpc: RpcHelper { @@ -206,6 +221,7 @@ impl System { .listen(self.rpc_listen_addr, None, must_exit.clone()), self.fullmesh.clone().run(must_exit.clone()), self.discovery_loop(must_exit.clone()), + self.status_exchange_loop(must_exit.clone()), ); } @@ -221,12 +237,27 @@ impl System { Ok(()) } - fn update_state_info(&self) { - let mut new_si: StateInfo = self.state_info.load().as_ref().clone(); + fn update_local_status(&self) { + let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); let ring = self.ring.borrow(); new_si.config_version = ring.config.version; - self.state_info.swap(Arc::new(new_si)); + self.local_status.swap(Arc::new(new_si)); + } + + async fn handle_connect(&self, node: &str) -> Result { + let (pubkey, addrs) = parse_and_resolve_peer_addr(node) + .ok_or_else(|| Error::Message(format!("Unable to parse or resolve node specification: {}", node)))?; + let mut errors = vec![]; + for ip in addrs.iter() { + match self.netapp.clone().try_connect(*ip, pubkey).await { + Ok(()) => return Ok(SystemRpc::Ok), + Err(e) => { + errors.push((*ip, e)); + } + } + } + return Err(Error::Message(format!("Could not connect to specified peers. Errors: {:?}", errors))); } fn handle_pull_config(&self) -> SystemRpc { @@ -234,6 +265,58 @@ impl System { SystemRpc::AdvertiseConfig(ring.config.clone()) } + fn handle_get_known_nodes(&self) -> SystemRpc { + let node_status = self.node_status.read().unwrap(); + let known_nodes = + self.fullmesh + .get_peer_list() + .iter() + .map(|n| KnownNodeInfo { + id: n.id.into(), + addr: n.addr, + is_up: n.is_up(), + status: node_status.get(&n.id.into()).cloned().map(|(_, st)| st).unwrap_or( + NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + config_version: 0, + }, + ), + }) + .collect::>(); + SystemRpc::ReturnKnownNodes(known_nodes) + } + + async fn handle_advertise_status( + self: &Arc, + from: Uuid, + info: &NodeStatus, + ) -> Result { + let local_info = self.local_status.load(); + + if local_info.replication_factor < info.replication_factor { + error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs", + info.replication_factor, + local_info.replication_factor); + std::process::exit(1); + } + + if info.config_version > local_info.config_version { + let self2 = self.clone(); + self.background.spawn_cancellable(async move { + self2.pull_config(from).await; + Ok(()) + }); + } + + self.node_status + .write() + .unwrap() + .insert(from, (now_msec(), info.clone())); + + Ok(SystemRpc::Ok) + } + async fn handle_advertise_config( self: Arc, adv: &NetworkConfig, @@ -265,13 +348,32 @@ impl System { Ok(SystemRpc::Ok) } + async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver) { + while !*stop_signal.borrow() { + let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL); + + self.update_local_status(); + let local_status: NodeStatus = self.local_status.load().as_ref().clone(); + self.rpc + .broadcast( + &self.system_endpoint, + SystemRpc::AdvertiseStatus(local_status), + RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT), + ) + .await; + + select! { + _ = restart_at.fuse() => {}, + _ = stop_signal.changed().fuse() => {}, + } + } + } + async fn discovery_loop(&self, mut stop_signal: watch::Receiver) { - /* TODO let consul_config = match (&self.consul_host, &self.consul_service_name) { (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), _ => None, }; - */ while !*stop_signal.borrow() { let not_configured = self.ring.borrow().config.members.is_empty(); @@ -286,7 +388,7 @@ impl System { if not_configured || no_peers || bad_peers { info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); - let ping_list = self.bootstrap_peers.clone(); + let mut ping_list = self.bootstrap_peers.clone(); /* *TODO bring this back: persisted list of peers @@ -295,19 +397,16 @@ impl System { } */ - /* - * TODO bring this back: get peers from consul if let Some((consul_host, consul_service_name)) = &consul_config { match get_consul_nodes(consul_host, consul_service_name).await { Ok(node_list) => { - ping_list.extend(node_list.iter().map(|a| (*a, None))); + ping_list.extend(node_list); } Err(e) => { warn!("Could not retrieve node list from Consul: {}", e); } } } - */ for (node_id, node_addr) in ping_list { tokio::spawn(self.netapp.clone().try_connect(node_addr, node_id)); @@ -322,7 +421,7 @@ impl System { } } - async fn pull_config(self: Arc, peer: NodeID) { + async fn pull_config(self: Arc, peer: Uuid) { let resp = self .rpc .call( @@ -340,24 +439,14 @@ impl System { #[async_trait] impl EndpointHandler for System { - async fn handle(self: &Arc, msg: &SystemRpc, _from: NodeID) -> SystemRpc { - let resp = match msg { + async fn handle(self: &Arc, msg: &SystemRpc, from: NodeID) -> Result { + match msg { + SystemRpc::Connect(node) => self.handle_connect(node).await, SystemRpc::PullConfig => Ok(self.handle_pull_config()), + SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await, SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(&adv).await, - SystemRpc::GetKnownNodes => { - let known_nodes = self - .fullmesh - .get_peer_list() - .iter() - .map(|n| (n.id, n.addr, n.is_up())) - .collect::>(); - Ok(SystemRpc::ReturnKnownNodes(known_nodes)) - } + SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()), _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), - }; - match resp { - Ok(r) => r, - Err(e) => SystemRpc::Error(format!("{}", e)), } } } diff --git a/src/table/gc.rs b/src/table/gc.rs index c03648ef..9b3d60ff 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -36,11 +36,10 @@ enum GcRpc { Update(Vec), DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), Ok, - Error(String), } -impl Message for GcRpc { - type Response = GcRpc; +impl Rpc for GcRpc { + type Response = Result; } impl TableGc @@ -168,7 +167,7 @@ where async fn try_send_and_delete( &self, - nodes: Vec, + nodes: Vec, items: Vec<(ByteBuf, Hash, ByteBuf)>, ) -> Result<(), Error> { let n_items = items.len(); @@ -224,8 +223,15 @@ where .compare_and_swap::<_, _, Vec>(key, Some(vhash), None)?; Ok(()) } +} - async fn handle_rpc(&self, message: &GcRpc) -> Result { +#[async_trait] +impl EndpointHandler for TableGc +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle(self: &Arc, message: &GcRpc, _from: NodeID) -> Result { match message { GcRpc::Update(items) => { self.data.update_many(items)?; @@ -242,16 +248,3 @@ where } } } - -#[async_trait] -impl EndpointHandler for TableGc -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - async fn handle(self: &Arc, message: &GcRpc, _from: NodeID) -> GcRpc { - self.handle_rpc(message) - .await - .unwrap_or_else(|e| GcRpc::Error(format!("{}", e))) - } -} diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index b41c5360..ae6851fb 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use garage_rpc::ring::*; use garage_rpc::system::System; -use garage_rpc::NodeID; use garage_util::data::*; use crate::replication::*; @@ -20,19 +19,19 @@ pub struct TableFullReplication { } impl TableReplication for TableFullReplication { - fn read_nodes(&self, _hash: &Hash) -> Vec { + fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } fn read_quorum(&self) -> usize { 1 } - fn write_nodes(&self, _hash: &Hash) -> Vec { + fn write_nodes(&self, _hash: &Hash) -> Vec { let ring = self.system.ring.borrow(); ring.config .members .keys() - .map(|id| NodeID::from_slice(id.as_slice()).unwrap()) + .cloned() .collect::>() } fn write_quorum(&self) -> usize { diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 7fdfce67..3740d947 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -1,5 +1,4 @@ use garage_rpc::ring::*; -use garage_rpc::NodeID; use garage_util::data::*; /// Trait to describe how a table shall be replicated @@ -8,12 +7,12 @@ pub trait TableReplication: Send + Sync { // To understand various replication methods /// Which nodes to send read requests to - fn read_nodes(&self, hash: &Hash) -> Vec; + fn read_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a read succesfull fn read_quorum(&self) -> usize; /// Which nodes to send writes to - fn write_nodes(&self, hash: &Hash) -> Vec; + fn write_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a write succesfull fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index ffe686a5..75043a17 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use garage_rpc::ring::*; use garage_rpc::system::System; -use garage_rpc::NodeID; use garage_util::data::*; use crate::replication::*; @@ -26,7 +25,7 @@ pub struct TableShardedReplication { } impl TableReplication for TableShardedReplication { - fn read_nodes(&self, hash: &Hash) -> Vec { + fn read_nodes(&self, hash: &Hash) -> Vec { let ring = self.system.ring.borrow(); ring.get_nodes(&hash, self.replication_factor) } @@ -34,7 +33,7 @@ impl TableReplication for TableShardedReplication { self.read_quorum } - fn write_nodes(&self, hash: &Hash) -> Vec { + fn write_nodes(&self, hash: &Hash) -> Vec { let ring = self.system.ring.borrow(); ring.get_nodes(&hash, self.replication_factor) } diff --git a/src/table/sync.rs b/src/table/sync.rs index c5db0987..4fcdc528 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -45,11 +45,10 @@ pub(crate) enum SyncRpc { Node(MerkleNodeKey, MerkleNode), Items(Vec>), Ok, - Error(String), } -impl Message for SyncRpc { - type Response = SyncRpc; +impl Rpc for SyncRpc { + type Response = Result; } struct SyncTodo { @@ -305,7 +304,7 @@ where async fn offload_items( self: &Arc, items: &[(Vec, Arc)], - nodes: &[NodeID], + nodes: &[Uuid], ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::>(); @@ -354,7 +353,7 @@ where async fn do_sync_with( self: Arc, partition: TodoPartition, - who: NodeID, + who: Uuid, must_exit: watch::Receiver, ) -> Result<(), Error> { let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?; @@ -480,7 +479,7 @@ where Ok(()) } - async fn send_items(&self, who: NodeID, item_value_list: Vec>) -> Result<(), Error> { + async fn send_items(&self, who: Uuid, item_value_list: Vec>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", self.data.name, @@ -513,9 +512,17 @@ where ))) } } +} - // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== - async fn handle_rpc(self: &Arc, message: &SyncRpc) -> Result { +// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== + +#[async_trait] +impl EndpointHandler for TableSyncer +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle(self: &Arc, message: &SyncRpc, _from: NodeID) -> Result { match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; @@ -535,19 +542,6 @@ where } } -#[async_trait] -impl EndpointHandler for TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - async fn handle(self: &Arc, message: &SyncRpc, _from: NodeID) -> SyncRpc { - self.handle_rpc(message) - .await - .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e))) - } -} - impl SyncTodo { fn add_full_sync( &mut self, diff --git a/src/table/table.rs b/src/table/table.rs index ad263343..e1357471 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -34,7 +34,6 @@ pub struct Table { #[derive(Serialize, Deserialize)] pub(crate) enum TableRpc { Ok, - Error(String), ReadEntry(F::P, F::S), ReadEntryResponse(Option), @@ -45,8 +44,8 @@ pub(crate) enum TableRpc { Update(Vec>), } -impl Message for TableRpc { - type Response = TableRpc; +impl Rpc for TableRpc { + type Response = Result, Error>; } impl Table @@ -277,7 +276,7 @@ where // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== - async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> { + async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.system .rpc @@ -292,10 +291,19 @@ where .await?; Ok(()) } +} - // ====== RPC HANDLER ===== - // - async fn handle_rpc(self: &Arc, msg: &TableRpc) -> Result, Error> { +#[async_trait] +impl EndpointHandler> for Table +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + async fn handle( + self: &Arc, + msg: &TableRpc, + _from: NodeID, + ) -> Result, Error> { match msg { TableRpc::ReadEntry(key, sort_key) => { let value = self.data.read_entry(key, sort_key)?; @@ -313,16 +321,3 @@ where } } } - -#[async_trait] -impl EndpointHandler> for Table -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - async fn handle(self: &Arc, msg: &TableRpc, _from: NodeID) -> TableRpc { - self.handle_rpc(msg) - .await - .unwrap_or_else(|e| TableRpc::::Error(format!("{}", e))) - } -} diff --git a/src/util/config.rs b/src/util/config.rs index ee153dfa..fe0a6fa8 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -7,6 +7,7 @@ use serde::de::Error as SerdeError; use serde::{de, Deserialize}; use netapp::NodeID; +use netapp::util::parse_and_resolve_peer_addr; use crate::error::Error; @@ -34,6 +35,8 @@ pub struct Config { /// Address to bind for RPC pub rpc_bind_addr: SocketAddr, + /// Public IP address of this node + pub rpc_public_addr: Option, /// Bootstrap peers RPC address #[serde(deserialize_with = "deserialize_vec_addr")] @@ -111,26 +114,13 @@ fn deserialize_vec_addr<'de, D>(deserializer: D) -> Result, { - use std::net::ToSocketAddrs; - let mut ret = vec![]; for peer in >::deserialize(deserializer)? { - let delim = peer - .find('@') - .ok_or_else(|| D::Error::custom("Invalid bootstrap peer: public key not specified"))?; - let (key, host) = peer.split_at(delim); - let pubkey = NodeID::from_slice(&hex::decode(&key).map_err(D::Error::custom)?) - .ok_or_else(|| D::Error::custom("Invalid bootstrap peer public key"))?; - let hosts = host[1..] - .to_socket_addrs() - .map_err(D::Error::custom)? - .collect::>(); - if hosts.is_empty() { - return Err(D::Error::custom(format!("Error resolving {}", &host[1..]))); - } - for host in hosts { - ret.push((pubkey.clone(), host)); + let (pubkey, addrs) = parse_and_resolve_peer_addr(peer) + .ok_or_else(|| D::Error::custom(format!("Unable to parse or resolve peer: {}", peer)))?; + for ip in addrs { + ret.push((pubkey.clone(), ip)); } } diff --git a/src/util/data.rs b/src/util/data.rs index 6df51cd0..d4fe0009 100644 --- a/src/util/data.rs +++ b/src/util/data.rs @@ -87,6 +87,18 @@ impl FixedBytes32 { } } +impl From for FixedBytes32 { + fn from(node_id: netapp::NodeID) -> FixedBytes32 { + FixedBytes32::try_from(node_id.as_ref()).unwrap() + } +} + +impl Into for FixedBytes32 { + fn into(self) -> netapp::NodeID { + netapp::NodeID::from_slice(self.as_slice()).unwrap() + } +} + /// A 32 bytes UUID pub type Uuid = FixedBytes32; /// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance diff --git a/src/util/error.rs b/src/util/error.rs index 804a0d4d..390327f1 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -1,35 +1,13 @@ //! Module containing error types used in Garage -use err_derive::Error; -use hyper::StatusCode; +use std::fmt; use std::io; +use err_derive::Error; + +use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; + use crate::data::*; -/// RPC related errors -#[derive(Debug, Error)] -pub enum RpcError { - #[error(display = "Node is down: {:?}.", _0)] - NodeDown(Uuid), - - #[error(display = "Timeout")] - Timeout, - - #[error(display = "HTTP error: {}", _0)] - Http(#[error(source)] http::Error), - - #[error(display = "Hyper error: {}", _0)] - Hyper(#[error(source)] hyper::Error), - - #[error(display = "Messagepack encode error: {}", _0)] - RmpEncode(#[error(source)] rmp_serde::encode::Error), - - #[error(display = "Messagepack decode error: {}", _0)] - RmpDecode(#[error(source)] rmp_serde::decode::Error), - - #[error(display = "Too many errors: {:?}", _0)] - TooManyErrors(Vec), -} - /// Regroup all Garage errors #[derive(Debug, Error)] pub enum Error { @@ -63,11 +41,14 @@ pub enum Error { #[error(display = "Tokio join error: {}", _0)] TokioJoin(#[error(source)] tokio::task::JoinError), - #[error(display = "RPC call error: {}", _0)] - Rpc(#[error(source)] RpcError), + #[error(display = "Remote error: {}", _0)] + RemoteError(String), - #[error(display = "Remote error: {} (status code {})", _0, _1)] - RemoteError(String, StatusCode), + #[error(display = "Timeout")] + Timeout, + + #[error(display = "Too many errors: {:?}", _0)] + TooManyErrors(Vec), #[error(display = "Bad RPC: {}", _0)] BadRpc(String), @@ -99,3 +80,44 @@ impl From> for Error { Error::Message("MPSC send error".to_string()) } } + +// Custom serialization for our error type, for use in RPC. +// Errors are serialized as a string of their Display representation. +// Upon deserialization, they all become a RemoteError with the +// given representation. + +impl Serialize for Error { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&format!("{}", self)) + } +} + +impl<'de> Deserialize<'de> for Error { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + deserializer.deserialize_string(ErrorVisitor) + } +} + +struct ErrorVisitor; + +impl<'de> Visitor<'de> for ErrorVisitor { + type Value = Error; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "a string that represents an error value") + } + + fn visit_str(self, error_msg: &str) -> Result { + Ok(Error::RemoteError(error_msg.to_string())) + } + + fn visit_string(self, error_msg: String) -> Result { + Ok(Error::RemoteError(error_msg)) + } +} diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index dc31c1b4..0b6620d2 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -26,4 +26,4 @@ percent-encoding = "2.1.0" futures = "0.3" http = "0.2" -hyper = "0.14" +hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] } diff --git a/src/web/error.rs b/src/web/error.rs index 08717ce1..5ac27914 100644 --- a/src/web/error.rs +++ b/src/web/error.rs @@ -38,7 +38,9 @@ impl Error { match self { Error::NotFound => StatusCode::NOT_FOUND, Error::ApiError(e) => e.http_status_code(), - Error::InternalError(GarageError::Rpc(_)) => StatusCode::SERVICE_UNAVAILABLE, + Error::InternalError( + GarageError::Timeout | GarageError::RemoteError(_) | GarageError::TooManyErrors(_), + ) => StatusCode::SERVICE_UNAVAILABLE, Error::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::BAD_REQUEST, }