2020-04-06 17:55:39 +00:00
|
|
|
use std::sync::Arc;
|
2020-04-07 14:26:22 +00:00
|
|
|
use std::path::PathBuf;
|
|
|
|
use std::io::{Read};
|
2020-04-06 17:55:39 +00:00
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::time::Duration;
|
2020-04-06 19:02:15 +00:00
|
|
|
use std::net::{IpAddr, SocketAddr};
|
2020-04-06 17:55:39 +00:00
|
|
|
|
2020-04-07 14:26:22 +00:00
|
|
|
use tokio::prelude::*;
|
2020-04-06 19:02:15 +00:00
|
|
|
use futures::future::join_all;
|
2020-04-06 17:55:39 +00:00
|
|
|
use tokio::sync::RwLock;
|
2020-04-06 19:02:15 +00:00
|
|
|
use sha2::{Sha256, Digest};
|
2020-04-06 17:55:39 +00:00
|
|
|
|
2020-04-07 14:26:22 +00:00
|
|
|
use crate::server::Config;
|
2020-04-06 17:55:39 +00:00
|
|
|
use crate::error::Error;
|
|
|
|
use crate::data::*;
|
|
|
|
use crate::proto::*;
|
2020-04-07 14:26:22 +00:00
|
|
|
use crate::rpc_client::*;
|
2020-04-06 17:55:39 +00:00
|
|
|
|
|
|
|
/// Minimum time between the starts of two consecutive rounds of `ping_loop`:
/// the delay is armed before a round begins and awaited once the round is done.
const PING_INTERVAL: Duration = Duration::from_secs(10);

/// Timeout passed to each ping RPC. The same value is reused in this file for
/// the status/config pull calls and for broadcasts.
const PING_TIMEOUT: Duration = Duration::from_secs(2);

