From a7e74ac5dd3cc2a1e941321a04077242a8b80511 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Wed, 17 May 2023 14:37:23 +0200 Subject: [PATCH] Move origin syncing to within manager --- src/domain/manager.rs | 46 ++++++++++++++++++++------- src/main.rs | 74 +++++++++++++++++++------------------------ 2 files changed, 66 insertions(+), 54 deletions(-) diff --git a/src/domain/manager.rs b/src/domain/manager.rs index 65c589e..460ce1b 100644 --- a/src/domain/manager.rs +++ b/src/domain/manager.rs @@ -117,6 +117,7 @@ impl From for SyncWithConfigError { #[mockall::automock( type Origin=origin::MockOrigin; type SyncWithConfigFuture=future::Ready>; + type SyncAllOriginsErrorsIter=Vec<(Option,error::Unexpected)>; )] pub trait Manager { type Origin<'mgr>: origin::Origin + 'mgr @@ -130,14 +131,22 @@ pub trait Manager { where Self: 'mgr; + type SyncAllOriginsErrorsIter<'mgr>: IntoIterator, error::Unexpected)> + + 'mgr + where + Self: 'mgr; + fn get_config(&self, domain: &domain::Name) -> Result; + fn get_origin(&self, domain: &domain::Name) -> Result, GetOriginError>; - fn sync(&self, domain: &domain::Name) -> Result<(), SyncError>; + fn sync_with_config( &self, domain: domain::Name, config: config::Config, ) -> Self::SyncWithConfigFuture<'_>; + + fn sync_all_origins(&self) -> Result, error::Unexpected>; } pub trait BoxedManager: Manager + Send + Sync + Clone {} @@ -188,6 +197,9 @@ where type SyncWithConfigFuture<'mgr> = pin::Pin> + Send + 'mgr>> where Self: 'mgr; + type SyncAllOriginsErrorsIter<'mgr> = Box, error::Unexpected)> + 'mgr> + where Self: 'mgr; + fn get_config(&self, domain: &domain::Name) -> Result { Ok(self.domain_config_store.get(domain)?) } @@ -202,17 +214,6 @@ where Ok(origin) } - fn sync(&self, domain: &domain::Name) -> Result<(), SyncError> { - let config = self.domain_config_store.get(domain)?; - self.origin_store - .sync(config.origin_descr, origin::store::Limits {}) - .map_err(|e| match e { - origin::store::SyncError::AlreadyInProgress => SyncError::AlreadyInProgress, - _ => e.to_unexpected().into(), - })?; - Ok(()) - } - fn sync_with_config( &self, domain: domain::Name, @@ -233,4 +234,25 @@ where Ok(()) }) } + + fn sync_all_origins(&self) -> Result, error::Unexpected> { + let iter = self.origin_store.all_descrs().map_unexpected()?.into_iter(); + + Ok(Box::from(iter.filter_map(|descr| { + if let Err(err) = descr { + return Some((None, err.to_unexpected().into())); + } + + let descr = descr.unwrap(); + + if let Err(err) = self + .origin_store + .sync(descr.clone(), origin::store::Limits {}) + { + return Some((Some(descr), err.to_unexpected().into())); + } + + None + }))) + } } diff --git a/src/main.rs b/src/main.rs index c7c2873..5fad4ec 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ use std::path; use std::str::FromStr; use std::sync; -use domiply::origin::store::Store; +use domiply::domain::manager::Manager; #[derive(Parser, Debug)] #[command(version)] @@ -71,47 +71,6 @@ fn main() { let origin_store = domiply::origin::store::git::new(config.origin_store_git_dir_path) .expect("git origin store initialized"); - let origin_syncer_handler = { - let origin_store = origin_store.clone(); - let canceller = canceller.clone(); - - tokio_runtime.spawn(async move { - let mut interval = time::interval(time::Duration::from_secs(20 * 60)); - interval.tick().await; - - loop { - origin_store - .all_descrs() - .expect("got all_descrs iter") - .into_iter() - .for_each(|descr| { - if canceller.is_cancelled() { - return; - } - - if let Err(err) = descr { - println!("failed iterating origins: {err}"); - return; - } - - let descr = descr.unwrap(); - - println!("syncing origin: {descr:?}"); - if let Err(err) = - origin_store.sync(descr.clone(), domiply::origin::store::Limits {}) - { - println!("error syncing origin {descr:?}: {err}"); - } - }); - - select! { - _ = interval.tick() => continue, - _ = canceller.cancelled() => return, - } - } - }) - }; - let domain_checker = domiply::domain::checker::new( tokio_runtime.clone(), config.domain_checker_target_a, @@ -124,6 +83,37 @@ fn main() { let manager = domiply::domain::manager::new(origin_store, domain_config_store, domain_checker); + let origin_syncer_handler = { + let manager = manager.clone(); + let canceller = canceller.clone(); + + tokio_runtime.spawn(async move { + let mut interval = time::interval(time::Duration::from_secs(20 * 60)); + + loop { + select! { + _ = interval.tick() => (), + _ = canceller.cancelled() => return, + } + + let errors_iter = manager.sync_all_origins(); + + if let Err(err) = errors_iter { + println!("Got error calling sync_all_origins: {err}"); + continue; + } + + errors_iter + .unwrap() + .into_iter() + .for_each(|(descr, err)| match descr { + None => println!("error while syncing unknown descr: {err}"), + Some(descr) => println!("failed to sync {descr:?}: {err}"), + }); + } + }) + }; + let service = domiply::service::new( manager, config.domain_checker_target_a,