Get rid of origin::store::BoxedManager and domain::manager::BoxedManager

This commit is contained in:
Brian Picciano 2023-06-17 18:13:02 +02:00
parent 4317d7f282
commit 52f87dc625
6 changed files with 98 additions and 178 deletions

View File

@ -48,7 +48,7 @@ pub trait Store {
fn all_domains(&self) -> AllDomainsResult<Vec<AllDomainsResult<domain::Name>>>; fn all_domains(&self) -> AllDomainsResult<Vec<AllDomainsResult<domain::Name>>>;
} }
pub trait BoxedStore: Store + Send + Sync + Clone {} pub trait BoxedStore: Store + Send + Sync + Clone + 'static {}
struct FSStore { struct FSStore {
dir_path: PathBuf, dir_path: PathBuf,

View File

@ -1,8 +1,9 @@
use crate::domain::{self, acme, checker, config}; use crate::domain::{self, acme, checker, config};
use crate::error::unexpected::{self, Intoable, Mappable}; use crate::error::unexpected::{self, Mappable};
use crate::origin; use crate::origin;
use std::{future, pin, sync}; use std::{future, pin, sync};
use tokio_util::sync::CancellationToken;
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum GetConfigError { pub enum GetConfigError {
@ -117,38 +118,20 @@ pub type GetAcmeHttp01ChallengeKeyError = acme::manager::GetHttp01ChallengeKeyEr
pub type AllDomainsResult<T> = config::AllDomainsResult<T>; pub type AllDomainsResult<T> = config::AllDomainsResult<T>;
#[mockall::automock( #[mockall::automock]
type Origin=origin::MockOrigin; pub trait Manager: Sync + Send {
type SyncWithConfigFuture=future::Ready<Result<(), SyncWithConfigError>>;
type SyncAllOriginsErrorsIter=Vec<unexpected::Error>;
)]
pub trait Manager {
type Origin<'mgr>: origin::Origin + 'mgr
where
Self: 'mgr;
type SyncWithConfigFuture<'mgr>: future::Future<Output = Result<(), SyncWithConfigError>>
+ Send
+ Unpin
+ 'mgr
where
Self: 'mgr;
type SyncAllOriginsErrorsIter<'mgr>: IntoIterator<Item = unexpected::Error> + 'mgr
where
Self: 'mgr;
fn get_config(&self, domain: &domain::Name) -> Result<config::Config, GetConfigError>; fn get_config(&self, domain: &domain::Name) -> Result<config::Config, GetConfigError>;
fn get_origin(&self, domain: &domain::Name) -> Result<Self::Origin<'_>, GetOriginError>; fn get_origin(
fn sync_with_config(
&self, &self,
domain: &domain::Name,
) -> Result<sync::Arc<dyn origin::Origin>, GetOriginError>;
fn sync_with_config<'mgr>(
&'mgr self,
domain: domain::Name, domain: domain::Name,
config: config::Config, config: config::Config,
) -> Self::SyncWithConfigFuture<'_>; ) -> pin::Pin<Box<dyn future::Future<Output = Result<(), SyncWithConfigError>> + Send + 'mgr>>;
fn sync_all_origins(&self) -> Result<Self::SyncAllOriginsErrorsIter<'_>, unexpected::Error>;
fn get_acme_http01_challenge_key( fn get_acme_http01_challenge_key(
&self, &self,
@ -158,69 +141,99 @@ pub trait Manager {
fn all_domains(&self) -> AllDomainsResult<Vec<AllDomainsResult<domain::Name>>>; fn all_domains(&self) -> AllDomainsResult<Vec<AllDomainsResult<domain::Name>>>;
} }
pub trait BoxedManager: Manager + Send + Sync + Clone {} struct ManagerImpl<DomainConfigStore, AcmeManager>
struct ManagerImpl<OriginStore, DomainConfigStore, AcmeManager>
where where
OriginStore: origin::store::BoxedStore,
DomainConfigStore: config::BoxedStore, DomainConfigStore: config::BoxedStore,
AcmeManager: acme::manager::BoxedManager, AcmeManager: acme::manager::BoxedManager,
{ {
origin_store: OriginStore, origin_store: sync::Arc<dyn origin::store::Store>,
domain_config_store: DomainConfigStore, domain_config_store: DomainConfigStore,
domain_checker: checker::DNSChecker, domain_checker: checker::DNSChecker,
acme_manager: Option<AcmeManager>, acme_manager: Option<AcmeManager>,
canceller: CancellationToken,
origin_sync_handler: tokio::task::JoinHandle<()>,
} }
pub fn new<OriginStore, DomainConfigStore, AcmeManager>( fn sync_origins(origin_store: &dyn origin::store::Store) {
origin_store: OriginStore, match origin_store.all_descrs() {
Ok(iter) => iter.into_iter(),
Err(err) => {
log::error!("Error fetching origin descriptors: {err}");
return;
}
}
.for_each(|descr| {
if let Err(err) = origin_store.sync(descr.clone(), origin::store::Limits {}) {
log::error!("Failed to sync store for {:?}: {err}", descr);
return;
}
});
}
pub fn new<'mgr, DomainConfigStore, AcmeManager>(
origin_store: sync::Arc<dyn origin::store::Store>,
domain_config_store: DomainConfigStore, domain_config_store: DomainConfigStore,
domain_checker: checker::DNSChecker, domain_checker: checker::DNSChecker,
acme_manager: Option<AcmeManager>, acme_manager: Option<AcmeManager>,
) -> impl BoxedManager ) -> sync::Arc<dyn Manager>
where where
OriginStore: origin::store::BoxedStore,
DomainConfigStore: config::BoxedStore, DomainConfigStore: config::BoxedStore,
AcmeManager: acme::manager::BoxedManager, AcmeManager: acme::manager::BoxedManager,
{ {
let canceller = CancellationToken::new();
let origin_sync_handler = {
let origin_store = origin_store.clone();
let canceller = canceller.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(20 * 60));
loop {
tokio::select! {
_ = interval.tick() => sync_origins(origin_store.as_ref()),
_ = canceller.cancelled() => return,
}
}
})
};
sync::Arc::new(ManagerImpl { sync::Arc::new(ManagerImpl {
origin_store, origin_store,
domain_config_store, domain_config_store,
domain_checker, domain_checker,
acme_manager, acme_manager,
canceller,
origin_sync_handler,
}) })
} }
impl<OriginStore, DomainConfigStore, AcmeManager> BoxedManager impl<DomainConfigStore, AcmeManager> ManagerImpl<DomainConfigStore, AcmeManager>
for sync::Arc<ManagerImpl<OriginStore, DomainConfigStore, AcmeManager>>
where where
OriginStore: origin::store::BoxedStore,
DomainConfigStore: config::BoxedStore, DomainConfigStore: config::BoxedStore,
AcmeManager: acme::manager::BoxedManager, AcmeManager: acme::manager::BoxedManager,
{ {
pub async fn stop(self) {
self.canceller.cancel();
self.origin_sync_handler
.await
.expect("origin_sync_handler errored");
}
} }
impl<OriginStore, DomainConfigStore, AcmeManager> Manager impl<DomainConfigStore, AcmeManager> Manager for ManagerImpl<DomainConfigStore, AcmeManager>
for sync::Arc<ManagerImpl<OriginStore, DomainConfigStore, AcmeManager>>
where where
OriginStore: origin::store::BoxedStore,
DomainConfigStore: config::BoxedStore, DomainConfigStore: config::BoxedStore,
AcmeManager: acme::manager::BoxedManager, AcmeManager: acme::manager::BoxedManager,
{ {
type Origin<'mgr> = OriginStore::Origin<'mgr>
where Self: 'mgr;
type SyncWithConfigFuture<'mgr> = pin::Pin<Box<dyn future::Future<Output = Result<(), SyncWithConfigError>> + Send + 'mgr>>
where Self: 'mgr;
type SyncAllOriginsErrorsIter<'mgr> = Box<dyn Iterator<Item = unexpected::Error> + 'mgr>
where Self: 'mgr;
fn get_config(&self, domain: &domain::Name) -> Result<config::Config, GetConfigError> { fn get_config(&self, domain: &domain::Name) -> Result<config::Config, GetConfigError> {
Ok(self.domain_config_store.get(domain)?) Ok(self.domain_config_store.get(domain)?)
} }
fn get_origin(&self, domain: &domain::Name) -> Result<Self::Origin<'_>, GetOriginError> { fn get_origin(
&self,
domain: &domain::Name,
) -> Result<sync::Arc<dyn origin::Origin>, GetOriginError> {
let config = self.domain_config_store.get(domain)?; let config = self.domain_config_store.get(domain)?;
let origin = self let origin = self
.origin_store .origin_store
@ -230,11 +243,12 @@ where
Ok(origin) Ok(origin)
} }
fn sync_with_config( fn sync_with_config<'mgr>(
&self, &'mgr self,
domain: domain::Name, domain: domain::Name,
config: config::Config, config: config::Config,
) -> Self::SyncWithConfigFuture<'_> { ) -> pin::Pin<Box<dyn future::Future<Output = Result<(), SyncWithConfigError>> + Send + 'mgr>>
{
Box::pin(async move { Box::pin(async move {
let config_hash = config let config_hash = config
.hash() .hash()
@ -257,31 +271,6 @@ where
}) })
} }
fn sync_all_origins(&self) -> Result<Self::SyncAllOriginsErrorsIter<'_>, unexpected::Error> {
let iter = self
.origin_store
.all_descrs()
.or_unexpected_while("fetching all origin descrs")?
.into_iter();
Ok(Box::from(iter.filter_map(|descr| {
if let Err(err) = descr {
return Some(err.into_unexpected());
}
let descr = descr.unwrap();
if let Err(err) = self
.origin_store
.sync(descr.clone(), origin::store::Limits {})
{
return Some(err.into_unexpected_while(format!("syncing store {:?}", descr)));
}
None
})))
}
fn get_acme_http01_challenge_key( fn get_acme_http01_challenge_key(
&self, &self,
token: &str, token: &str,

View File

@ -159,34 +159,6 @@ async fn main() {
https_params.as_ref().map(|p| p.domain_acme_manager.clone()), https_params.as_ref().map(|p| p.domain_acme_manager.clone()),
); );
wait_group.push({
let domain_manager = domain_manager.clone();
let canceller = canceller.clone();
tokio::spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(20 * 60));
loop {
select! {
_ = interval.tick() => (),
_ = canceller.cancelled() => return,
}
let errors_iter = domain_manager.sync_all_origins();
if let Err(err) = errors_iter {
log::error!("Got error calling sync_all_origins: {err}");
continue;
}
errors_iter
.unwrap()
.into_iter()
.for_each(|err| log::error!("syncing failed: {err}"));
}
})
});
let service = domiply::service::new( let service = domiply::service::new(
domain_manager.clone(), domain_manager.clone(),
config.domain_checker_target_a, config.domain_checker_target_a,

View File

@ -1,5 +1,6 @@
use crate::error::unexpected; use crate::error::unexpected;
use crate::origin; use crate::origin;
use std::sync;
pub mod git; pub mod git;
@ -38,29 +39,13 @@ pub enum AllDescrsError {
Unexpected(#[from] unexpected::Error), Unexpected(#[from] unexpected::Error),
} }
/// Used in the return from all_descrs from Store. #[mockall::automock]
pub type AllDescrsResult<T> = Result<T, AllDescrsError>;
#[mockall::automock(
type Origin=origin::MockOrigin;
type AllDescrsIter=Vec<AllDescrsResult<origin::Descr>>;
)]
/// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr. /// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr.
pub trait Store { pub trait Store: Sync + Send {
type Origin<'store>: origin::Origin + 'store
where
Self: 'store;
type AllDescrsIter<'store>: IntoIterator<Item = AllDescrsResult<origin::Descr>> + 'store
where
Self: 'store;
/// If the origin is of a kind which can be updated, sync will pull down the latest version of /// If the origin is of a kind which can be updated, sync will pull down the latest version of
/// the origin into the storage. /// the origin into the storage.
fn sync(&self, descr: origin::Descr, limits: Limits) -> Result<(), SyncError>; fn sync(&self, descr: origin::Descr, limits: Limits) -> Result<(), SyncError>;
fn get(&self, descr: origin::Descr) -> Result<Self::Origin<'_>, GetError>; fn get(&self, descr: origin::Descr) -> Result<sync::Arc<dyn origin::Origin>, GetError>;
fn all_descrs(&self) -> AllDescrsResult<Self::AllDescrsIter<'_>>; fn all_descrs(&self) -> Result<Vec<origin::Descr>, AllDescrsError>;
} }
pub trait BoxedStore: Store + Send + Sync + Clone {}

