Use TaskStack to clean up startup/shutdown logic significantly

Branch: main
Brian Picciano, 1 year ago
parent f2374cded5
commit 7dd52839b1
Changed files:

  1. TODO (1 changed line)
  2. src/domain/manager.rs (73 changed lines)
  3. src/error/unexpected.rs (22 changed lines)
  4. src/main.rs (68 changed lines)
  5. src/service/http.rs (22 changed lines)
  6. src/util.rs (26 changed lines)

TODO

@@ -1,3 +1,2 @@
 - make domain_manager implement rusttls cert resolver
 - Try to switch from Arc to Box where possible
-- maybe build TaskSet into some kind of defer-like replacement

src/domain/manager.rs

@@ -1,6 +1,7 @@
 use crate::domain::{self, acme, checker, config};
 use crate::error::unexpected::{self, Mappable};
 use crate::origin;
+use crate::util;
 use std::{future, pin, sync};
 use tokio_util::sync::CancellationToken;
@@ -144,70 +145,56 @@ pub trait Manager: Sync + Send {
     fn all_domains(&self) -> Result<Vec<domain::Name>, unexpected::Error>;
 }
 
-pub struct ManagerImpl {
+struct ManagerImpl {
     origin_store: sync::Arc<dyn origin::store::Store>,
     domain_config_store: sync::Arc<dyn config::Store>,
     domain_checker: checker::DNSChecker,
     acme_manager: Option<sync::Arc<dyn acme::manager::Manager>>,
-    canceller: CancellationToken,
-    origin_sync_handler: tokio::task::JoinHandle<()>,
 }
 
-fn sync_origins(origin_store: &dyn origin::store::Store) {
-    match origin_store.all_descrs() {
-        Ok(iter) => iter.into_iter(),
-        Err(err) => {
-            log::error!("Error fetching origin descriptors: {err}");
-            return;
-        }
-    }
-    .for_each(|descr| {
-        if let Err(err) = origin_store.sync(descr.clone(), origin::store::Limits {}) {
-            log::error!("Failed to sync store for {:?}: {err}", descr);
-            return;
-        }
-    });
+async fn sync_origins(origin_store: &dyn origin::store::Store, canceller: CancellationToken) {
+    let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(20 * 60));
+    loop {
+        tokio::select! {
+            _ = interval.tick() => {
+                match origin_store.all_descrs() {
+                    Ok(iter) => iter.into_iter(),
+                    Err(err) => {
+                        log::error!("Error fetching origin descriptors: {err}");
+                        return;
+                    }
+                }
+                .for_each(|descr| {
+                    if let Err(err) = origin_store.sync(descr.clone(), origin::store::Limits {}) {
+                        log::error!("Failed to sync store for {:?}: {err}", descr);
+                        return;
+                    }
+                });
+            },
+            _ = canceller.cancelled() => return,
+        }
+    }
 }
 
 pub fn new(
+    task_stack: &mut util::TaskStack<unexpected::Error>,
     origin_store: sync::Arc<dyn origin::store::Store>,
     domain_config_store: sync::Arc<dyn config::Store>,
     domain_checker: checker::DNSChecker,
     acme_manager: Option<sync::Arc<dyn acme::manager::Manager>>,
-) -> ManagerImpl {
-    let canceller = CancellationToken::new();
-
-    let origin_sync_handler = {
-        let origin_store = origin_store.clone();
-        let canceller = canceller.clone();
-        tokio::spawn(async move {
-            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(20 * 60));
-            loop {
-                tokio::select! {
-                    _ = interval.tick() => sync_origins(origin_store.as_ref()),
-                    _ = canceller.cancelled() => return,
-                }
-            }
-        })
-    };
-
-    ManagerImpl {
+) -> sync::Arc<dyn Manager> {
+    task_stack.spawn(|canceller| {
+        let origin_store = origin_store.clone();
+        async move { Ok(sync_origins(origin_store.as_ref(), canceller).await) }
+    });
+
+    sync::Arc::new(ManagerImpl {
         origin_store,
         domain_config_store,
         domain_checker,
         acme_manager,
-        canceller,
-        origin_sync_handler,
-    }
-}
-
-impl ManagerImpl {
-    pub fn stop(self) -> tokio::task::JoinHandle<()> {
-        self.canceller.cancel();
-        self.origin_sync_handler
-    }
+    })
 }
 
 impl Manager for ManagerImpl {

src/error/unexpected.rs

@@ -103,6 +103,28 @@ impl<T, E: error::Error> Mappable<T> for Result<T, E> {
     }
 }
 
+static OPTION_NONE_ERROR: &'static str = "expected Some but got None";
+
+impl<T> Mappable<T> for Option<T> {
+    fn or_unexpected(self) -> Result<T, Error> {
+        self.ok_or(Error::from(OPTION_NONE_ERROR)).or_unexpected()
+    }
+
+    fn or_unexpected_while<D: fmt::Display>(self, prefix: D) -> Result<T, Error> {
+        self.ok_or(Error::from(OPTION_NONE_ERROR))
+            .or_unexpected_while(prefix)
+    }
+
+    fn map_unexpected_while<F, D>(self, f: F) -> Result<T, Error>
+    where
+        F: FnOnce() -> D,
+        D: fmt::Display,
+    {
+        self.ok_or(Error::from(OPTION_NONE_ERROR))
+            .map_unexpected_while(f)
+    }
+}
+
 pub trait Intoable {
     fn into_unexpected(self) -> Error;
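
For reference, a hypothetical call site for the new Option impl could look like the sketch below. The config map and the "port" key are illustrative only, not code from this repository; the point is that a missing value becomes an unexpected::Error with context attached, rather than needing a manual ok_or(...) at every call site.

    use crate::error::unexpected::{self, Mappable};
    use std::collections;

    // Hypothetical helper: a missing map entry becomes an unexpected::Error
    // carrying the given context string.
    fn lookup_port(cfg: &collections::HashMap<String, u16>) -> Result<u16, unexpected::Error> {
        cfg.get("port")
            .copied()
            .or_unexpected_while("looking up \"port\" in the config map")
    }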

src/main.rs

@@ -78,27 +78,6 @@ async fn main() {
     )
     .init();
 
-    let canceller = tokio_util::sync::CancellationToken::new();
-
-    {
-        let canceller = canceller.clone();
-        tokio::spawn(async move {
-            let mut signals = Signals::new(signal_hook::consts::TERM_SIGNALS)
-                .expect("initializing signals failed");
-
-            if (signals.next().await).is_some() {
-                log::info!("Gracefully shutting down...");
-                canceller.cancel();
-            }
-
-            if (signals.next().await).is_some() {
-                log::warn!("Forcefully shutting down");
-                std::process::exit(1);
-            };
-        });
-    }
-
     let origin_store = domiply::origin::store::git::new(config.origin_store_git_dir_path)
         .expect("git origin store initialization failed");
@@ -138,38 +117,47 @@ async fn main() {
         None
     };
 
+    let mut task_stack = domiply::util::TaskStack::new();
+
     let domain_manager = domiply::domain::manager::new(
+        &mut task_stack,
         origin_store,
         domain_config_store,
         domain_checker,
         https_params.as_ref().map(|p| p.domain_acme_manager.clone()),
     );
 
-    let domain_manager = sync::Arc::new(domain_manager);
-
-    {
-        let (http_service, http_service_task_set) = domiply::service::http::new(
-            domain_manager.clone(),
-            config.domain_checker_target_a,
-            config.passphrase,
-            config.http_listen_addr.clone(),
-            config.http_domain.clone(),
-            https_params.map(|p| domiply::service::http::HTTPSParams {
-                listen_addr: p.https_listen_addr,
-                cert_resolver: domiply::domain::acme::resolver::new(p.domain_acme_store),
-            }),
-        );
-
-        canceller.cancelled().await;
-
-        domiply::service::http::stop(http_service, http_service_task_set).await;
-    }
-
-    sync::Arc::into_inner(domain_manager)
-        .unwrap()
+    let _ = domiply::service::http::new(
+        &mut task_stack,
+        domain_manager.clone(),
+        config.domain_checker_target_a,
+        config.passphrase,
+        config.http_listen_addr.clone(),
+        config.http_domain.clone(),
+        https_params.map(|p| domiply::service::http::HTTPSParams {
+            listen_addr: p.https_listen_addr,
+            cert_resolver: domiply::domain::acme::resolver::new(p.domain_acme_store),
+        }),
+    );
+
+    let mut signals =
+        Signals::new(signal_hook::consts::TERM_SIGNALS).expect("initializing signals failed");
+
+    if (signals.next().await).is_some() {
+        log::info!("Gracefully shutting down...");
+    }
+
+    tokio::spawn(async move {
+        if (signals.next().await).is_some() {
+            log::warn!("Forcefully shutting down");
+            std::process::exit(1);
+        };
+    });
+
+    task_stack
         .stop()
         .await
-        .expect("domain manager failed to shutdown cleanly");
+        .expect("failed to stop all background tasks");
 
     log::info!("Graceful shutdown complete");
 }

src/service/http.rs

@@ -27,13 +27,14 @@ pub struct HTTPSParams {
 }
 
 pub fn new(
+    task_stack: &mut util::TaskStack<unexpected::Error>,
     domain_manager: sync::Arc<dyn domain::manager::Manager>,
     target_a: net::Ipv4Addr,
     passphrase: String,
     http_listen_addr: net::SocketAddr,
     http_domain: domain::Name,
     https_params: Option<HTTPSParams>,
-) -> (sync::Arc<Service>, util::TaskSet<unexpected::Error>) {
+) -> sync::Arc<Service> {
     let service = sync::Arc::new(Service {
         domain_manager: domain_manager.clone(),
         target_a,
@@ -42,9 +43,7 @@ pub fn new(
         handlebars: tpl::get(),
     });
 
-    let task_set = util::TaskSet::new();
-
-    task_set.spawn(|canceller| {
+    task_stack.spawn(|canceller| {
         tasks::listen_http(
             service.clone(),
             canceller,
@@ -54,7 +53,7 @@ pub fn new(
     });
 
     if let Some(https_params) = https_params {
-        task_set.spawn(|canceller| {
+        task_stack.spawn(|canceller| {
             tasks::listen_https(
                 service.clone(),
                 canceller,
@@ -64,21 +63,12 @@ pub fn new(
             )
         });
 
-        task_set.spawn(|canceller| {
+        task_stack.spawn(|canceller| {
             tasks::cert_refresher(domain_manager.clone(), canceller, http_domain.clone())
         });
     }
 
-    return (service, task_set);
-}
-
-pub async fn stop(service: sync::Arc<Service>, task_set: util::TaskSet<unexpected::Error>) {
-    task_set
-        .stop()
-        .await
-        .iter()
-        .for_each(|e| log::error!("error while shutting down http service: {e}"));
-
-    sync::Arc::into_inner(service).expect("service didn't get cleaned up");
+    return service;
 }
 
 #[derive(Serialize)]

src/util.rs

@@ -1,6 +1,5 @@
 use std::{error, fs, io, path};
 
-use futures::stream::futures_unordered::FuturesUnordered;
 use tokio_util::sync::CancellationToken;
 
 pub fn open_file(path: &path::Path) -> io::Result<Option<fs::File>> {
@@ -13,26 +12,26 @@ pub fn open_file(path: &path::Path) -> io::Result<Option<fs::File>> {
     }
 }
 
-pub struct TaskSet<E>
+pub struct TaskStack<E>
 where
     E: error::Error + Send + 'static,
 {
     canceller: CancellationToken,
-    wait_group: FuturesUnordered<tokio::task::JoinHandle<Result<(), E>>>,
+    wait_group: Vec<tokio::task::JoinHandle<Result<(), E>>>,
 }
 
-impl<E> TaskSet<E>
+impl<E> TaskStack<E>
 where
     E: error::Error + Send + 'static,
 {
-    pub fn new() -> TaskSet<E> {
-        TaskSet {
+    pub fn new() -> TaskStack<E> {
+        TaskStack {
             canceller: CancellationToken::new(),
-            wait_group: FuturesUnordered::new(),
+            wait_group: Vec::new(),
         }
     }
 
-    pub fn spawn<F, Fut>(&self, mut f: F)
+    pub fn spawn<F, Fut>(&mut self, mut f: F)
     where
         Fut: futures::Future<Output = Result<(), E>> + Send + 'static,
         F: FnMut(CancellationToken) -> Fut,
@@ -42,15 +41,18 @@ where
         self.wait_group.push(handle);
     }
 
-    pub async fn stop(self) -> Vec<E> {
+    pub async fn stop(mut self) -> Result<(), E> {
         self.canceller.cancel();
-        let mut res = Vec::new();
+        // reverse wait_group in place, so we stop the most recently added first. Since this method
+        // consumes self this is fine.
+        self.wait_group.reverse();
         for f in self.wait_group {
             if let Err(err) = f.await.expect("task failed") {
-                res.push(err);
+                return Err(err);
            }
         }
-        res
+        Ok(())
     }
 }
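
Taken together, TaskStack behaves like a defer stack for background tasks: spawn pushes a task, stop cancels everything and then awaits the handles most-recently-spawned first. A minimal sketch of driving it directly, assuming an async context and the crate's unexpected::Error as the task error type (the task bodies and log messages are illustrative only, not code from this commit):

    // Assumed: running inside an async fn, with TaskStack and unexpected in scope.
    let mut task_stack = TaskStack::<unexpected::Error>::new();

    // Spawned first, so it is awaited last during stop().
    task_stack.spawn(|canceller| async move {
        canceller.cancelled().await;
        log::info!("base task shutting down");
        Ok(())
    });

    // Spawned second, so it is awaited first during stop().
    task_stack.spawn(|canceller| async move {
        canceller.cancelled().await;
        log::info!("dependent task shutting down");
        Ok(())
    });

    // Cancels the shared token, then awaits each handle in reverse spawn
    // order, returning the first task error encountered (if any).
    task_stack.stop().await.expect("a background task failed");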
