Compare commits
6 Commits
644d2bab23
...
9c36ae1c7b
Author | SHA1 | Date | |
---|---|---|---|
|
9c36ae1c7b | ||
|
7a35befffe | ||
|
fa85fe7fd8 | ||
|
bd96581c6a | ||
|
0b22801503 | ||
|
dd07bbf7ac |
@ -10,7 +10,7 @@ pub type GetHttp01ChallengeKeyError = acme::store::GetHttp01ChallengeKeyError;
|
||||
pub type GetCertificateError = acme::store::GetCertificateError;
|
||||
|
||||
#[mockall::automock]
|
||||
pub trait Manager: Sync + Send {
|
||||
pub trait Manager {
|
||||
fn sync_domain<'mgr>(
|
||||
&'mgr self,
|
||||
domain: domain::Name,
|
||||
@ -24,55 +24,62 @@ pub trait Manager: Sync + Send {
|
||||
) -> Result<(PrivateKey, Vec<Certificate>), GetCertificateError>;
|
||||
}
|
||||
|
||||
struct ManagerImpl {
|
||||
store: Box<dyn acme::store::Store>,
|
||||
pub struct ManagerImpl {
|
||||
store: Box<dyn acme::store::Store + Send + Sync>,
|
||||
account: sync::Arc<acme2::Account>,
|
||||
}
|
||||
|
||||
pub async fn new(
|
||||
store: Box<dyn acme::store::Store>,
|
||||
contact_email: &str,
|
||||
) -> Result<Box<dyn Manager>, unexpected::Error> {
|
||||
let dir = acme2::DirectoryBuilder::new(LETS_ENCRYPT_URL.to_string())
|
||||
.build()
|
||||
.await
|
||||
.or_unexpected_while("creating acme2 directory builder")?;
|
||||
impl ManagerImpl {
|
||||
pub async fn new<Store: acme::store::Store + Send + Sync + 'static>(
|
||||
store: Store,
|
||||
contact_email: &str,
|
||||
) -> Result<Self, unexpected::Error> {
|
||||
let dir = acme2::DirectoryBuilder::new(LETS_ENCRYPT_URL.to_string())
|
||||
.build()
|
||||
.await
|
||||
.or_unexpected_while("creating acme2 directory builder")?;
|
||||
|
||||
let mut contact = String::from("mailto:");
|
||||
contact.push_str(contact_email);
|
||||
let mut contact = String::from("mailto:");
|
||||
contact.push_str(contact_email);
|
||||
|
||||
let mut builder = acme2::AccountBuilder::new(dir);
|
||||
builder.contact(vec![contact]);
|
||||
builder.terms_of_service_agreed(true);
|
||||
let mut builder = acme2::AccountBuilder::new(dir);
|
||||
builder.contact(vec![contact]);
|
||||
builder.terms_of_service_agreed(true);
|
||||
|
||||
match store.get_account_key() {
|
||||
Ok(account_key) => {
|
||||
builder.private_key(
|
||||
(&account_key)
|
||||
.try_into()
|
||||
.or_unexpected_while("parsing private key")?,
|
||||
);
|
||||
match store.get_account_key() {
|
||||
Ok(account_key) => {
|
||||
builder.private_key(
|
||||
(&account_key)
|
||||
.try_into()
|
||||
.or_unexpected_while("parsing private key")?,
|
||||
);
|
||||
}
|
||||
Err(acme::store::GetAccountKeyError::NotFound) => (),
|
||||
Err(acme::store::GetAccountKeyError::Unexpected(err)) => {
|
||||
return Err(err.into_unexpected())
|
||||
}
|
||||
}
|
||||
Err(acme::store::GetAccountKeyError::NotFound) => (),
|
||||
Err(acme::store::GetAccountKeyError::Unexpected(err)) => return Err(err.into_unexpected()),
|
||||
|
||||
let account = builder
|
||||
.build()
|
||||
.await
|
||||
.or_unexpected_while("building account")?;
|
||||
|
||||
let account_key: acme::PrivateKey = account
|
||||
.private_key()
|
||||
.as_ref()
|
||||
.try_into()
|
||||
.or_unexpected_while("parsing private key back out")?;
|
||||
|
||||
store
|
||||
.set_account_key(&account_key)
|
||||
.or_unexpected_while("storing account key")?;
|
||||
|
||||
Ok(Self {
|
||||
store: Box::from(store),
|
||||
account,
|
||||
})
|
||||
}
|
||||
|
||||
let account = builder
|
||||
.build()
|
||||
.await
|
||||
.or_unexpected_while("building account")?;
|
||||
|
||||
let account_key: acme::PrivateKey = account
|
||||
.private_key()
|
||||
.as_ref()
|
||||
.try_into()
|
||||
.or_unexpected_while("parsing private key back out")?;
|
||||
|
||||
store
|
||||
.set_account_key(&account_key)
|
||||
.or_unexpected_while("storing account key")?;
|
||||
|
||||
Ok(Box::new(ManagerImpl { store, account }))
|
||||
}
|
||||
|
||||
impl Manager for ManagerImpl {
|
||||
|
@ -38,7 +38,7 @@ pub enum GetCertificateError {
|
||||
}
|
||||
|
||||
#[mockall::automock]
|
||||
pub trait Store: Sync + Send {
|
||||
pub trait Store {
|
||||
fn set_account_key(&self, k: &PrivateKey) -> Result<(), unexpected::Error>;
|
||||
fn get_account_key(&self) -> Result<PrivateKey, GetAccountKeyError>;
|
||||
|
||||
@ -66,28 +66,29 @@ struct StoredPKeyCert {
|
||||
cert: Vec<Certificate>,
|
||||
}
|
||||
|
||||
struct FSStore {
|
||||
pub struct FSStore {
|
||||
dir_path: path::PathBuf,
|
||||
}
|
||||
|
||||
pub fn new(dir_path: &path::Path) -> Result<Box<dyn Store>, unexpected::Error> {
|
||||
vec![
|
||||
dir_path,
|
||||
dir_path.join("http01_challenge_keys").as_ref(),
|
||||
dir_path.join("certificates").as_ref(),
|
||||
]
|
||||
.iter()
|
||||
.map(|dir| {
|
||||
fs::create_dir_all(dir).map_unexpected_while(|| format!("creating dir {}", dir.display()))
|
||||
})
|
||||
.try_collect()?;
|
||||
|
||||
Ok(Box::new(FSStore {
|
||||
dir_path: dir_path.into(),
|
||||
}))
|
||||
}
|
||||
|
||||
impl FSStore {
|
||||
pub fn new(dir_path: &path::Path) -> Result<Self, unexpected::Error> {
|
||||
vec![
|
||||
dir_path,
|
||||
dir_path.join("http01_challenge_keys").as_ref(),
|
||||
dir_path.join("certificates").as_ref(),
|
||||
]
|
||||
.iter()
|
||||
.map(|dir| {
|
||||
fs::create_dir_all(dir)
|
||||
.map_unexpected_while(|| format!("creating dir {}", dir.display()))
|
||||
})
|
||||
.try_collect()?;
|
||||
|
||||
Ok(Self {
|
||||
dir_path: dir_path.into(),
|
||||
})
|
||||
}
|
||||
|
||||
fn account_key_path(&self) -> path::PathBuf {
|
||||
self.dir_path.join("account.key")
|
||||
}
|
||||
@ -232,7 +233,7 @@ mod tests {
|
||||
#[test]
|
||||
fn account_key() {
|
||||
let tmp_dir = TempDir::new("domain_acme_store_account_key").unwrap();
|
||||
let store = new(tmp_dir.path()).expect("store created");
|
||||
let store = FSStore::new(tmp_dir.path()).expect("store created");
|
||||
|
||||
assert!(matches!(
|
||||
store.get_account_key(),
|
||||
@ -254,7 +255,7 @@ mod tests {
|
||||
#[test]
|
||||
fn http01_challenge_key() {
|
||||
let tmp_dir = TempDir::new("domain_acme_store_http01_challenge_key").unwrap();
|
||||
let store = new(tmp_dir.path()).expect("store created");
|
||||
let store = FSStore::new(tmp_dir.path()).expect("store created");
|
||||
|
||||
let token = "foo".to_string();
|
||||
let key = "bar".to_string();
|
||||
|
@ -36,27 +36,27 @@ pub struct DNSChecker {
|
||||
client: tokio::sync::Mutex<AsyncClient>,
|
||||
}
|
||||
|
||||
pub async fn new(
|
||||
target_a: net::Ipv4Addr,
|
||||
resolver_addr: &str,
|
||||
) -> Result<DNSChecker, NewDNSCheckerError> {
|
||||
let resolver_addr = resolver_addr
|
||||
.parse()
|
||||
.map_err(|_| NewDNSCheckerError::InvalidResolverAddress)?;
|
||||
|
||||
let stream = udp::UdpClientStream::<tokio::net::UdpSocket>::new(resolver_addr);
|
||||
|
||||
let (client, bg) = AsyncClient::connect(stream).await.or_unexpected()?;
|
||||
|
||||
tokio::spawn(bg);
|
||||
|
||||
Ok(DNSChecker {
|
||||
target_a,
|
||||
client: tokio::sync::Mutex::new(client),
|
||||
})
|
||||
}
|
||||
|
||||
impl DNSChecker {
|
||||
pub async fn new(
|
||||
target_a: net::Ipv4Addr,
|
||||
resolver_addr: &str,
|
||||
) -> Result<Self, NewDNSCheckerError> {
|
||||
let resolver_addr = resolver_addr
|
||||
.parse()
|
||||
.map_err(|_| NewDNSCheckerError::InvalidResolverAddress)?;
|
||||
|
||||
let stream = udp::UdpClientStream::<tokio::net::UdpSocket>::new(resolver_addr);
|
||||
|
||||
let (client, bg) = AsyncClient::connect(stream).await.or_unexpected()?;
|
||||
|
||||
tokio::spawn(bg);
|
||||
|
||||
Ok(Self {
|
||||
target_a,
|
||||
client: tokio::sync::Mutex::new(client),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn check_domain(
|
||||
&self,
|
||||
domain: &domain::Name,
|
||||
|
@ -39,24 +39,24 @@ pub enum SetError {
|
||||
}
|
||||
|
||||
#[mockall::automock]
|
||||
pub trait Store: Sync + Send {
|
||||
pub trait Store {
|
||||
fn get(&self, domain: &domain::Name) -> Result<Config, GetError>;
|
||||
fn set(&self, domain: &domain::Name, config: &Config) -> Result<(), SetError>;
|
||||
fn all_domains(&self) -> Result<Vec<domain::Name>, unexpected::Error>;
|
||||
}
|
||||
|
||||
struct FSStore {
|
||||
pub struct FSStore {
|
||||
dir_path: PathBuf,
|
||||
}
|
||||
|
||||
pub fn new(dir_path: &Path) -> io::Result<Box<dyn Store>> {
|
||||
fs::create_dir_all(dir_path)?;
|
||||
Ok(Box::new(FSStore {
|
||||
dir_path: dir_path.into(),
|
||||
}))
|
||||
}
|
||||
|
||||
impl FSStore {
|
||||
pub fn new(dir_path: &Path) -> io::Result<Self> {
|
||||
fs::create_dir_all(dir_path)?;
|
||||
Ok(Self {
|
||||
dir_path: dir_path.into(),
|
||||
})
|
||||
}
|
||||
|
||||
fn config_dir_path(&self, domain: &domain::Name) -> PathBuf {
|
||||
self.dir_path.join(domain.as_str())
|
||||
}
|
||||
@ -116,7 +116,7 @@ impl Store for FSStore {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use super::{Store, *};
|
||||
use crate::domain;
|
||||
use crate::origin::Descr;
|
||||
|
||||
@ -128,7 +128,7 @@ mod tests {
|
||||
fn basic() {
|
||||
let tmp_dir = TempDir::new("domain_config_store").unwrap();
|
||||
|
||||
let store = new(tmp_dir.path()).expect("store created");
|
||||
let store = FSStore::new(tmp_dir.path()).expect("store created");
|
||||
|
||||
let domain = domain::Name::from_str("foo.com").expect("domain parsed");
|
||||
|
||||
|
@ -145,59 +145,66 @@ pub trait Manager: Sync + Send + rustls::server::ResolvesServerCert {
|
||||
fn all_domains(&self) -> Result<Vec<domain::Name>, unexpected::Error>;
|
||||
}
|
||||
|
||||
struct ManagerImpl {
|
||||
origin_store: Box<dyn origin::store::Store>,
|
||||
domain_config_store: Box<dyn config::Store>,
|
||||
pub struct ManagerImpl {
|
||||
origin_store: Box<dyn origin::store::Store + Send + Sync>,
|
||||
domain_config_store: Box<dyn config::Store + Send + Sync>,
|
||||
domain_checker: checker::DNSChecker,
|
||||
acme_manager: Option<Box<dyn acme::manager::Manager>>,
|
||||
acme_manager: Option<Box<dyn acme::manager::Manager + Send + Sync>>,
|
||||
}
|
||||
|
||||
async fn sync_origins(origin_store: &dyn origin::store::Store, canceller: CancellationToken) {
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(20 * 60));
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {
|
||||
match origin_store.all_descrs() {
|
||||
Ok(iter) => iter.into_iter(),
|
||||
Err(err) => {
|
||||
log::error!("Error fetching origin descriptors: {err}");
|
||||
return;
|
||||
impl ManagerImpl {
|
||||
pub fn new<
|
||||
OriginStore: origin::store::Store + Send + Sync + 'static,
|
||||
DomainConfigStore: config::Store + Send + Sync + 'static,
|
||||
AcmeManager: acme::manager::Manager + Send + Sync + 'static,
|
||||
>(
|
||||
task_stack: &mut util::TaskStack<unexpected::Error>,
|
||||
origin_store: OriginStore,
|
||||
domain_config_store: DomainConfigStore,
|
||||
domain_checker: checker::DNSChecker,
|
||||
acme_manager: Option<AcmeManager>,
|
||||
) -> sync::Arc<dyn Manager> {
|
||||
let manager = sync::Arc::new(ManagerImpl {
|
||||
origin_store: Box::from(origin_store),
|
||||
domain_config_store: Box::from(domain_config_store),
|
||||
domain_checker: domain_checker,
|
||||
acme_manager: acme_manager
|
||||
.map(|m| Box::new(m) as Box<dyn acme::manager::Manager + Send + Sync>),
|
||||
});
|
||||
|
||||
task_stack.push_spawn(|canceller| {
|
||||
let manager = manager.clone();
|
||||
async move { Ok(manager.sync_origins(canceller).await) }
|
||||
});
|
||||
|
||||
manager
|
||||
}
|
||||
|
||||
async fn sync_origins(&self, canceller: CancellationToken) {
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(20 * 60));
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {
|
||||
match self.origin_store.all_descrs() {
|
||||
Ok(iter) => iter.into_iter(),
|
||||
Err(err) => {
|
||||
log::error!("Error fetching origin descriptors: {err}");
|
||||
return;
|
||||
}
|
||||
}
|
||||
.for_each(|descr| {
|
||||
if let Err(err) = self.origin_store.sync(descr.clone(), origin::store::Limits {}) {
|
||||
log::error!("Failed to sync store for {:?}: {err}", descr);
|
||||
return;
|
||||
}
|
||||
});
|
||||
},
|
||||
_ = canceller.cancelled() => return,
|
||||
}
|
||||
.for_each(|descr| {
|
||||
if let Err(err) = origin_store.sync(descr.clone(), origin::store::Limits {}) {
|
||||
log::error!("Failed to sync store for {:?}: {err}", descr);
|
||||
return;
|
||||
}
|
||||
});
|
||||
},
|
||||
_ = canceller.cancelled() => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
task_stack: &mut util::TaskStack<unexpected::Error>,
|
||||
origin_store: Box<dyn origin::store::Store>,
|
||||
domain_config_store: Box<dyn config::Store>,
|
||||
domain_checker: checker::DNSChecker,
|
||||
acme_manager: Option<Box<dyn acme::manager::Manager>>,
|
||||
) -> sync::Arc<dyn Manager> {
|
||||
let manager = sync::Arc::new(ManagerImpl {
|
||||
origin_store,
|
||||
domain_config_store,
|
||||
domain_checker,
|
||||
acme_manager,
|
||||
});
|
||||
|
||||
task_stack.push_spawn(|canceller| {
|
||||
let manager = manager.clone();
|
||||
async move { Ok(sync_origins(manager.origin_store.as_ref(), canceller).await) }
|
||||
});
|
||||
|
||||
manager
|
||||
}
|
||||
|
||||
impl Manager for ManagerImpl {
|
||||
fn get_config(&self, domain: &domain::Name) -> Result<config::Config, GetConfigError> {
|
||||
Ok(self.domain_config_store.get(domain)?)
|
||||
|
@ -1,7 +1,7 @@
|
||||
use std::fmt::Write;
|
||||
use std::{error, fmt};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
/// Error is a String which implements the Error trait. It is intended to be used in
|
||||
/// situations where the caller is being given an error they can't really handle, except to pass it
|
||||
/// along or log it.
|
||||
|
@ -1,5 +1,6 @@
|
||||
#![feature(result_option_inspect)]
|
||||
#![feature(iterator_try_collect)]
|
||||
#![feature(iter_collect_into)]
|
||||
|
||||
pub mod domain;
|
||||
pub mod error;
|
||||
|
25
src/main.rs
25
src/main.rs
@ -73,33 +73,38 @@ async fn main() {
|
||||
)
|
||||
.init();
|
||||
|
||||
let origin_store = domani::origin::store::git::new(config.origin_store_git_dir_path)
|
||||
let origin_store = domani::origin::store::git::FSStore::new(config.origin_store_git_dir_path)
|
||||
.expect("git origin store initialization failed");
|
||||
|
||||
let domain_checker = domani::domain::checker::new(
|
||||
let domain_checker = domani::domain::checker::DNSChecker::new(
|
||||
config.domain_checker_target_a,
|
||||
&config.domain_checker_resolver_addr,
|
||||
)
|
||||
.await
|
||||
.expect("domain checker initialization failed");
|
||||
|
||||
let domain_config_store = domani::domain::config::new(&config.domain_config_store_dir_path)
|
||||
.expect("domain config store initialization failed");
|
||||
let domain_config_store =
|
||||
domani::domain::config::FSStore::new(&config.domain_config_store_dir_path)
|
||||
.expect("domain config store initialization failed");
|
||||
|
||||
let domain_acme_manager = if config.https_listen_addr.is_some() {
|
||||
let domain_acme_store_dir_path = config.domain_acme_store_dir_path.unwrap();
|
||||
|
||||
let domain_acme_store = domani::domain::acme::store::new(&domain_acme_store_dir_path)
|
||||
.expect("domain acme store initialization failed");
|
||||
let domain_acme_store =
|
||||
domani::domain::acme::store::FSStore::new(&domain_acme_store_dir_path)
|
||||
.expect("domain acme store initialization failed");
|
||||
|
||||
// if https_listen_addr is set then domain_acme_contact_email is required, see the Cli/clap
|
||||
// settings.
|
||||
let domain_acme_contact_email = config.domain_acme_contact_email.unwrap();
|
||||
|
||||
Some(
|
||||
domani::domain::acme::manager::new(domain_acme_store, &domain_acme_contact_email)
|
||||
.await
|
||||
.expect("domain acme manager initialization failed"),
|
||||
domani::domain::acme::manager::ManagerImpl::new(
|
||||
domain_acme_store,
|
||||
&domain_acme_contact_email,
|
||||
)
|
||||
.await
|
||||
.expect("domain acme manager initialization failed"),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
@ -107,7 +112,7 @@ async fn main() {
|
||||
|
||||
let mut task_stack = domani::util::TaskStack::new();
|
||||
|
||||
let domain_manager = domani::domain::manager::new(
|
||||
let domain_manager = domani::domain::manager::ManagerImpl::new(
|
||||
&mut task_stack,
|
||||
origin_store,
|
||||
domain_config_store,
|
||||
|
@ -3,13 +3,14 @@ use crate::origin;
|
||||
use std::sync;
|
||||
|
||||
pub mod git;
|
||||
pub mod mux;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct Limits {
|
||||
// TODO storage limits
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
|
||||
pub enum SyncError {
|
||||
#[error("invalid url")]
|
||||
InvalidURL,
|
||||
@ -24,7 +25,7 @@ pub enum SyncError {
|
||||
Unexpected(#[from] unexpected::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
|
||||
pub enum GetError {
|
||||
#[error("not found")]
|
||||
NotFound,
|
||||
@ -33,7 +34,7 @@ pub enum GetError {
|
||||
Unexpected(#[from] unexpected::Error),
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
|
||||
pub enum AllDescrsError {
|
||||
#[error(transparent)]
|
||||
Unexpected(#[from] unexpected::Error),
|
||||
@ -41,7 +42,7 @@ pub enum AllDescrsError {
|
||||
|
||||
#[mockall::automock]
|
||||
/// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr.
|
||||
pub trait Store: Sync + Send {
|
||||
pub trait Store {
|
||||
/// If the origin is of a kind which can be updated, sync will pull down the latest version of
|
||||
/// the origin into the storage.
|
||||
fn sync(&self, descr: origin::Descr, limits: Limits) -> Result<(), SyncError>;
|
||||
@ -49,3 +50,21 @@ pub trait Store: Sync + Send {
|
||||
fn get(&self, descr: origin::Descr) -> Result<sync::Arc<dyn origin::Origin>, GetError>;
|
||||
fn all_descrs(&self) -> Result<Vec<origin::Descr>, AllDescrsError>;
|
||||
}
|
||||
|
||||
pub fn new_mock() -> sync::Arc<sync::Mutex<MockStore>> {
|
||||
sync::Arc::new(sync::Mutex::new(MockStore::new()))
|
||||
}
|
||||
|
||||
impl Store for sync::Arc<sync::Mutex<MockStore>> {
|
||||
fn sync(&self, descr: origin::Descr, limits: Limits) -> Result<(), SyncError> {
|
||||
self.lock().unwrap().sync(descr, limits)
|
||||
}
|
||||
|
||||
fn get(&self, descr: origin::Descr) -> Result<sync::Arc<dyn origin::Origin>, GetError> {
|
||||
self.lock().unwrap().get(descr)
|
||||
}
|
||||
|
||||
fn all_descrs(&self) -> Result<Vec<origin::Descr>, AllDescrsError> {
|
||||
self.lock().unwrap().all_descrs()
|
||||
}
|
||||
}
|
||||
|
@ -59,9 +59,9 @@ enum GetOriginError {
|
||||
Unexpected(#[from] unexpected::Error),
|
||||
}
|
||||
|
||||
/// git::Store implements the Store trait for any Descr::Git based Origins. If any non-git
|
||||
/// Descrs are used then this implementation will panic.
|
||||
struct Store {
|
||||
/// Implements the Store trait for any Descr::Git based Origins, storing the git repos on disk. If
|
||||
/// any non-git Descrs are used then this implementation will panic.
|
||||
pub struct FSStore {
|
||||
dir_path: PathBuf,
|
||||
|
||||
// to prevent against syncing the same origin more than once at a time, but still allowing
|
||||
@ -71,16 +71,16 @@ struct Store {
|
||||
origins: sync::RwLock<collections::HashMap<origin::Descr, sync::Arc<Origin>>>,
|
||||
}
|
||||
|
||||
pub fn new(dir_path: PathBuf) -> io::Result<Box<dyn super::Store>> {
|
||||
fs::create_dir_all(&dir_path)?;
|
||||
Ok(Box::new(Store {
|
||||
dir_path,
|
||||
sync_guard: sync::Mutex::new(collections::HashMap::new()),
|
||||
origins: sync::RwLock::new(collections::HashMap::new()),
|
||||
}))
|
||||
}
|
||||
impl FSStore {
|
||||
pub fn new(dir_path: PathBuf) -> io::Result<Self> {
|
||||
fs::create_dir_all(&dir_path)?;
|
||||
Ok(Self {
|
||||
dir_path,
|
||||
sync_guard: sync::Mutex::new(collections::HashMap::new()),
|
||||
origins: sync::RwLock::new(collections::HashMap::new()),
|
||||
})
|
||||
}
|
||||
|
||||
impl Store {
|
||||
fn repo_path(&self, descr: &origin::Descr) -> PathBuf {
|
||||
self.dir_path.join(descr.id())
|
||||
}
|
||||
@ -208,7 +208,7 @@ impl Store {
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Store for Store {
|
||||
impl super::Store for FSStore {
|
||||
fn sync(&self, descr: origin::Descr, limits: store::Limits) -> Result<(), store::SyncError> {
|
||||
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
|
||||
// isn't actually being held for the whole method duration.
|
||||
@ -323,9 +323,7 @@ impl super::Store for Store {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::origin::store;
|
||||
use crate::origin::store::Store;
|
||||
use crate::origin::{self, Origin};
|
||||
use crate::origin::{self, store, store::Store};
|
||||
use tempdir::TempDir;
|
||||
|
||||
#[test]
|
||||
@ -346,7 +344,7 @@ mod tests {
|
||||
|
||||
let limits = store::Limits {};
|
||||
|
||||
let store = super::new(tmp_dir.path().to_path_buf()).expect("store created");
|
||||
let store = super::FSStore::new(tmp_dir.path().to_path_buf()).expect("store created");
|
||||
|
||||
store
|
||||
.sync(descr.clone(), limits)
|
||||
@ -383,13 +381,9 @@ mod tests {
|
||||
));
|
||||
assert_eq!(into.len(), 0);
|
||||
|
||||
let descrs = store
|
||||
.all_descrs()
|
||||
.expect("all_descrs called")
|
||||
.into_iter()
|
||||
.collect::<Vec<Result<origin::Descr, store::AllDescrsError>>>();
|
||||
let descrs = store.all_descrs().expect("all_descrs called");
|
||||
|
||||
assert_eq!(1, descrs.len());
|
||||
assert_eq!(&descr, descrs[0].as_ref().unwrap());
|
||||
assert_eq!(descr, descrs[0]);
|
||||
}
|
||||
}
|
||||
|
168
src/origin/store/mux.rs
Normal file
168
src/origin/store/mux.rs
Normal file
@ -0,0 +1,168 @@
|
||||
use crate::error::unexpected::Mappable;
|
||||
use crate::origin::{self, store};
|
||||
use std::sync;
|
||||
|
||||
pub struct Store<F, S>
|
||||
where
|
||||
S: store::Store + 'static,
|
||||
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
|
||||
{
|
||||
mapping_fn: F,
|
||||
stores: Vec<S>,
|
||||
}
|
||||
|
||||
impl<F, S> Store<F, S>
|
||||
where
|
||||
S: store::Store + 'static,
|
||||
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
|
||||
{
|
||||
pub fn new(mapping_fn: F, stores: Vec<S>) -> Store<F, S> {
|
||||
Store { mapping_fn, stores }
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, S> store::Store for Store<F, S>
|
||||
where
|
||||
S: store::Store + 'static,
|
||||
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
|
||||
{
|
||||
fn sync(&self, descr: origin::Descr, limits: store::Limits) -> Result<(), store::SyncError> {
|
||||
(self.mapping_fn)(&descr)
|
||||
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
|
||||
.sync(descr, limits)
|
||||
}
|
||||
|
||||
fn get(&self, descr: origin::Descr) -> Result<sync::Arc<dyn origin::Origin>, store::GetError> {
|
||||
(self.mapping_fn)(&descr)
|
||||
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
|
||||
.get(descr)
|
||||
}
|
||||
|
||||
fn all_descrs(&self) -> Result<Vec<origin::Descr>, store::AllDescrsError> {
|
||||
let mut res = Vec::<origin::Descr>::new();
|
||||
|
||||
for store in self.stores.iter() {
|
||||
store.all_descrs()?.into_iter().collect_into(&mut res);
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::origin::{self, store};
|
||||
use mockall::predicate;
|
||||
use std::sync;
|
||||
|
||||
struct Harness {
|
||||
descr_a: origin::Descr,
|
||||
descr_b: origin::Descr,
|
||||
descr_unknown: origin::Descr,
|
||||
store_a: sync::Arc<sync::Mutex<store::MockStore>>,
|
||||
store_b: sync::Arc<sync::Mutex<store::MockStore>>,
|
||||
store: Box<dyn store::Store>,
|
||||
}
|
||||
|
||||
impl Harness {
|
||||
fn new() -> Harness {
|
||||
let descr_a = origin::Descr::Git {
|
||||
url: "A".to_string(),
|
||||
branch_name: "A".to_string(),
|
||||
};
|
||||
|
||||
let descr_b = origin::Descr::Git {
|
||||
url: "B".to_string(),
|
||||
branch_name: "B".to_string(),
|
||||
};
|
||||
|
||||
let store_a = store::new_mock();
|
||||
let store_b = store::new_mock();
|
||||
|
||||
Harness {
|
||||
descr_a: descr_a.clone(),
|
||||
descr_b: descr_b.clone(),
|
||||
descr_unknown: origin::Descr::Git {
|
||||
url: "X".to_string(),
|
||||
branch_name: "X".to_string(),
|
||||
},
|
||||
store_a: store_a.clone(),
|
||||
store_b: store_b.clone(),
|
||||
store: Box::from(super::Store::new(
|
||||
{
|
||||
let store_a = store_a.clone();
|
||||
let store_b = store_b.clone();
|
||||
move |descr| match descr {
|
||||
&origin::Descr::Git { ref url, .. } if url == "A" => {
|
||||
Some(store_a.clone())
|
||||
}
|
||||
&origin::Descr::Git { ref url, .. } if url == "B" => {
|
||||
Some(store_b.clone())
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
},
|
||||
vec![store_a.clone(), store_b.clone()],
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sync() {
|
||||
let h = Harness::new();
|
||||
|
||||
h.store_a
|
||||
.lock()
|
||||
.unwrap()
|
||||
.expect_sync()
|
||||
.with(predicate::eq(h.descr_a.clone()), predicate::always())
|
||||
.times(1)
|
||||
.return_const(Ok::<(), store::SyncError>(()));
|
||||
|
||||
assert_eq!(Ok(()), h.store.sync(h.descr_a.clone(), store::Limits {}));
|
||||
|
||||
h.store_b
|
||||
.lock()
|
||||
.unwrap()
|
||||
.expect_sync()
|
||||
.with(predicate::eq(h.descr_b.clone()), predicate::always())
|
||||
.times(1)
|
||||
.return_const(Ok::<(), store::SyncError>(()));
|
||||
|
||||
assert_eq!(Ok(()), h.store.sync(h.descr_b.clone(), store::Limits {}));
|
||||
|
||||
assert!(h
|
||||
.store
|
||||
.sync(h.descr_unknown.clone(), store::Limits {})
|
||||
.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn all_descrs() {
|
||||
let h = Harness::new();
|
||||
|
||||
h.store_a
|
||||
.lock()
|
||||
.unwrap()
|
||||
.expect_all_descrs()
|
||||
.times(1)
|
||||
.return_const(Ok::<Vec<origin::Descr>, store::AllDescrsError>(vec![h
|
||||
.descr_a
|
||||
.clone()]));
|
||||
|
||||
h.store_b
|
||||
.lock()
|
||||
.unwrap()
|
||||
.expect_all_descrs()
|
||||
.times(1)
|
||||
.return_const(Ok::<Vec<origin::Descr>, store::AllDescrsError>(vec![h
|
||||
.descr_b
|
||||
.clone()]));
|
||||
|
||||
assert_eq!(
|
||||
Ok(vec![h.descr_a.clone(), h.descr_b.clone()]),
|
||||
h.store.all_descrs(),
|
||||
)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user