diff --git a/src/util.rs b/src/util.rs index 29ef7b0..d15624c 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,4 +1,4 @@ -use std::{error, fs, io, path}; +use std::{error, fs, io, path, pin}; use tokio_util::sync::CancellationToken; @@ -12,19 +12,13 @@ pub fn open_file(path: &path::Path) -> io::Result> { } } -struct Task -where - E: error::Error + Send + 'static, -{ - canceller: CancellationToken, - handle: tokio::task::JoinHandle>, -} +type StaticFuture = pin::Pin + Send + 'static>>; pub struct TaskStack where E: error::Error + Send + 'static, { - wait_group: Vec>, + wait_group: Vec>>, } impl TaskStack @@ -37,6 +31,16 @@ where } } + /// push adds the given Future to the stack, to be executed once stop is called. + pub fn push(&mut self, f: Fut) + where + Fut: futures::Future> + Send + 'static, + { + self.wait_group.push(Box::pin(f)) + } + + /// push_spawn will spawn the given closure in a tokio task. Once the CancellationToken is + /// cancelled the closure is expected to return. pub fn push_spawn(&mut self, mut f: F) where Fut: futures::Future> + Send + 'static, @@ -44,17 +48,21 @@ where { let canceller = CancellationToken::new(); let handle = tokio::spawn(f(canceller.clone())); - self.wait_group.push(Task { canceller, handle }); + self.push(async move { + canceller.cancel(); + handle.await.expect("failed to join task") + }); } + /// stop will process all operations which have been pushed onto the stack in the reverse order + /// they were pushed. pub async fn stop(mut self) -> Result<(), E> { // reverse wait_group in place, so we stop the most recently added first. Since this method // consumes self this is fine. self.wait_group.reverse(); - for t in self.wait_group { - t.canceller.cancel(); - if let Err(err) = t.handle.await.expect("task failed") { + for fut in self.wait_group { + if let Err(err) = fut.await { return Err(err); } }