From be0a2bae8112de1d4674767c5c60e296c28e8532 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 21 Apr 2020 14:07:15 +0000 Subject: [PATCH] Add node tags in configuration --- src/main.rs | 14 +++++++++++--- src/membership.rs | 4 +++- src/rpc_server.rs | 16 +++++++++++++--- src/table_sync.rs | 25 +++++++++++++++++-------- 4 files changed, 44 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9b75740d..01972928 100644 --- a/src/main.rs +++ b/src/main.rs @@ -111,6 +111,10 @@ pub struct ConfigureNodeOpt { /// Number of tokens n_tokens: u32, + + /// Optionnal node tag + #[structopt(long = "tag", default_value = "")] + tag: String, } #[derive(StructOpt, Debug)] @@ -266,8 +270,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient, rpc_host: SocketAddr) -> Re for adv in status.iter() { if let Some(cfg) = config.members.get(&adv.id) { println!( - "{:?}\t{}\t{}\t{}\t{}", - adv.id, adv.state_info.hostname, adv.addr, cfg.datacenter, cfg.n_tokens + "{:?}\t{}\t{}\t{}\t{}\t{}", + adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens ); } } @@ -281,7 +285,10 @@ async fn cmd_status(rpc_cli: RpcAddrClient, rpc_host: SocketAddr) -> Re println!("\nFailed nodes:"); for (id, cfg) in config.members.iter() { if !status.iter().any(|x| x.id == *id) { - println!("{:?}\t{}\t{}", id, cfg.datacenter, cfg.n_tokens); + println!( + "{:?}\t{}\t{}\t{}", + id, cfg.tag, cfg.datacenter, cfg.n_tokens + ); } } } @@ -340,6 +347,7 @@ async fn cmd_configure( NetworkConfigEntry { datacenter: args.datacenter, n_tokens: args.n_tokens, + tag: args.tag, }, ); config.version += 1; diff --git a/src/membership.rs b/src/membership.rs index 6f6dd47d..89b0fd67 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -69,6 +69,7 @@ pub struct NetworkConfig { pub struct NetworkConfigEntry { pub datacenter: String, pub n_tokens: u32, + pub tag: String, } pub struct System { @@ -248,7 +249,8 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result { let mut net_config_bytes = vec![]; file.read_to_end(&mut net_config_bytes)?; - let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?; + let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..]) + .expect("Unable to parse network configuration file (has version format changed?)."); Ok(net_config) } diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 4541e4da..938eb512 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; -use std::time::Instant; use std::sync::Arc; +use std::time::Instant; use bytes::IntoBuf; use futures::future::Future; @@ -51,7 +51,11 @@ where match handler(msg, sockaddr).await { Ok(resp) => { let resp_bytes = rmp_to_vec_all_named::>(&Ok(resp))?; - trace!("]RPC:{},ok ({} ms)", name, (Instant::now()-begin_time).as_millis()); + trace!( + "]RPC:{},ok ({} ms)", + name, + (Instant::now() - begin_time).as_millis() + ); Ok(Response::new(Body::from(resp_bytes))) } Err(e) => { @@ -59,7 +63,13 @@ where let rep_bytes = rmp_to_vec_all_named::>(&Err(err_str))?; let mut err_response = Response::new(Body::from(rep_bytes)); *err_response.status_mut() = e.http_status_code(); - warn!("RPC error ({}): {} ({} ms), request: {}", name, e, (Instant::now()-begin_time).as_millis(), req_str); + warn!( + "RPC error ({}): {} ({} ms), request: {}", + name, + e, + (Instant::now() - begin_time).as_millis(), + req_str + ); Ok(err_response) } } diff --git a/src/table_sync.rs b/src/table_sync.rs index cb6d87aa..26c5bed8 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -198,19 +198,25 @@ where partition: &TodoPartition, must_exit: &mut watch::Receiver, ) -> Result<(), Error> { - debug!("({}) Preparing to sync {:?}...", self.table.name, partition); - let root_cks = self - .root_checksum(&partition.begin, &partition.end, must_exit) - .await?; - let my_id = self.table.system.id.clone(); let nodes = self .table .replication - .write_nodes(&partition.begin, &self.table.system); + .write_nodes(&partition.begin, &self.table.system) + .into_iter() + .filter(|node| *node != my_id) + .collect::>(); + + debug!( + "({}) Preparing to sync {:?} with {:?}...", + self.table.name, partition, nodes + ); + let root_cks = self + .root_checksum(&partition.begin, &partition.end, must_exit) + .await?; + let mut sync_futures = nodes .iter() - .filter(|node| **node != my_id) .map(|node| { self.clone().do_sync_with( partition.clone(), @@ -230,7 +236,10 @@ where } } if n_errors > self.table.replication.max_write_errors() { - return Err(Error::Message(format!("Sync failed with too many nodes."))); + return Err(Error::Message(format!( + "Sync failed with too many nodes (should have been: {:?}).", + nodes + ))); } if !partition.retain {