add task_stack.push_spawn_periodically, use for domain manager sync job

This commit is contained in:
Brian Picciano 2024-01-14 14:05:09 +01:00
parent dbb8e442c8
commit 70c78b823a
2 changed files with 33 additions and 20 deletions

View File

@ -3,7 +3,6 @@ use crate::error::unexpected::{self, Mappable};
use crate::{origin, task_stack, util}; use crate::{origin, task_stack, util};
use std::sync; use std::sync;
use tokio_util::sync::CancellationToken;
pub enum GetSettingsResult { pub enum GetSettingsResult {
Stored(domain::Settings), Stored(domain::Settings),
@ -183,13 +182,11 @@ impl ManagerImpl {
config, config,
}); });
task_stack.push_spawn(|canceller| { task_stack.push_spawn_periodically(
let manager = manager.clone(); manager.clone(),
async move { 20 * 60,
manager.sync_all_domains_job(canceller).await; |_canceller, manager| async move { manager.sync_all_domains().await },
Ok(()) );
}
});
manager manager
} }
@ -301,18 +298,6 @@ impl ManagerImpl {
Ok(()) Ok(())
} }
async fn sync_all_domains_job(&self, canceller: CancellationToken) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(20 * 60));
loop {
tokio::select! {
_ = canceller.cancelled() => return,
_ = interval.tick() => if let Err(err) = self.sync_all_domains().await {
log::error!("Failed to sync all domains: {err}")
},
}
}
}
} }
impl Manager for ManagerImpl { impl Manager for ManagerImpl {

View File

@ -42,6 +42,34 @@ where
}); });
} }
pub fn push_spawn_periodically<State, F, Fut>(&mut self, state: State, period_secs: u64, f: F)
where
State: Clone + Send + Sync + 'static,
Fut: futures::Future<Output = Result<(), E>> + Send + 'static,
F: Fn(CancellationToken, State) -> Fut + Send + Sync + 'static,
{
let canceller = CancellationToken::new();
let handle = {
let canceller = canceller.clone();
tokio::spawn(async move {
let mut interval =
tokio::time::interval(tokio::time::Duration::from_secs(period_secs));
loop {
tokio::select! {
_ = canceller.cancelled() => return Ok(()),
_ = interval.tick() => if let Err(err) = f(canceller.clone(), state.clone()).await {
log::error!("Failed to sync all domains: {err}")
},
}
}
})
};
self.push(async move {
canceller.cancel();
handle.await.expect("failed to join task")
});
}
/// stop will process all operations which have been pushed onto the stack in the reverse order /// stop will process all operations which have been pushed onto the stack in the reverse order
/// they were pushed. /// they were pushed.
pub async fn stop(mut self) -> Result<(), E> { pub async fn stop(mut self) -> Result<(), E> {