View File

@ -68,10 +68,10 @@ struct Store {
// more than one origin to be syncing at a time // more than one origin to be syncing at a time
sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>, sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
origins: sync::RwLock<collections::HashMap<origin::Descr, Origin>>, origins: sync::RwLock<collections::HashMap<origin::Descr, sync::Arc<Origin>>>,
} }
pub fn new(dir_path: PathBuf) -> io::Result<impl super::BoxedStore> { pub fn new(dir_path: PathBuf) -> io::Result<sync::Arc<dyn super::Store>> {
fs::create_dir_all(&dir_path)?; fs::create_dir_all(&dir_path)?;
Ok(sync::Arc::new(Store { Ok(sync::Arc::new(Store {
dir_path, dir_path,
@ -208,15 +208,7 @@ impl Store {
} }
} }
impl super::BoxedStore for sync::Arc<Store> {} impl super::Store for Store {
impl super::Store for sync::Arc<Store> {
type Origin<'store> = Origin
where Self: 'store;
type AllDescrsIter<'store> = Box<dyn Iterator<Item = store::AllDescrsResult<origin::Descr>> + 'store>
where Self: 'store;
fn sync(&self, descr: origin::Descr, limits: store::Limits) -> Result<(), store::SyncError> { fn sync(&self, descr: origin::Descr, limits: store::Limits) -> Result<(), store::SyncError> {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex // attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration. // isn't actually being held for the whole method duration.
@ -256,12 +248,12 @@ impl super::Store for sync::Arc<Store> {
})?; })?;
let mut origins = self.origins.write().unwrap(); let mut origins = self.origins.write().unwrap();
(*origins).insert(descr, origin); (*origins).insert(descr, sync::Arc::new(origin));
Ok(()) Ok(())
} }
fn get(&self, descr: origin::Descr) -> Result<Self::Origin<'_>, store::GetError> { fn get(&self, descr: origin::Descr) -> Result<sync::Arc<dyn origin::Origin>, store::GetError> {
{ {
let origins = self.origins.read().unwrap(); let origins = self.origins.read().unwrap();
if let Some(origin) = origins.get(&descr) { if let Some(origin) = origins.get(&descr) {
@ -288,16 +280,18 @@ impl super::Store for sync::Arc<Store> {
GetOriginError::Unexpected(e) => store::GetError::Unexpected(e), GetOriginError::Unexpected(e) => store::GetError::Unexpected(e),
})?; })?;
let origin = sync::Arc::new(origin.clone());
let mut origins = self.origins.write().unwrap(); let mut origins = self.origins.write().unwrap();
(*origins).insert(descr, origin.clone()); (*origins).insert(descr, origin.clone());
Ok(origin) Ok(origin)
} }
fn all_descrs(&self) -> store::AllDescrsResult<Self::AllDescrsIter<'_>> { fn all_descrs(&self) -> Result<Vec<origin::Descr>, store::AllDescrsError> {
Ok(Box::from(
fs::read_dir(&self.dir_path).or_unexpected()?.map( fs::read_dir(&self.dir_path).or_unexpected()?.map(
|dir_entry_res: io::Result<fs::DirEntry>| -> store::AllDescrsResult<origin::Descr> { |dir_entry_res: io::Result<fs::DirEntry>| -> Result<origin::Descr, store::AllDescrsError> {
let descr_id: String = dir_entry_res let descr_id: String = dir_entry_res
.or_unexpected()? .or_unexpected()?
.file_name() .file_name()
@ -323,8 +317,7 @@ impl super::Store for sync::Arc<Store> {
Ok(descr) Ok(descr)
}, },
), ).try_collect()
))
} }
} }

