Move origin syncing to within manager

This commit is contained in:
Brian Picciano 2023-05-17 14:37:23 +02:00
parent 7daa864739
commit a7e74ac5dd
2 changed files with 66 additions and 54 deletions

View File

@ -117,6 +117,7 @@ impl From<config::SetError> for SyncWithConfigError {
#[mockall::automock( #[mockall::automock(
type Origin=origin::MockOrigin; type Origin=origin::MockOrigin;
type SyncWithConfigFuture=future::Ready<Result<(), SyncWithConfigError>>; type SyncWithConfigFuture=future::Ready<Result<(), SyncWithConfigError>>;
type SyncAllOriginsErrorsIter=Vec<(Option<origin::Descr>,error::Unexpected)>;
)] )]
pub trait Manager { pub trait Manager {
type Origin<'mgr>: origin::Origin + 'mgr type Origin<'mgr>: origin::Origin + 'mgr
@ -130,14 +131,22 @@ pub trait Manager {
where where
Self: 'mgr; Self: 'mgr;
type SyncAllOriginsErrorsIter<'mgr>: IntoIterator<Item = (Option<origin::Descr>, error::Unexpected)>
+ 'mgr
where
Self: 'mgr;
fn get_config(&self, domain: &domain::Name) -> Result<config::Config, GetConfigError>; fn get_config(&self, domain: &domain::Name) -> Result<config::Config, GetConfigError>;
fn get_origin(&self, domain: &domain::Name) -> Result<Self::Origin<'_>, GetOriginError>; fn get_origin(&self, domain: &domain::Name) -> Result<Self::Origin<'_>, GetOriginError>;
fn sync(&self, domain: &domain::Name) -> Result<(), SyncError>;
fn sync_with_config( fn sync_with_config(
&self, &self,
domain: domain::Name, domain: domain::Name,
config: config::Config, config: config::Config,
) -> Self::SyncWithConfigFuture<'_>; ) -> Self::SyncWithConfigFuture<'_>;
fn sync_all_origins(&self) -> Result<Self::SyncAllOriginsErrorsIter<'_>, error::Unexpected>;
} }
pub trait BoxedManager: Manager + Send + Sync + Clone {} pub trait BoxedManager: Manager + Send + Sync + Clone {}
@ -188,6 +197,9 @@ where
type SyncWithConfigFuture<'mgr> = pin::Pin<Box<dyn future::Future<Output = Result<(), SyncWithConfigError>> + Send + 'mgr>> type SyncWithConfigFuture<'mgr> = pin::Pin<Box<dyn future::Future<Output = Result<(), SyncWithConfigError>> + Send + 'mgr>>
where Self: 'mgr; where Self: 'mgr;
type SyncAllOriginsErrorsIter<'mgr> = Box<dyn Iterator<Item = (Option<origin::Descr>, error::Unexpected)> + 'mgr>
where Self: 'mgr;
fn get_config(&self, domain: &domain::Name) -> Result<config::Config, GetConfigError> { fn get_config(&self, domain: &domain::Name) -> Result<config::Config, GetConfigError> {
Ok(self.domain_config_store.get(domain)?) Ok(self.domain_config_store.get(domain)?)
} }
@ -202,17 +214,6 @@ where
Ok(origin) Ok(origin)
} }
fn sync(&self, domain: &domain::Name) -> Result<(), SyncError> {
let config = self.domain_config_store.get(domain)?;
self.origin_store
.sync(config.origin_descr, origin::store::Limits {})
.map_err(|e| match e {
origin::store::SyncError::AlreadyInProgress => SyncError::AlreadyInProgress,
_ => e.to_unexpected().into(),
})?;
Ok(())
}
fn sync_with_config( fn sync_with_config(
&self, &self,
domain: domain::Name, domain: domain::Name,
@ -233,4 +234,25 @@ where
Ok(()) Ok(())
}) })
} }
fn sync_all_origins(&self) -> Result<Self::SyncAllOriginsErrorsIter<'_>, error::Unexpected> {
let iter = self.origin_store.all_descrs().map_unexpected()?.into_iter();
Ok(Box::from(iter.filter_map(|descr| {
if let Err(err) = descr {
return Some((None, err.to_unexpected().into()));
}
let descr = descr.unwrap();
if let Err(err) = self
.origin_store
.sync(descr.clone(), origin::store::Limits {})
{
return Some((Some(descr), err.to_unexpected().into()));
}
None
})))
}
} }

View File

@ -10,7 +10,7 @@ use std::path;
use std::str::FromStr; use std::str::FromStr;
use std::sync; use std::sync;
use domiply::origin::store::Store; use domiply::domain::manager::Manager;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(version)] #[command(version)]
@ -71,47 +71,6 @@ fn main() {
let origin_store = domiply::origin::store::git::new(config.origin_store_git_dir_path) let origin_store = domiply::origin::store::git::new(config.origin_store_git_dir_path)
.expect("git origin store initialized"); .expect("git origin store initialized");
let origin_syncer_handler = {
let origin_store = origin_store.clone();
let canceller = canceller.clone();
tokio_runtime.spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(20 * 60));
interval.tick().await;
loop {
origin_store
.all_descrs()
.expect("got all_descrs iter")
.into_iter()
.for_each(|descr| {
if canceller.is_cancelled() {
return;
}
if let Err(err) = descr {
println!("failed iterating origins: {err}");
return;
}
let descr = descr.unwrap();
println!("syncing origin: {descr:?}");
if let Err(err) =
origin_store.sync(descr.clone(), domiply::origin::store::Limits {})
{
println!("error syncing origin {descr:?}: {err}");
}
});
select! {
_ = interval.tick() => continue,
_ = canceller.cancelled() => return,
}
}
})
};
let domain_checker = domiply::domain::checker::new( let domain_checker = domiply::domain::checker::new(
tokio_runtime.clone(), tokio_runtime.clone(),
config.domain_checker_target_a, config.domain_checker_target_a,
@ -124,6 +83,37 @@ fn main() {
let manager = domiply::domain::manager::new(origin_store, domain_config_store, domain_checker); let manager = domiply::domain::manager::new(origin_store, domain_config_store, domain_checker);
let origin_syncer_handler = {
let manager = manager.clone();
let canceller = canceller.clone();
tokio_runtime.spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(20 * 60));
loop {
select! {
_ = interval.tick() => (),
_ = canceller.cancelled() => return,
}
let errors_iter = manager.sync_all_origins();
if let Err(err) = errors_iter {
println!("Got error calling sync_all_origins: {err}");
continue;
}
errors_iter
.unwrap()
.into_iter()
.for_each(|(descr, err)| match descr {
None => println!("error while syncing unknown descr: {err}"),
Some(descr) => println!("failed to sync {descr:?}: {err}"),
});
}
})
};
let service = domiply::service::new( let service = domiply::service::new(
manager, manager,
config.domain_checker_target_a, config.domain_checker_target_a,