Make TaskStack cancel each task individually, not all at once

This commit is contained in:
Brian Picciano 2023-06-21 14:29:47 +02:00
parent 5e89d21ce3
commit d0f601a9f7
3 changed files with 20 additions and 15 deletions

View File

@ -190,7 +190,7 @@ pub fn new(
acme_manager, acme_manager,
}); });
task_stack.spawn(|canceller| { task_stack.push_spawn(|canceller| {
let manager = manager.clone(); let manager = manager.clone();
async move { Ok(sync_origins(manager.origin_store.as_ref(), canceller).await) } async move { Ok(sync_origins(manager.origin_store.as_ref(), canceller).await) }
}); });

View File

@ -43,7 +43,7 @@ pub fn new(
handlebars: tpl::get(), handlebars: tpl::get(),
}); });
task_stack.spawn(|canceller| { task_stack.push_spawn(|canceller| {
tasks::listen_http( tasks::listen_http(
service.clone(), service.clone(),
canceller, canceller,
@ -53,7 +53,7 @@ pub fn new(
}); });
if let Some(https_params) = https_params { if let Some(https_params) = https_params {
task_stack.spawn(|canceller| { task_stack.push_spawn(|canceller| {
tasks::listen_https( tasks::listen_https(
service.clone(), service.clone(),
canceller, canceller,
@ -63,7 +63,7 @@ pub fn new(
) )
}); });
task_stack.spawn(|canceller| { task_stack.push_spawn(|canceller| {
tasks::cert_refresher(domain_manager.clone(), canceller, http_domain.clone()) tasks::cert_refresher(domain_manager.clone(), canceller, http_domain.clone())
}); });
} }

View File

@ -12,12 +12,19 @@ pub fn open_file(path: &path::Path) -> io::Result<Option<fs::File>> {
} }
} }
pub struct TaskStack<E> struct Task<E>
where where
E: error::Error + Send + 'static, E: error::Error + Send + 'static,
{ {
canceller: CancellationToken, canceller: CancellationToken,
wait_group: Vec<tokio::task::JoinHandle<Result<(), E>>>, handle: tokio::task::JoinHandle<Result<(), E>>,
}
pub struct TaskStack<E>
where
E: error::Error + Send + 'static,
{
wait_group: Vec<Task<E>>,
} }
impl<E> TaskStack<E> impl<E> TaskStack<E>
@ -26,30 +33,28 @@ where
{ {
pub fn new() -> TaskStack<E> { pub fn new() -> TaskStack<E> {
TaskStack { TaskStack {
canceller: CancellationToken::new(),
wait_group: Vec::new(), wait_group: Vec::new(),
} }
} }
pub fn spawn<F, Fut>(&mut self, mut f: F) pub fn push_spawn<F, Fut>(&mut self, mut f: F)
where where
Fut: futures::Future<Output = Result<(), E>> + Send + 'static, Fut: futures::Future<Output = Result<(), E>> + Send + 'static,
F: FnMut(CancellationToken) -> Fut, F: FnMut(CancellationToken) -> Fut,
{ {
let canceller = self.canceller.clone(); let canceller = CancellationToken::new();
let handle = tokio::spawn(f(canceller)); let handle = tokio::spawn(f(canceller.clone()));
self.wait_group.push(handle); self.wait_group.push(Task { canceller, handle });
} }
pub async fn stop(mut self) -> Result<(), E> { pub async fn stop(mut self) -> Result<(), E> {
self.canceller.cancel();
// reverse wait_group in place, so we stop the most recently added first. Since this method // reverse wait_group in place, so we stop the most recently added first. Since this method
// consumes self this is fine. // consumes self this is fine.
self.wait_group.reverse(); self.wait_group.reverse();
for f in self.wait_group { for t in self.wait_group {
if let Err(err) = f.await.expect("task failed") { t.canceller.cancel();
if let Err(err) = t.handle.await.expect("task failed") {
return Err(err); return Err(err);
} }
} }