Refactor TaskStack some more, allow for deferred syncronous tasks

This commit is contained in:
Brian Picciano 2023-06-21 17:16:49 +02:00
parent d0f601a9f7
commit fcab32e0f7

View File

@ -1,4 +1,4 @@
use std::{error, fs, io, path}; use std::{error, fs, io, path, pin};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -12,19 +12,13 @@ pub fn open_file(path: &path::Path) -> io::Result<Option<fs::File>> {
} }
} }
struct Task<E> type StaticFuture<O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'static>>;
where
E: error::Error + Send + 'static,
{
canceller: CancellationToken,
handle: tokio::task::JoinHandle<Result<(), E>>,
}
pub struct TaskStack<E> pub struct TaskStack<E>
where where
E: error::Error + Send + 'static, E: error::Error + Send + 'static,
{ {
wait_group: Vec<Task<E>>, wait_group: Vec<StaticFuture<Result<(), E>>>,
} }
impl<E> TaskStack<E> impl<E> TaskStack<E>
@ -37,6 +31,16 @@ where
} }
} }
/// push adds the given Future to the stack, to be executed once stop is called.
pub fn push<Fut>(&mut self, f: Fut)
where
Fut: futures::Future<Output = Result<(), E>> + Send + 'static,
{
self.wait_group.push(Box::pin(f))
}
/// push_spawn will spawn the given closure in a tokio task. Once the CancellationToken is
/// cancelled the closure is expected to return.
pub fn push_spawn<F, Fut>(&mut self, mut f: F) pub fn push_spawn<F, Fut>(&mut self, mut f: F)
where where
Fut: futures::Future<Output = Result<(), E>> + Send + 'static, Fut: futures::Future<Output = Result<(), E>> + Send + 'static,
@ -44,17 +48,21 @@ where
{ {
let canceller = CancellationToken::new(); let canceller = CancellationToken::new();
let handle = tokio::spawn(f(canceller.clone())); let handle = tokio::spawn(f(canceller.clone()));
self.wait_group.push(Task { canceller, handle }); self.push(async move {
canceller.cancel();
handle.await.expect("failed to join task")
});
} }
/// stop will process all operations which have been pushed onto the stack in the reverse order
/// they were pushed.
pub async fn stop(mut self) -> Result<(), E> { pub async fn stop(mut self) -> Result<(), E> {
// reverse wait_group in place, so we stop the most recently added first. Since this method // reverse wait_group in place, so we stop the most recently added first. Since this method
// consumes self this is fine. // consumes self this is fine.
self.wait_group.reverse(); self.wait_group.reverse();
for t in self.wait_group { for fut in self.wait_group {
t.canceller.cancel(); if let Err(err) = fut.await {
if let Err(err) = t.handle.await.expect("task failed") {
return Err(err); return Err(err);
} }
} }