From 70c78b823a7d320eb8acbfffdc86c789461122c5 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Sun, 14 Jan 2024 14:05:09 +0100 Subject: [PATCH] add task_stack.push_spawn_periodically, use for domain manager sync job --- src/domain/manager.rs | 25 +++++-------------------- src/task_stack.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/domain/manager.rs b/src/domain/manager.rs index 1334913..8e92128 100644 --- a/src/domain/manager.rs +++ b/src/domain/manager.rs @@ -3,7 +3,6 @@ use crate::error::unexpected::{self, Mappable}; use crate::{origin, task_stack, util}; use std::sync; -use tokio_util::sync::CancellationToken; pub enum GetSettingsResult { Stored(domain::Settings), @@ -183,13 +182,11 @@ impl ManagerImpl { config, }); - task_stack.push_spawn(|canceller| { - let manager = manager.clone(); - async move { - manager.sync_all_domains_job(canceller).await; - Ok(()) - } - }); + task_stack.push_spawn_periodically( + manager.clone(), + 20 * 60, + |_canceller, manager| async move { manager.sync_all_domains().await }, + ); manager } @@ -301,18 +298,6 @@ impl ManagerImpl { Ok(()) } - - async fn sync_all_domains_job(&self, canceller: CancellationToken) { - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(20 * 60)); - loop { - tokio::select! { - _ = canceller.cancelled() => return, - _ = interval.tick() => if let Err(err) = self.sync_all_domains().await { - log::error!("Failed to sync all domains: {err}") - }, - } - } - } } impl Manager for ManagerImpl { diff --git a/src/task_stack.rs b/src/task_stack.rs index 2dc44f2..374da54 100644 --- a/src/task_stack.rs +++ b/src/task_stack.rs @@ -42,6 +42,34 @@ where }); } + pub fn push_spawn_periodically(&mut self, state: State, period_secs: u64, f: F) + where + State: Clone + Send + Sync + 'static, + Fut: futures::Future> + Send + 'static, + F: Fn(CancellationToken, State) -> Fut + Send + Sync + 'static, + { + let canceller = CancellationToken::new(); + let handle = { + let canceller = canceller.clone(); + tokio::spawn(async move { + let mut interval = + tokio::time::interval(tokio::time::Duration::from_secs(period_secs)); + loop { + tokio::select! { + _ = canceller.cancelled() => return Ok(()), + _ = interval.tick() => if let Err(err) = f(canceller.clone(), state.clone()).await { + log::error!("Failed to sync all domains: {err}") + }, + } + } + }) + }; + self.push(async move { + canceller.cancel(); + handle.await.expect("failed to join task") + }); + } + /// stop will process all operations which have been pushed onto the stack in the reverse order /// they were pushed. pub async fn stop(mut self) -> Result<(), E> {