View File

@ -7,7 +7,6 @@ use std::net;
use std::str::FromStr; use std::str::FromStr;
use std::sync; use std::sync;
use crate::origin::Origin;
use crate::{domain, origin}; use crate::{domain, origin};
pub mod http_tpl; pub mod http_tpl;
@ -16,26 +15,20 @@ mod util;
type SvcResponse = Result<Response<hyper::body::Body>, String>; type SvcResponse = Result<Response<hyper::body::Body>, String>;
#[derive(Clone)] #[derive(Clone)]
pub struct Service<'svc, DomainManager> pub struct Service<'svc> {
where domain_manager: sync::Arc<dyn domain::manager::Manager>,
DomainManager: domain::manager::BoxedManager,
{
domain_manager: DomainManager,
target_a: net::Ipv4Addr, target_a: net::Ipv4Addr,
passphrase: String, passphrase: String,
http_domain: domain::Name, http_domain: domain::Name,
handlebars: handlebars::Handlebars<'svc>, handlebars: handlebars::Handlebars<'svc>,
} }
pub fn new<'svc, DomainManager>( pub fn new<'svc>(
domain_manager: DomainManager, domain_manager: sync::Arc<dyn domain::manager::Manager>,
target_a: net::Ipv4Addr, target_a: net::Ipv4Addr,
passphrase: String, passphrase: String,
http_domain: domain::Name, http_domain: domain::Name,
) -> Service<'svc, DomainManager> ) -> Service<'svc> {
where
DomainManager: domain::manager::BoxedManager,
{
Service { Service {
domain_manager, domain_manager,
target_a, target_a,
@ -67,10 +60,7 @@ struct DomainSyncArgs {
passphrase: String, passphrase: String,
} }
impl<'svc, DomainManager> Service<'svc, DomainManager> impl<'svc> Service<'svc> {
where
DomainManager: domain::manager::BoxedManager,
{
fn serve_string(&self, status_code: u16, path: &'_ str, body: Vec<u8>) -> SvcResponse { fn serve_string(&self, status_code: u16, path: &'_ str, body: Vec<u8>) -> SvcResponse {
let content_type = mime_guess::from_path(path) let content_type = mime_guess::from_path(path)
.first_or_octet_stream() .first_or_octet_stream()
@ -320,13 +310,10 @@ where
} }
} }
pub async fn handle_request<DomainManager>( pub async fn handle_request(
svc: sync::Arc<Service<'_, DomainManager>>, svc: sync::Arc<Service<'_>>,
req: Request<Body>, req: Request<Body>,
) -> Result<Response<Body>, Infallible> ) -> Result<Response<Body>, Infallible> {
where
DomainManager: domain::manager::BoxedManager,
{
match handle_request_inner(svc, req).await { match handle_request_inner(svc, req).await {
Ok(res) => Ok(res), Ok(res) => Ok(res),
Err(err) => panic!("unexpected error {err}"), Err(err) => panic!("unexpected error {err}"),
@ -340,13 +327,7 @@ fn strip_port(host: &str) -> &str {
} }
} }
pub async fn handle_request_inner<DomainManager>( pub async fn handle_request_inner(svc: sync::Arc<Service<'_>>, req: Request<Body>) -> SvcResponse {
svc: sync::Arc<Service<'_, DomainManager>>,
req: Request<Body>,
) -> SvcResponse
where
DomainManager: domain::manager::BoxedManager,
{
let maybe_host = match ( let maybe_host = match (
req.headers() req.headers()
.get("Host") .get("Host")