Some improvements in background worker but we terminate late
This commit is contained in:
parent
0cd5b2ae19
commit
6a8439fd13
@ -193,7 +193,12 @@ impl AdminRpcHandler {
|
|||||||
let key_ids = self
|
let key_ids = self
|
||||||
.garage
|
.garage
|
||||||
.key_table
|
.key_table
|
||||||
.get_range(&EmptyKey, None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), 10000)
|
.get_range(
|
||||||
|
&EmptyKey,
|
||||||
|
None,
|
||||||
|
Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
|
||||||
|
10000,
|
||||||
|
)
|
||||||
.await?
|
.await?
|
||||||
.iter()
|
.iter()
|
||||||
.map(|k| (k.key_id.to_string(), k.name.get().clone()))
|
.map(|k| (k.key_id.to_string(), k.name.get().clone()))
|
||||||
@ -257,15 +262,24 @@ impl AdminRpcHandler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> {
|
async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> {
|
||||||
let candidates = self.garage
|
let candidates = self
|
||||||
|
.garage
|
||||||
.key_table
|
.key_table
|
||||||
.get_range(&EmptyKey, None, Some(KeyFilter::Matches(pattern.to_string())), 10)
|
.get_range(
|
||||||
|
&EmptyKey,
|
||||||
|
None,
|
||||||
|
Some(KeyFilter::Matches(pattern.to_string())),
|
||||||
|
10,
|
||||||
|
)
|
||||||
.await?
|
.await?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|k| !k.deleted.get())
|
.filter(|k| !k.deleted.get())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
if candidates.len() != 1 {
|
if candidates.len() != 1 {
|
||||||
Err(Error::Message(format!("{} matching keys", candidates.len())))
|
Err(Error::Message(format!(
|
||||||
|
"{} matching keys",
|
||||||
|
candidates.len()
|
||||||
|
)))
|
||||||
} else {
|
} else {
|
||||||
Ok(candidates.into_iter().next().unwrap())
|
Ok(candidates.into_iter().next().unwrap())
|
||||||
}
|
}
|
||||||
@ -469,12 +483,7 @@ impl AdminRpcHandler {
|
|||||||
t.data.merkle_updater.merkle_tree_len()
|
t.data.merkle_updater.merkle_tree_len()
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
writeln!(
|
writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap();
|
||||||
to,
|
|
||||||
" GC todo queue length: {}",
|
|
||||||
t.data.gc_todo_len()
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,11 +16,7 @@ pub struct Repair {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Repair {
|
impl Repair {
|
||||||
pub async fn repair_worker(
|
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
|
||||||
&self,
|
|
||||||
opt: RepairOpt,
|
|
||||||
must_exit: watch::Receiver<bool>,
|
|
||||||
) {
|
|
||||||
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
|
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
|
||||||
warn!("Repair worker failed with error: {}", e);
|
warn!("Repair worker failed with error: {}", e);
|
||||||
}
|
}
|
||||||
|
@ -47,10 +47,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
|||||||
|
|
||||||
info!("Initializing background runner...");
|
info!("Initializing background runner...");
|
||||||
let (send_cancel, watch_cancel) = watch::channel(false);
|
let (send_cancel, watch_cancel) = watch::channel(false);
|
||||||
let background = BackgroundRunner::new(16, watch_cancel.clone());
|
let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
|
||||||
|
|
||||||
info!("Initializing Garage main data store...");
|
info!("Initializing Garage main data store...");
|
||||||
let garage = Garage::new(config, db, background.clone(), &mut rpc_server);
|
let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
|
||||||
|
let bootstrap = garage.system.clone().bootstrap(
|
||||||
|
&config.bootstrap_peers[..],
|
||||||
|
config.consul_host,
|
||||||
|
config.consul_service_name,
|
||||||
|
);
|
||||||
|
|
||||||
info!("Crate admin RPC handler...");
|
info!("Crate admin RPC handler...");
|
||||||
AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);
|
AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);
|
||||||
@ -58,21 +63,13 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
|||||||
info!("Initializing RPC and API servers...");
|
info!("Initializing RPC and API servers...");
|
||||||
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
|
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
|
||||||
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
|
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
|
||||||
let web_server = web_server::run_web_server(garage.clone(), wait_from(watch_cancel.clone()));
|
let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone()));
|
||||||
|
|
||||||
futures::try_join!(
|
futures::try_join!(
|
||||||
garage
|
bootstrap.map(|rv| {
|
||||||
.system
|
info!("Bootstrap done");
|
||||||
.clone()
|
Ok(rv)
|
||||||
.bootstrap(
|
}),
|
||||||
&garage.config.bootstrap_peers[..],
|
|
||||||
garage.config.consul_host.clone(),
|
|
||||||
garage.config.consul_service_name.clone()
|
|
||||||
)
|
|
||||||
.map(|rv| {
|
|
||||||
info!("Bootstrap done");
|
|
||||||
Ok(rv)
|
|
||||||
}),
|
|
||||||
run_rpc_server.map(|rv| {
|
run_rpc_server.map(|rv| {
|
||||||
info!("RPC server exited");
|
info!("RPC server exited");
|
||||||
rv
|
rv
|
||||||
@ -85,9 +82,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
|
|||||||
info!("Web server exited");
|
info!("Web server exited");
|
||||||
rv
|
rv
|
||||||
}),
|
}),
|
||||||
background.run().map(|rv| {
|
await_background_done.map(|rv| {
|
||||||
info!("Background runner exited");
|
info!("Background runner exited: {:?}", rv);
|
||||||
Ok(rv)
|
Ok(())
|
||||||
}),
|
}),
|
||||||
shutdown_signal(send_cancel),
|
shutdown_signal(send_cancel),
|
||||||
)?;
|
)?;
|
||||||
|
@ -254,19 +254,18 @@ impl BlockManager {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resync_loop(
|
async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
|
||||||
self: Arc<Self>,
|
|
||||||
mut must_exit: watch::Receiver<bool>,
|
|
||||||
) {
|
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
if let Err(e) = self.resync_iter(&mut must_exit).await {
|
if let Err(e) = self.resync_iter(&mut must_exit).await {
|
||||||
warn!("Error in block resync loop: {}", e);
|
warn!("Error in block resync loop: {}", e);
|
||||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
select! {
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (),
|
||||||
|
_ = must_exit.changed().fuse() => (),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
|
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
|
||||||
if let Some(first_item) = self.resync_queue.iter().next() {
|
if let Some(first_item) = self.resync_queue.iter().next() {
|
||||||
let (time_bytes, hash_bytes) = first_item?;
|
let (time_bytes, hash_bytes) = first_item?;
|
||||||
@ -280,7 +279,7 @@ impl BlockManager {
|
|||||||
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
|
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
|
||||||
}
|
}
|
||||||
self.resync_queue.remove(&time_bytes)?;
|
self.resync_queue.remove(&time_bytes)?;
|
||||||
res?; // propagate error to delay main loop
|
res?; // propagate error to delay main loop
|
||||||
} else {
|
} else {
|
||||||
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
|
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
|
||||||
select! {
|
select! {
|
||||||
|
@ -109,7 +109,8 @@ impl TableSchema for KeyTable {
|
|||||||
KeyFilter::Deleted(df) => df.apply(entry.deleted.get()),
|
KeyFilter::Deleted(df) => df.apply(entry.deleted.get()),
|
||||||
KeyFilter::Matches(pat) => {
|
KeyFilter::Matches(pat) => {
|
||||||
let pat = pat.to_lowercase();
|
let pat = pat.to_lowercase();
|
||||||
entry.key_id.to_lowercase().starts_with(&pat) || entry.name.get().to_lowercase() == pat
|
entry.key_id.to_lowercase().starts_with(&pat)
|
||||||
|
|| entry.name.get().to_lowercase() == pat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -195,8 +195,7 @@ impl TableSchema for ObjectTable {
|
|||||||
|
|
||||||
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
||||||
let version_table = self.version_table.clone();
|
let version_table = self.version_table.clone();
|
||||||
// TODO not cancellable
|
self.background.spawn(async move {
|
||||||
self.background.spawn_cancellable(async move {
|
|
||||||
if let (Some(old_v), Some(new_v)) = (old, new) {
|
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||||
// Propagate deletion of old versions
|
// Propagate deletion of old versions
|
||||||
for v in old_v.versions.iter() {
|
for v in old_v.versions.iter() {
|
||||||
|
@ -110,8 +110,7 @@ impl TableSchema for VersionTable {
|
|||||||
|
|
||||||
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
|
||||||
let block_ref_table = self.block_ref_table.clone();
|
let block_ref_table = self.block_ref_table.clone();
|
||||||
// TODO not cancellable
|
self.background.spawn(async move {
|
||||||
self.background.spawn_cancellable(async move {
|
|
||||||
if let (Some(old_v), Some(new_v)) = (old, new) {
|
if let (Some(old_v), Some(new_v)) = (old, new) {
|
||||||
// Propagate deletion of version blocks
|
// Propagate deletion of version blocks
|
||||||
if new_v.deleted.get() && !old_v.deleted.get() {
|
if new_v.deleted.get() && !old_v.deleted.get() {
|
||||||
|
@ -11,9 +11,9 @@ use futures::future::join_all;
|
|||||||
use futures::select;
|
use futures::select;
|
||||||
use futures_util::future::*;
|
use futures_util::future::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tokio::io::AsyncWriteExt;
|
|
||||||
|
|
||||||
use garage_util::background::BackgroundRunner;
|
use garage_util::background::BackgroundRunner;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
@ -316,17 +316,16 @@ impl System {
|
|||||||
self.clone().ping_nodes(bootstrap_peers).await;
|
self.clone().ping_nodes(bootstrap_peers).await;
|
||||||
|
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
self.clone()
|
self.background
|
||||||
.background
|
.spawn_worker(format!("ping loop"), |stop_signal| {
|
||||||
.spawn_worker(format!("ping loop"), |stop_signal| self2.ping_loop(stop_signal));
|
self2.ping_loop(stop_signal)
|
||||||
|
});
|
||||||
|
|
||||||
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
|
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
self.clone()
|
self.background
|
||||||
.background
|
|
||||||
.spawn_worker(format!("Consul loop"), |stop_signal| {
|
.spawn_worker(format!("Consul loop"), |stop_signal| {
|
||||||
self2
|
self2.consul_loop(stop_signal, consul_host, consul_service_name)
|
||||||
.consul_loop(stop_signal, consul_host, consul_service_name)
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -531,7 +530,7 @@ impl System {
|
|||||||
.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
|
.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
|
||||||
.map(Ok),
|
.map(Ok),
|
||||||
);
|
);
|
||||||
self.background.spawn(self.clone().save_network_config()).await;
|
self.background.spawn(self.clone().save_network_config());
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Message::Ok)
|
Ok(Message::Ok)
|
||||||
@ -568,7 +567,7 @@ impl System {
|
|||||||
consul_host: String,
|
consul_host: String,
|
||||||
consul_service_name: String,
|
consul_service_name: String,
|
||||||
) {
|
) {
|
||||||
loop {
|
while !*stop_signal.borrow() {
|
||||||
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
|
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
|
||||||
|
|
||||||
match get_consul_nodes(&consul_host, &consul_service_name).await {
|
match get_consul_nodes(&consul_host, &consul_service_name).await {
|
||||||
@ -583,11 +582,7 @@ impl System {
|
|||||||
|
|
||||||
select! {
|
select! {
|
||||||
_ = restart_at.fuse() => (),
|
_ = restart_at.fuse() => (),
|
||||||
_ = stop_signal.changed().fuse() => {
|
_ = stop_signal.changed().fuse() => (),
|
||||||
if *stop_signal.borrow() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -161,11 +161,11 @@ impl Ring {
|
|||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
eprintln!("RING: --");
|
// eprintln!("RING: --");
|
||||||
for e in ring.iter() {
|
// for e in ring.iter() {
|
||||||
eprintln!("{:?}", e);
|
// eprintln!("{:?}", e);
|
||||||
}
|
// }
|
||||||
eprintln!("END --");
|
// eprintln!("END --");
|
||||||
|
|
||||||
Self { config, ring }
|
Self { config, ring }
|
||||||
}
|
}
|
||||||
|
@ -198,7 +198,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
|
|||||||
let wait_finished_fut = tokio::spawn(async move {
|
let wait_finished_fut = tokio::spawn(async move {
|
||||||
resp_stream.collect::<Vec<_>>().await;
|
resp_stream.collect::<Vec<_>>().await;
|
||||||
});
|
});
|
||||||
self.background.spawn(wait_finished_fut.map(|_| Ok(()))).await;
|
self.background.spawn(wait_finished_fut.map(|_| Ok(())));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(results)
|
Ok(results)
|
||||||
|
@ -13,9 +13,9 @@ use hyper::service::{make_service_fn, service_fn};
|
|||||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio_stream::wrappers::TcpListenerStream;
|
|
||||||
use tokio_rustls::server::TlsStream;
|
use tokio_rustls::server::TlsStream;
|
||||||
use tokio_rustls::TlsAcceptor;
|
use tokio_rustls::TlsAcceptor;
|
||||||
|
use tokio_stream::wrappers::TcpListenerStream;
|
||||||
|
|
||||||
use garage_util::config::TlsConfig;
|
use garage_util::config::TlsConfig;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
@ -52,7 +52,11 @@ where
|
|||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
"Request message: {}",
|
"Request message: {}",
|
||||||
serde_json::to_string(&msg).unwrap_or("<json error>".into()).chars().take(100).collect::<String>()
|
serde_json::to_string(&msg)
|
||||||
|
.unwrap_or("<json error>".into())
|
||||||
|
.chars()
|
||||||
|
.take(100)
|
||||||
|
.collect::<String>()
|
||||||
);
|
);
|
||||||
|
|
||||||
match handler(msg, sockaddr).await {
|
match handler(msg, sockaddr).await {
|
||||||
|
@ -101,10 +101,7 @@ impl MerkleUpdater {
|
|||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn updater_loop(
|
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
|
||||||
self: Arc<Self>,
|
|
||||||
mut must_exit: watch::Receiver<bool>,
|
|
||||||
) {
|
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
if let Some(x) = self.todo.iter().next() {
|
if let Some(x) = self.todo.iter().next() {
|
||||||
match x {
|
match x {
|
||||||
|
@ -3,7 +3,7 @@ use std::convert::TryInto;
|
|||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures::{select};
|
use futures::select;
|
||||||
use futures_util::future::*;
|
use futures_util::future::*;
|
||||||
use futures_util::stream::*;
|
use futures_util::stream::*;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
|
@ -1,10 +1,11 @@
|
|||||||
use core::future::Future;
|
use core::future::Future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Mutex;
|
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{mpsc, watch};
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use futures::future::*;
|
||||||
|
use futures::select;
|
||||||
|
use tokio::sync::{mpsc, watch, Mutex};
|
||||||
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
|
||||||
@ -14,99 +15,115 @@ type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
|
|||||||
pub struct BackgroundRunner {
|
pub struct BackgroundRunner {
|
||||||
pub stop_signal: watch::Receiver<bool>,
|
pub stop_signal: watch::Receiver<bool>,
|
||||||
|
|
||||||
queue_in: ArcSwapOption<mpsc::UnboundedSender<(Job, bool)>>,
|
queue_in: mpsc::UnboundedSender<(Job, bool)>,
|
||||||
|
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
|
||||||
workers: Mutex<Vec<tokio::task::JoinHandle<()>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BackgroundRunner {
|
impl BackgroundRunner {
|
||||||
pub fn new(n_runners: usize, stop_signal: watch::Receiver<bool>) -> Arc<Self> {
|
pub fn new(
|
||||||
let (queue_in, queue_out) = mpsc::unbounded_channel();
|
n_runners: usize,
|
||||||
|
stop_signal: watch::Receiver<bool>,
|
||||||
|
) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
|
||||||
|
let (worker_in, mut worker_out) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let mut workers = vec![];
|
let stop_signal_2 = stop_signal.clone();
|
||||||
let queue_out = Arc::new(tokio::sync::Mutex::new(queue_out));
|
let await_all_done = tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let wkr = {
|
||||||
|
select! {
|
||||||
|
item = worker_out.recv().fuse() => {
|
||||||
|
match item {
|
||||||
|
Some(x) => x,
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {
|
||||||
|
if *stop_signal_2.borrow() {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = wkr.await {
|
||||||
|
error!("Error while awaiting for worker: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let (queue_in, queue_out) = mpsc::unbounded_channel();
|
||||||
|
let queue_out = Arc::new(Mutex::new(queue_out));
|
||||||
|
|
||||||
for i in 0..n_runners {
|
for i in 0..n_runners {
|
||||||
let queue_out = queue_out.clone();
|
let queue_out = queue_out.clone();
|
||||||
let stop_signal = stop_signal.clone();
|
let stop_signal = stop_signal.clone();
|
||||||
|
|
||||||
workers.push(tokio::spawn(async move {
|
worker_in
|
||||||
while let Some((job, cancellable)) = queue_out.lock().await.recv().await {
|
.send(tokio::spawn(async move {
|
||||||
if cancellable && *stop_signal.borrow() {
|
loop {
|
||||||
continue;
|
let (job, cancellable) = {
|
||||||
|
select! {
|
||||||
|
item = wait_job(&queue_out).fuse() => match item {
|
||||||
|
// We received a task, process it
|
||||||
|
Some(x) => x,
|
||||||
|
// We received a signal that no more tasks will ever be sent
|
||||||
|
// because the sending side was dropped. Exit now.
|
||||||
|
None => break,
|
||||||
|
},
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {
|
||||||
|
if *stop_signal.borrow() {
|
||||||
|
// Nothing has been going on for 10 secs, and we are shutting
|
||||||
|
// down. Exit now.
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// Nothing is going on but we don't want to exit.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if cancellable && *stop_signal.borrow() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if let Err(e) = job.await {
|
||||||
|
error!("Job failed: {}", e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Err(e) = job.await {
|
info!("Background worker {} exiting", i);
|
||||||
error!("Job failed: {}", e)
|
}))
|
||||||
}
|
.unwrap();
|
||||||
}
|
|
||||||
info!("Worker {} exiting", i);
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Arc::new(Self {
|
let bgrunner = Arc::new(Self {
|
||||||
stop_signal,
|
stop_signal,
|
||||||
queue_in: ArcSwapOption::new(Some(Arc::new(queue_in))),
|
queue_in,
|
||||||
workers: Mutex::new(workers),
|
worker_in,
|
||||||
})
|
});
|
||||||
}
|
(bgrunner, await_all_done)
|
||||||
|
|
||||||
pub async fn run(self: Arc<Self>) {
|
|
||||||
let mut stop_signal = self.stop_signal.clone();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let exit_now = match stop_signal.changed().await {
|
|
||||||
Ok(()) => *stop_signal.borrow(),
|
|
||||||
Err(e) => {
|
|
||||||
error!("Watch .changed() error: {}", e);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if exit_now {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Closing background job queue_in...");
|
|
||||||
drop(self.queue_in.swap(None));
|
|
||||||
|
|
||||||
info!("Waiting for all workers to terminate...");
|
|
||||||
while let Some(task) = self.workers.lock().unwrap().pop() {
|
|
||||||
if let Err(e) = task.await {
|
|
||||||
warn!("Error awaiting task: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Spawn a task to be run in background
|
// Spawn a task to be run in background
|
||||||
pub async fn spawn<T>(&self, job: T)
|
pub fn spawn<T>(&self, job: T)
|
||||||
where
|
where
|
||||||
T: Future<Output = JobOutput> + Send + 'static,
|
T: Future<Output = JobOutput> + Send + 'static,
|
||||||
{
|
{
|
||||||
match self.queue_in.load().as_ref() {
|
let boxed: Job = Box::pin(job);
|
||||||
Some(chan) => {
|
self.queue_in
|
||||||
let boxed: Job = Box::pin(job);
|
.send((boxed, false))
|
||||||
chan.send((boxed, false)).map_err(|_| "send error").unwrap();
|
.map_err(|_| "could not put job in queue")
|
||||||
}
|
.unwrap();
|
||||||
None => {
|
|
||||||
warn!("Doing background job now because we are exiting...");
|
|
||||||
if let Err(e) = job.await {
|
|
||||||
warn!("Task failed: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn spawn_cancellable<T>(&self, job: T)
|
pub fn spawn_cancellable<T>(&self, job: T)
|
||||||
where
|
where
|
||||||
T: Future<Output = JobOutput> + Send + 'static,
|
T: Future<Output = JobOutput> + Send + 'static,
|
||||||
{
|
{
|
||||||
match self.queue_in.load().as_ref() {
|
let boxed: Job = Box::pin(job);
|
||||||
Some(chan) => {
|
self.queue_in
|
||||||
let boxed: Job = Box::pin(job);
|
.send((boxed, true))
|
||||||
chan.send((boxed, false)).map_err(|_| "send error").unwrap();
|
.map_err(|_| "could not put job in queue")
|
||||||
}
|
.unwrap();
|
||||||
None => (), // drop job if we are exiting
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn spawn_worker<F, T>(&self, name: String, worker: F)
|
pub fn spawn_worker<F, T>(&self, name: String, worker: F)
|
||||||
@ -114,11 +131,19 @@ impl BackgroundRunner {
|
|||||||
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
|
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
|
||||||
T: Future<Output = ()> + Send + 'static,
|
T: Future<Output = ()> + Send + 'static,
|
||||||
{
|
{
|
||||||
let mut workers = self.workers.lock().unwrap();
|
|
||||||
let stop_signal = self.stop_signal.clone();
|
let stop_signal = self.stop_signal.clone();
|
||||||
workers.push(tokio::spawn(async move {
|
let task = tokio::spawn(async move {
|
||||||
|
info!("Worker started: {}", name);
|
||||||
worker(stop_signal).await;
|
worker(stop_signal).await;
|
||||||
info!("Worker exited: {}", name);
|
info!("Worker exited: {}", name);
|
||||||
}));
|
});
|
||||||
|
self.worker_in
|
||||||
|
.send(task)
|
||||||
|
.map_err(|_| "could not put job in queue")
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
|
||||||
|
q.lock().await.recv().await
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user