/// Value `NodeStatus::remaining_ping_attempts` is (re)set to every time a node
/// is heard from.
const MAX_FAILED_PINGS: usize = 3;
|
|
|
|
|
|
|
|
pub struct System {
|
|
|
|
pub config: Config,
|
|
|
|
pub id: UUID,
|
|
|
|
|
2020-04-07 14:26:22 +00:00
|
|
|
pub rpc_client: RpcClient,
|
2020-04-06 17:55:39 +00:00
|
|
|
|
|
|
|
pub members: RwLock<Members>,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct Members {
|
|
|
|
pub status: HashMap<UUID, NodeStatus>,
|
2020-04-06 19:02:15 +00:00
|
|
|
pub status_hash: Hash,
|
2020-04-06 17:55:39 +00:00
|
|
|
|
2020-04-06 20:27:51 +00:00
|
|
|
pub config: NetworkConfig,
|
2020-04-06 17:55:39 +00:00
|
|
|
}
|
|
|
|
|
2020-04-06 19:02:15 +00:00
|
|
|
impl Members {
|
2020-04-06 20:27:51 +00:00
|
|
|
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
2020-04-06 20:54:03 +00:00
|
|
|
let addr = SocketAddr::new(ip, info.rpc_port);
|
|
|
|
let old_status = self.status.insert(info.id.clone(),
|
2020-04-06 19:02:15 +00:00
|
|
|
NodeStatus{
|
2020-04-06 20:54:03 +00:00
|
|
|
addr: addr.clone(),
|
2020-04-06 19:02:15 +00:00
|
|
|
remaining_ping_attempts: MAX_FAILED_PINGS,
|
2020-04-06 22:00:43 +00:00
|
|
|
datacenter: info.datacenter.clone(),
|
2020-04-06 20:54:03 +00:00
|
|
|
});
|
|
|
|
match old_status {
|
|
|
|
None => {
|
2020-04-06 22:00:43 +00:00
|
|
|
eprintln!("Newly pingable node: {}", hex::encode(info.id));
|
2020-04-06 20:54:03 +00:00
|
|
|
true
|
|
|
|
}
|
|
|
|
Some(x) => x.addr != addr,
|
|
|
|
}
|
2020-04-06 20:27:51 +00:00
|
|
|
}
|
|
|
|
|
2020-04-06 19:02:15 +00:00
|
|
|
fn recalculate_status_hash(&mut self) {
|
2020-04-06 20:27:51 +00:00
|
|
|
let mut nodes = self.status.iter().collect::<Vec<_>>();
|
|
|
|
nodes.sort_by_key(|(id, _status)| *id);
|
|
|
|
|
2020-04-06 19:02:15 +00:00
|
|
|
let mut hasher = Sha256::new();
|
2020-04-06 21:10:28 +00:00
|
|
|
eprintln!("Current set of pingable nodes: --");
|
2020-04-06 20:27:51 +00:00
|
|
|
for (id, status) in nodes {
|
2020-04-06 22:00:43 +00:00
|
|
|
eprintln!("{} {} ({})", hex::encode(id), status.addr, status.datacenter);
|
2020-04-06 20:27:51 +00:00
|
|
|
hasher.input(format!("{} {}\n", hex::encode(id), status.addr));
|
2020-04-06 19:02:15 +00:00
|
|
|
}
|
2020-04-06 21:10:28 +00:00
|
|
|
eprintln!("END --");
|
2020-04-06 19:02:15 +00:00
|
|
|
self.status_hash.copy_from_slice(&hasher.result()[..]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-06 17:55:39 +00:00
|
|
|
/// What is known about one node of the cluster.
#[derive(Debug, Clone)]
pub struct NodeStatus {
    /// Address at which the node's RPC endpoint is reached.
    pub addr: SocketAddr,
    /// Datacenter name the node announced in its ping.
    pub datacenter: String,
    /// Failed pings the node can still accumulate before it is removed from
    /// the membership table; reset whenever the node is heard from.
    pub remaining_ping_attempts: usize,
}
|
|
|
|
|
2020-04-07 14:26:22 +00:00
|
|
|
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
|
|
|
|
let mut path = metadata_dir.clone();
|
|
|
|
path.push("network_config");
|
|
|
|
|
|
|
|
let mut file = std::fs::OpenOptions::new()
|
|
|
|
.read(true)
|
|
|
|
.open(path.as_path())?;
|
|
|
|
|
|
|
|
let mut net_config_bytes = vec![];
|
|
|
|
file.read_to_end(&mut net_config_bytes)
|
|
|
|
.expect("Failure when reading network_config");
|
|
|
|
|
|
|
|
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
|
|
|
|
.expect("Invalid or corrupt network_config file");
|
|
|
|
|
|
|
|
Ok(net_config)
|
|
|
|
}
|
2020-04-06 17:55:39 +00:00
|
|
|
|
|
|
|
impl System {
|
|
|
|
pub fn new(config: Config, id: UUID) -> Self {
|
2020-04-07 14:26:22 +00:00
|
|
|
let net_config = match read_network_config(&config.metadata_dir) {
|
|
|
|
Ok(x) => x,
|
|
|
|
Err(_) => NetworkConfig{
|
|
|
|
members: HashMap::new(),
|
|
|
|
version: 0,
|
|
|
|
},
|
|
|
|
};
|
2020-04-06 21:10:28 +00:00
|
|
|
let mut members = Members{
|
2020-04-06 17:55:39 +00:00
|
|
|
status: HashMap::new(),
|
2020-04-06 19:02:15 +00:00
|
|
|
status_hash: [0u8; 32],
|
2020-04-07 14:26:22 +00:00
|
|
|
config: net_config,
|
|
|
|
};
|
2020-04-06 21:10:28 +00:00
|
|
|
members.recalculate_status_hash();
|
|
|
|
System{
|
|
|
|
config,
|
|
|
|
id,
|
2020-04-07 14:26:22 +00:00
|
|
|
rpc_client: RpcClient::new(),
|
2020-04-06 21:10:28 +00:00
|
|
|
members: RwLock::new(members),
|
2020-04-06 17:55:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-07 14:26:22 +00:00
|
|
|
pub async fn save_network_config(&self) {
|
|
|
|
let mut path = self.config.metadata_dir.clone();
|
|
|
|
path.push("network_config");
|
|
|
|
|
|
|
|
let members = self.members.read().await;
|
|
|
|
let data = rmp_serde::encode::to_vec_named(&members.config)
|
|
|
|
.expect("Error while encoding network config");
|
|
|
|
drop(members);
|
|
|
|
|
|
|
|
let mut f = tokio::fs::File::create(path.as_path()).await
|
|
|
|
.expect("Could not create network_config");
|
|
|
|
f.write_all(&data[..]).await
|
|
|
|
.expect("Could not write network_config");
|
|
|
|
}
|
|
|
|
|
2020-04-06 19:02:15 +00:00
|
|
|
pub async fn make_ping(&self) -> Message {
|
2020-04-06 20:27:51 +00:00
|
|
|
let members = self.members.read().await;
|
2020-04-06 19:02:15 +00:00
|
|
|
Message::Ping(PingMessage{
|
|
|
|
id: self.id,
|
2020-04-06 22:00:43 +00:00
|
|
|
datacenter: self.config.datacenter.clone(),
|
2020-04-06 19:02:15 +00:00
|
|
|
rpc_port: self.config.rpc_port,
|
2020-04-06 20:27:51 +00:00
|
|
|
status_hash: members.status_hash.clone(),
|
|
|
|
config_version: members.config.version,
|
2020-04-06 19:02:15 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-04-06 20:27:51 +00:00
|
|
|
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
|
|
|
|
let members = self.members.read().await;
|
2020-04-06 20:54:03 +00:00
|
|
|
let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::<Vec<_>>();
|
2020-04-06 20:27:51 +00:00
|
|
|
drop(members);
|
|
|
|
rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await;
|
2020-04-06 17:55:39 +00:00
|
|
|
}
|
|
|
|
|
2020-04-06 19:02:15 +00:00
|
|
|
pub async fn bootstrap(self: Arc<Self>) {
|
2020-04-06 22:00:43 +00:00
|
|
|
let bootstrap_peers = self.config.bootstrap_peers
|
|
|
|
.iter()
|
|
|
|
.map(|ip| (ip.clone(), None))
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
self.clone().ping_nodes(bootstrap_peers).await;
|
|
|
|
|
|
|
|
tokio::spawn(self.ping_loop());
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
|
2020-04-06 19:02:15 +00:00
|
|
|
let ping_msg = self.make_ping().await;
|
|
|
|
let ping_resps = join_all(
|
2020-04-06 22:00:43 +00:00
|
|
|
peers.iter()
|
|
|
|
.map(|(addr, id_option)| {
|
2020-04-06 19:02:15 +00:00
|
|
|
let sys = self.clone();
|
|
|
|
let ping_msg_ref = &ping_msg;
|
|
|
|
async move {
|
2020-04-07 14:26:22 +00:00
|
|
|
(id_option, addr.clone(), sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await)
|
2020-04-06 19:02:15 +00:00
|
|
|
}
|
|
|
|
})).await;
|
|
|
|
|
|
|
|
let mut members = self.members.write().await;
|
2020-04-06 22:00:43 +00:00
|
|
|
|
|
|
|
let mut has_changes = false;
|
|
|
|
let mut to_advertise = vec![];
|
|
|
|
|
|
|
|
for (id_option, addr, ping_resp) in ping_resps {
|
2020-04-06 19:02:15 +00:00
|
|
|
if let Ok(Message::Ping(info)) = ping_resp {
|
2020-04-06 22:00:43 +00:00
|
|
|
let is_new = members.handle_ping(addr.ip(), &info);
|
|
|
|
if is_new {
|
|
|
|
has_changes = true;
|
|
|
|
to_advertise.push(AdvertisedNode{
|
|
|
|
id: info.id.clone(),
|
|
|
|
addr: addr.clone(),
|
2020-04-07 14:26:22 +00:00
|
|
|
datacenter: info.datacenter.clone(),
|
2020-04-06 22:00:43 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
if is_new || members.status_hash != info.status_hash {
|
|
|
|
tokio::spawn(self.clone().pull_status(info.id.clone()));
|
|
|
|
}
|
|
|
|
if is_new || members.config.version < info.config_version {
|
|
|
|
tokio::spawn(self.clone().pull_config(info.id.clone()));
|
|
|
|
}
|
|
|
|
} else if let Some(id) = id_option {
|
|
|
|
let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0);
|
|
|
|
if remaining_attempts == 0 {
|
|
|
|
eprintln!("Removing node {} after too many failed pings", hex::encode(id));
|
|
|
|
members.status.remove(id);
|
|
|
|
has_changes = true;
|
|
|
|
} else {
|
|
|
|
if let Some(st) = members.status.get_mut(id) {
|
|
|
|
st.remaining_ping_attempts = remaining_attempts - 1;
|
|
|
|
}
|
|
|
|
}
|
2020-04-06 19:02:15 +00:00
|
|
|
}
|
|
|
|
}
|
2020-04-06 22:00:43 +00:00
|
|
|
if has_changes {
|
|
|
|
members.recalculate_status_hash();
|
|
|
|
}
|
2020-04-06 19:02:15 +00:00
|
|
|
drop(members);
|
2020-04-06 17:55:39 +00:00
|
|
|
|
2020-04-06 22:00:43 +00:00
|
|
|
if to_advertise.len() > 0 {
|
|
|
|
self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT).await;
|
|
|
|
}
|
2020-04-06 19:02:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn handle_ping(self: Arc<Self>,
|
|
|
|
from: &SocketAddr,
|
|
|
|
ping: &PingMessage)
|
|
|
|
-> Result<Message, Error>
|
|
|
|
{
|
|
|
|
let mut members = self.members.write().await;
|
2020-04-06 20:27:51 +00:00
|
|
|
let is_new = members.handle_ping(from.ip(), ping);
|
2020-04-06 20:54:03 +00:00
|
|
|
if is_new {
|
|
|
|
members.recalculate_status_hash();
|
|
|
|
}
|
2020-04-06 20:27:51 +00:00
|
|
|
let status_hash = members.status_hash.clone();
|
|
|
|
let config_version = members.config.version;
|
2020-04-06 19:02:15 +00:00
|
|
|
drop(members);
|
|
|
|
|
2020-04-06 20:27:51 +00:00
|
|
|
if is_new || status_hash != ping.status_hash {
|
|
|
|
tokio::spawn(self.clone().pull_status(ping.id.clone()));
|
|
|
|
}
|
|
|
|
if is_new || config_version < ping.config_version {
|
|
|
|
tokio::spawn(self.clone().pull_config(ping.id.clone()));
|
|
|
|
}
|
|
|
|
|
2020-04-06 19:02:15 +00:00
|
|
|
Ok(self.make_ping().await)
|
|
|
|
}
|
|
|
|
|
2020-04-06 20:27:51 +00:00
|
|
|
pub async fn handle_pull_status(&self) -> Result<Message, Error> {
|
|
|
|
let members = self.members.read().await;
|
|
|
|
let mut mem = vec![];
|
|
|
|
for (node, status) in members.status.iter() {
|
|
|
|
mem.push(AdvertisedNode{
|
|
|
|
id: node.clone(),
|
|
|
|
addr: status.addr.clone(),
|
2020-04-07 14:26:22 +00:00
|
|
|
datacenter: status.datacenter.clone(),
|
2020-04-06 20:27:51 +00:00
|
|
|
});
|
|
|
|
}
|
|
|
|
Ok(Message::AdvertiseNodesUp(mem))
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn handle_pull_config(&self) -> Result<Message, Error> {
|
|
|
|
let members = self.members.read().await;
|
|
|
|
Ok(Message::AdvertiseConfig(members.config.clone()))
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn handle_advertise_nodes_up(self: Arc<Self>,
|
|
|
|
adv: &[AdvertisedNode])
|
2020-04-06 19:02:15 +00:00
|
|
|
-> Result<Message, Error>
|
|
|
|
{
|
2020-04-06 22:00:43 +00:00
|
|
|
let mut to_ping = vec![];
|
2020-04-06 20:27:51 +00:00
|
|
|
|
|
|
|
let mut members = self.members.write().await;
|
2020-04-06 22:00:43 +00:00
|
|
|
let mut has_changed = false;
|
|
|
|
|
2020-04-06 20:27:51 +00:00
|
|
|
for node in adv.iter() {
|
2020-04-06 22:00:43 +00:00
|
|
|
if node.id == self.id {
|
|
|
|
// learn our own ip address
|
|
|
|
let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
|
|
|
|
let old_self = members.status.insert(node.id.clone(),
|
|
|
|
NodeStatus{
|
|
|
|
addr: self_addr,
|
|
|
|
datacenter: self.config.datacenter.clone(),
|
|
|
|
remaining_ping_attempts: MAX_FAILED_PINGS,
|
|
|
|
});
|
|
|
|
has_changed = match old_self {
|
|
|
|
None => true,
|
|
|
|
Some(x) => x.addr != self_addr,
|
|
|
|
};
|
|
|
|
} else if !members.status.contains_key(&node.id) {
|
|
|
|
to_ping.push((node.addr.clone(), Some(node.id.clone())));
|
2020-04-06 20:27:51 +00:00
|
|
|
}
|
|
|
|
}
|
2020-04-06 22:00:43 +00:00
|
|
|
if has_changed {
|
2020-04-06 21:10:28 +00:00
|
|
|
members.recalculate_status_hash();
|
2020-04-06 22:00:43 +00:00
|
|
|
}
|
|
|
|
drop(members);
|
|
|
|
|
|
|
|
if to_ping.len() > 0 {
|
|
|
|
tokio::spawn(self.clone().ping_nodes(to_ping));
|
2020-04-06 20:27:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(Message::Ok)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn handle_advertise_config(self: Arc<Self>,
|
|
|
|
adv: &NetworkConfig)
|
|
|
|
-> Result<Message, Error>
|
|
|
|
{
|
|
|
|
let mut members = self.members.write().await;
|
|
|
|
if adv.version > members.config.version {
|
|
|
|
members.config = adv.clone();
|
|
|
|
tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT));
|
2020-04-07 14:26:22 +00:00
|
|
|
self.save_network_config().await;
|
2020-04-06 20:27:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
Ok(Message::Ok)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn ping_loop(self: Arc<Self>) {
|
|
|
|
loop {
|
|
|
|
let restart_at = tokio::time::delay_for(PING_INTERVAL);
|
|
|
|
|
|
|
|
let members = self.members.read().await;
|
|
|
|
let ping_addrs = members.status.iter()
|
2020-04-06 20:54:03 +00:00
|
|
|
.filter(|(id, _)| **id != self.id)
|
2020-04-06 22:00:43 +00:00
|
|
|
.map(|(id, status)| (status.addr.clone(), Some(id.clone())))
|
2020-04-06 20:27:51 +00:00
|
|
|
.collect::<Vec<_>>();
|
|
|
|
drop(members);
|
|
|
|
|
2020-04-06 22:00:43 +00:00
|
|
|
self.clone().ping_nodes(ping_addrs).await;
|
2020-04-06 20:27:51 +00:00
|
|
|
|
|
|
|
restart_at.await
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-04-06 20:54:03 +00:00
|
|
|
pub fn pull_status(self: Arc<Self>, peer: UUID) -> impl futures::future::Future<Output=()> + Send + 'static {
|
|
|
|
async move {
|
|
|
|
let resp = rpc_call(self.clone(),
|
|
|
|
&peer,
|
|
|
|
&Message::PullStatus,
|
|
|
|
PING_TIMEOUT).await;
|
|
|
|
if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
|
|
|
|
let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await;
|
|
|
|
}
|
2020-04-06 20:27:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn pull_config(self: Arc<Self>, peer: UUID) {
|
|
|
|
let resp = rpc_call(self.clone(),
|
|
|
|
&peer,
|
|
|
|
&Message::PullConfig,
|
|
|
|
PING_TIMEOUT).await;
|
|
|
|
if let Ok(Message::AdvertiseConfig(config)) = resp {
|
2020-04-06 20:54:03 +00:00
|
|
|
let _: Result<_, _> = self.handle_advertise_config(&config).await;
|
2020-04-06 20:27:51 +00:00
|
|
|
}
|
2020-04-06 19:02:15 +00:00
|
|
|
}
|
2020-04-06 17:55:39 +00:00
|
|
|
}
|