fix hang on shutdown
This commit is contained in:
parent
aa79810596
commit
91e764a2bf
@ -9,10 +9,10 @@ use std::time::{Duration, Instant};
|
|||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use futures::{join, select};
|
use futures::join;
|
||||||
use futures_util::future::*;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
use sodiumoxide::crypto::sign::ed25519;
|
||||||
|
use tokio::select;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
@ -702,7 +702,7 @@ impl System {
|
|||||||
|
|
||||||
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
|
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
|
||||||
while !*stop_signal.borrow() {
|
while !*stop_signal.borrow() {
|
||||||
let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL);
|
let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
|
||||||
|
|
||||||
self.update_local_status();
|
self.update_local_status();
|
||||||
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
|
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
|
||||||
@ -711,13 +711,14 @@ impl System {
|
|||||||
.broadcast(
|
.broadcast(
|
||||||
&self.system_endpoint,
|
&self.system_endpoint,
|
||||||
SystemRpc::AdvertiseStatus(local_status),
|
SystemRpc::AdvertiseStatus(local_status),
|
||||||
RequestStrategy::with_priority(PRIO_HIGH),
|
RequestStrategy::with_priority(PRIO_HIGH)
|
||||||
|
.with_custom_timeout(STATUS_EXCHANGE_INTERVAL),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
select! {
|
select! {
|
||||||
_ = restart_at.fuse() => {},
|
_ = tokio::time::sleep_until(restart_at.into()) => {},
|
||||||
_ = stop_signal.changed().fuse() => {},
|
_ = stop_signal.changed() => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -799,10 +800,9 @@ impl System {
|
|||||||
#[cfg(feature = "kubernetes-discovery")]
|
#[cfg(feature = "kubernetes-discovery")]
|
||||||
tokio::spawn(self.clone().advertise_to_kubernetes());
|
tokio::spawn(self.clone().advertise_to_kubernetes());
|
||||||
|
|
||||||
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
|
|
||||||
select! {
|
select! {
|
||||||
_ = restart_at.fuse() => {},
|
_ = tokio::time::sleep(DISCOVERY_INTERVAL) => {},
|
||||||
_ = stop_signal.changed().fuse() => {},
|
_ = stop_signal.changed() => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user