2023-07-21 12:43:39 +00:00
|
|
|
use crate::util::BoxFuture;
|
|
|
|
use std::error;
|
|
|
|
use tokio_util::sync::CancellationToken;
|
|
|
|
|
|
|
|
pub struct TaskStack<E>
|
|
|
|
where
|
|
|
|
E: error::Error + Send + 'static,
|
|
|
|
{
|
|
|
|
wait_group: Vec<BoxFuture<'static, Result<(), E>>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<E> TaskStack<E>
|
|
|
|
where
|
|
|
|
E: error::Error + Send + 'static,
|
|
|
|
{
|
|
|
|
pub fn new() -> TaskStack<E> {
|
|
|
|
TaskStack {
|
|
|
|
wait_group: Vec::new(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// push adds the given Future to the stack, to be executed once stop is called.
|
|
|
|
pub fn push<Fut>(&mut self, f: Fut)
|
|
|
|
where
|
|
|
|
Fut: futures::Future<Output = Result<(), E>> + Send + 'static,
|
|
|
|
{
|
|
|
|
self.wait_group.push(Box::pin(f))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// push_spawn will spawn the given closure in a tokio task. Once the CancellationToken is
|
|
|
|
/// cancelled the closure is expected to return.
|
|
|
|
pub fn push_spawn<F, Fut>(&mut self, mut f: F)
|
|
|
|
where
|
|
|
|
Fut: futures::Future<Output = Result<(), E>> + Send + 'static,
|
|
|
|
F: FnMut(CancellationToken) -> Fut,
|
|
|
|
{
|
|
|
|
let canceller = CancellationToken::new();
|
|
|
|
let handle = tokio::spawn(f(canceller.clone()));
|
|
|
|
self.push(async move {
|
|
|
|
canceller.cancel();
|
|
|
|
handle.await.expect("failed to join task")
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2024-01-14 13:05:09 +00:00
|
|
|
pub fn push_spawn_periodically<State, F, Fut>(&mut self, state: State, period_secs: u64, f: F)
|
|
|
|
where
|
|
|
|
State: Clone + Send + Sync + 'static,
|
|
|
|
Fut: futures::Future<Output = Result<(), E>> + Send + 'static,
|
|
|
|
F: Fn(CancellationToken, State) -> Fut + Send + Sync + 'static,
|
|
|
|
{
|
|
|
|
let canceller = CancellationToken::new();
|
|
|
|
let handle = {
|
|
|
|
let canceller = canceller.clone();
|
|
|
|
tokio::spawn(async move {
|
|
|
|
let mut interval =
|
|
|
|
tokio::time::interval(tokio::time::Duration::from_secs(period_secs));
|
|
|
|
loop {
|
|
|
|
tokio::select! {
|
|
|
|
_ = canceller.cancelled() => return Ok(()),
|
|
|
|
_ = interval.tick() => if let Err(err) = f(canceller.clone(), state.clone()).await {
|
|
|
|
log::error!("Failed to sync all domains: {err}")
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
};
|
|
|
|
self.push(async move {
|
|
|
|
canceller.cancel();
|
|
|
|
handle.await.expect("failed to join task")
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
2023-07-21 12:43:39 +00:00
|
|
|
/// stop will process all operations which have been pushed onto the stack in the reverse order
|
|
|
|
/// they were pushed.
|
|
|
|
pub async fn stop(mut self) -> Result<(), E> {
|
|
|
|
// reverse wait_group in place, so we stop the most recently added first. Since this method
|
|
|
|
// consumes self this is fine.
|
|
|
|
self.wait_group.reverse();
|
|
|
|
|
|
|
|
for fut in self.wait_group {
|
|
|
|
fut.await?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<E> Default for TaskStack<E>
|
|
|
|
where
|
|
|
|
E: error::Error + Send + 'static,
|
|
|
|
{
|
|
|
|
fn default() -> Self {
|
|
|
|
Self::new()
|
|
|
|
}
|
|
|
|
}
|