From d0f601a9f787637b5d0e9f070e09072c1b0af617 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Wed, 21 Jun 2023 14:29:47 +0200 Subject: [PATCH] Make TaskStack cancel each task individually, not all at once --- src/domain/manager.rs | 2 +- src/service/http.rs | 6 +++--- src/util.rs | 27 ++++++++++++++++----------- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/domain/manager.rs b/src/domain/manager.rs index 54915a5..277dcf6 100644 --- a/src/domain/manager.rs +++ b/src/domain/manager.rs @@ -190,7 +190,7 @@ pub fn new( acme_manager, }); - task_stack.spawn(|canceller| { + task_stack.push_spawn(|canceller| { let manager = manager.clone(); async move { Ok(sync_origins(manager.origin_store.as_ref(), canceller).await) } }); diff --git a/src/service/http.rs b/src/service/http.rs index fadcce0..c8b1937 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -43,7 +43,7 @@ pub fn new( handlebars: tpl::get(), }); - task_stack.spawn(|canceller| { + task_stack.push_spawn(|canceller| { tasks::listen_http( service.clone(), canceller, @@ -53,7 +53,7 @@ pub fn new( }); if let Some(https_params) = https_params { - task_stack.spawn(|canceller| { + task_stack.push_spawn(|canceller| { tasks::listen_https( service.clone(), canceller, @@ -63,7 +63,7 @@ pub fn new( ) }); - task_stack.spawn(|canceller| { + task_stack.push_spawn(|canceller| { tasks::cert_refresher(domain_manager.clone(), canceller, http_domain.clone()) }); } diff --git a/src/util.rs b/src/util.rs index 3efb067..29ef7b0 100644 --- a/src/util.rs +++ b/src/util.rs @@ -12,12 +12,19 @@ pub fn open_file(path: &path::Path) -> io::Result> { } } -pub struct TaskStack +struct Task where E: error::Error + Send + 'static, { canceller: CancellationToken, - wait_group: Vec>>, + handle: tokio::task::JoinHandle>, +} + +pub struct TaskStack +where + E: error::Error + Send + 'static, +{ + wait_group: Vec>, } impl TaskStack @@ -26,30 +33,28 @@ where { pub fn new() -> TaskStack { TaskStack { - canceller: CancellationToken::new(), wait_group: Vec::new(), } } - pub fn spawn(&mut self, mut f: F) + pub fn push_spawn(&mut self, mut f: F) where Fut: futures::Future> + Send + 'static, F: FnMut(CancellationToken) -> Fut, { - let canceller = self.canceller.clone(); - let handle = tokio::spawn(f(canceller)); - self.wait_group.push(handle); + let canceller = CancellationToken::new(); + let handle = tokio::spawn(f(canceller.clone())); + self.wait_group.push(Task { canceller, handle }); } pub async fn stop(mut self) -> Result<(), E> { - self.canceller.cancel(); - // reverse wait_group in place, so we stop the most recently added first. Since this method // consumes self this is fine. self.wait_group.reverse(); - for f in self.wait_group { - if let Err(err) = f.await.expect("task failed") { + for t in self.wait_group { + t.canceller.cancel(); + if let Err(err) = t.handle.await.expect("task failed") { return Err(err); } }