From 3c36b449a3786bb62fa023bde37bac24635b5717 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 6 Apr 2020 21:02:15 +0200 Subject: [PATCH] Some work --- Cargo.lock | 76 ++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/main.rs | 2 +- src/membership.rs | 108 +++++++++++++++++++++++++++++++++++++--------- src/rpc.rs | 5 +-- 5 files changed, 167 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de2ef8fd..c524c6e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,30 @@ name = "bitflags" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "block-buffer" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "block-padding 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "block-padding" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "byte-tools" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "byteorder" version = "1.3.4" @@ -98,6 +122,14 @@ dependencies = [ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "digest" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "err-derive" version = "0.2.3" @@ -111,6 +143,11 @@ dependencies = [ "synstructure 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fake-simd" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "fnv" version = "1.0.6" @@ -247,11 +284,20 @@ dependencies = [ "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", "rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", + "sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "structopt 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.16 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "generic-array" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "typenum 1.11.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "getrandom" version = "0.1.14" @@ -519,6 +565,11 @@ dependencies = [ "libc 0.2.68 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "opaque-debug" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "parking_lot" version = "0.10.0" @@ -718,6 +769,17 @@ dependencies = [ "syn 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "sha2" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", + "digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "signal-hook-registry" version = "1.2.0" @@ -916,6 +978,11 @@ name = "try-lock" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "typenum" +version = "1.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "unicode-segmentation" version = "1.6.0" @@ -993,6 +1060,9 @@ dependencies = [ "checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" "checksum bincode 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +"checksum block-buffer 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" +"checksum block-padding 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" +"checksum byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" "checksum byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" "checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" "checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" @@ -1002,7 +1072,9 @@ dependencies = [ "checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1" "checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" "checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +"checksum digest 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" "checksum err-derive 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "82f46c91bbed409ee74495549acbfcc7fae856e712e1df15afe75d0775eedc6c" +"checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum fs2 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" @@ -1017,6 +1089,7 @@ dependencies = [ "checksum futures-task 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27" "checksum futures-util 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" "checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" "checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" "checksum h2 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "377038bf3c89d18d6ca1431e7a5027194fbd724ca10592b9487ede5e8e144f42" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" @@ -1046,6 +1119,7 @@ dependencies = [ "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" "checksum num-traits 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "c62be47e61d1842b9170f0fdeec8eba98e60e90e5446449a0545e5152acd7096" "checksum num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6" +"checksum opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" "checksum parking_lot 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "92e98c49ab0b7ce5b222f2cc9193fc4efe11c6d0bd4f648e374684a6857b1cfc" "checksum parking_lot_core 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7582838484df45743c8434fbff785e8edf260c28748353d44bc0da32e0ceabf1" "checksum pin-project 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7804a463a8d9572f13453c516a5faea534a2403d7ced2f0c7e100eeff072772c" @@ -1070,6 +1144,7 @@ dependencies = [ "checksum scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" "checksum serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)" = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399" "checksum serde_derive 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)" = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c" +"checksum sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "27044adfd2e1f077f649f59deb9490d3941d674002f7d062870a60ebe9bd47a0" "checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb6824dde66ad33bf20c6e8476f5b82b871bc8bc3c129a10ea2f7dae5060fa3" @@ -1090,6 +1165,7 @@ dependencies = [ "checksum toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a" "checksum tower-service 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" "checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" +"checksum typenum 1.11.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6d2783fe2d6b8c1101136184eb41be8b1ad379e4657050b8aaff0c79ee7575f9" "checksum unicode-segmentation 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0" "checksum unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479" "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" diff --git a/Cargo.toml b/Cargo.toml index 8381af3a..73ad1d19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,3 +24,4 @@ toml = "0.5" structopt = { version = "0.3", default-features = false } rand = "0.7" hex = "0.3" +sha2 = "0.8" diff --git a/src/main.rs b/src/main.rs index 4448d535..0fcda4e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -103,7 +103,7 @@ async fn main() { let api_server = api::run_api_server(sys.clone(), wait_from(rx2)); tokio::spawn(shutdown_signal(vec![tx1, tx2])); - tokio::spawn(membership::bootstrap(sys)); + tokio::spawn(sys.bootstrap()); let (e1, e2) = futures::join![rpc_server, api_server]; diff --git a/src/membership.rs b/src/membership.rs index c14d370b..2c9cd1ca 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -1,10 +1,13 @@ use std::sync::Arc; use std::collections::HashMap; use std::time::Duration; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; +use futures::future::join_all; +use futures::stream::StreamExt; use hyper::client::Client; use tokio::sync::RwLock; +use sha2::{Sha256, Digest}; use crate::Config; use crate::error::Error; @@ -28,14 +31,39 @@ pub struct System { pub struct Members { pub present: Vec, pub status: HashMap, + pub status_hash: Hash, pub config: HashMap, pub config_version: u64, } +impl Members { + fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) { + match self.present.binary_search(&info.id) { + Ok(pos) => {} + Err(pos) => self.present.insert(pos, info.id.clone()), + } + self.status.insert(info.id.clone(), + NodeStatus{ + addr: SocketAddr::new(ip, info.rpc_port), + remaining_ping_attempts: MAX_FAILED_PINGS, + }); + } + + fn recalculate_status_hash(&mut self) { + let mut hasher = Sha256::new(); + for node in self.present.iter() { + if let Some(status) = self.status.get(node) { + hasher.input(format!("{} {}\n", hex::encode(node), status.addr)); + } + } + self.status_hash.copy_from_slice(&hasher.result()[..]); + } +} + pub struct NodeStatus { pub addr: SocketAddr, - remaining_ping_attempts: usize, + pub remaining_ping_attempts: usize, } pub struct NodeConfig { @@ -51,36 +79,74 @@ impl System { members: RwLock::new(Members{ present: Vec::new(), status: HashMap::new(), + status_hash: [0u8; 32], config: HashMap::new(), config_version: 0, }), } } + pub async fn make_ping(&self) -> Message { + Message::Ping(PingMessage{ + id: self.id, + rpc_port: self.config.rpc_port, + present_hash: self.members.read().await.status_hash.clone(), + config_version: 0, + }) + } + pub async fn broadcast(&self) -> Vec { self.members.read().await.present.clone() } -} -pub async fn bootstrap(system: Arc) { - rpc_call_many_addr(system.clone(), - &system.config.bootstrap_peers, - &Message::Ping(PingMessage{ - id: system.id, - rpc_port: system.config.rpc_port, - present_hash: [0u8; 32], - config_version: 0, - }), - None, - PING_TIMEOUT).await; + pub async fn bootstrap(self: Arc) { + let ping_msg = self.make_ping().await; + let ping_resps = join_all( + self.config.bootstrap_peers.iter().cloned() + .map(|to| { + let sys = self.clone(); + let ping_msg_ref = &ping_msg; + async move { + (to.clone(), rpc_call_addr(sys, &to, ping_msg_ref, PING_TIMEOUT).await) + } + })).await; + + let mut members = self.members.write().await; + for (addr, ping_resp) in ping_resps { + if let Ok(Message::Ping(info)) = ping_resp { + members.handle_ping(addr.ip(), &info); - unimplemented!() //TODO -} + } + } + members.recalculate_status_hash(); + drop(members); -pub async fn handle_ping(sys: Arc, from: &SocketAddr, ping: &PingMessage) -> Result { - unimplemented!() //TODO -} + let resps = rpc_call_many_addr(self.clone(), + &self.config.bootstrap_peers, + &ping_msg, + None, + PING_TIMEOUT).await; -pub async fn handle_advertise_node(sys: Arc, ping: &AdvertiseNodeMessage) -> Result { - unimplemented!() //TODO + unimplemented!() //TODO + } + + pub async fn handle_ping(self: Arc, + from: &SocketAddr, + ping: &PingMessage) + -> Result + { + let mut members = self.members.write().await; + members.handle_ping(from.ip(), ping); + members.recalculate_status_hash(); + drop(members); + + Ok(self.make_ping().await) + } + + pub async fn handle_advertise_node(self: Arc, + ping: &AdvertiseNodeMessage) + -> Result + { + unimplemented!() //TODO + } } diff --git a/src/rpc.rs b/src/rpc.rs index 4ba32c4c..5e72d0f0 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -14,7 +14,6 @@ use crate::data::*; use crate::error::Error; use crate::proto::Message; use crate::membership::System; -use crate::membership; // ---- CLIENT PART ---- @@ -133,8 +132,8 @@ async fn handler(sys: Arc, req: Request, addr: SocketAddr) -> Resu eprintln!("RPC from {}: {:?}", addr, msg); let resp = err_to_msg(match &msg { - Message::Ping(ping) => membership::handle_ping(sys, &addr, ping).await, - Message::AdvertiseNode(adv) => membership::handle_advertise_node(sys, adv).await, + Message::Ping(ping) => sys.handle_ping(&addr, ping).await, + Message::AdvertiseNode(adv) => sys.handle_advertise_node(adv).await, _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), });