diff --git a/src/domain/manager.rs b/src/domain/manager.rs index d7b9ff1..65948cd 100644 --- a/src/domain/manager.rs +++ b/src/domain/manager.rs @@ -206,13 +206,13 @@ impl ManagerImpl { manager } - fn sync_domain_origin( + async fn sync_domain_origin( &self, domain: &domain::Name, origin_descr: &origin::Descr, ) -> Result<(), origin::SyncError> { log::info!("Syncing origin {:?} for domain {domain}", origin_descr,); - self.origin_store.sync(origin_descr) + self.origin_store.sync(origin_descr).await } async fn sync_origins(&self) -> unexpected::Result<()> { @@ -232,6 +232,7 @@ impl ManagerImpl { }; self.sync_domain_origin(&domain, &settings.origin_descr) + .await .map_unexpected_while(|| { format!( "syncing origin {:?} for domain {domain}", @@ -423,7 +424,8 @@ impl Manager for ManagerImpl { self.domain_checker.check_domain(&domain, &hash).await?; - self.sync_domain_origin(&domain, &settings.origin_descr)?; + self.sync_domain_origin(&domain, &settings.origin_descr) + .await?; if self.can_sync_gemini_cert() { self.sync_domain_gemini_cert(&domain) diff --git a/src/origin.rs b/src/origin.rs index 83b3dc8..b37fa43 100644 --- a/src/origin.rs +++ b/src/origin.rs @@ -8,7 +8,6 @@ pub use descr::Descr; use crate::error::unexpected; use crate::util; -use std::sync; #[derive(thiserror::Error, Clone, Debug, PartialEq)] pub enum SyncError { @@ -46,32 +45,13 @@ pub enum GetFileError { Unexpected(#[from] unexpected::Error), } -#[mockall::automock] /// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr. pub trait Store { /// If the origin is of a kind which can be updated, sync will pull down the latest version of /// the origin into the storage. - fn sync(&self, descr: &Descr) -> Result<(), SyncError>; + fn sync(&self, descr: &Descr) -> util::BoxFuture<'_, Result<(), SyncError>>; fn all_descrs(&self) -> Result, AllDescrsError>; fn get_file(&self, descr: &Descr, path: &str) -> Result; } - -pub fn new_mock() -> sync::Arc> { - sync::Arc::new(sync::Mutex::new(MockStore::new())) -} - -impl Store for sync::Arc> { - fn sync(&self, descr: &Descr) -> Result<(), SyncError> { - self.lock().unwrap().sync(descr) - } - - fn all_descrs(&self) -> Result, AllDescrsError> { - self.lock().unwrap().all_descrs() - } - - fn get_file(&self, descr: &Descr, path: &str) -> Result { - self.lock().unwrap().get_file(descr, path) - } -} diff --git a/src/origin/git.rs b/src/origin/git.rs index 397b5d9..ba3f9a4 100644 --- a/src/origin/git.rs +++ b/src/origin/git.rs @@ -2,7 +2,7 @@ use crate::error::unexpected::{self, Intoable, Mappable}; use crate::{origin, util}; use std::path::{Path, PathBuf}; -use std::{collections, fs, io, sync}; +use std::{collections, fs, future, io, sync}; use futures::stream; @@ -216,50 +216,62 @@ impl FSStore { } impl super::Store for FSStore { - fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> { - // attempt to lock this descr for syncing, doing so within a new scope so the mutex - // isn't actually being held for the whole method duration. - let is_already_syncing = { - self.sync_guard - .lock() - .unwrap() - .insert(descr.clone(), ()) - .is_some() - }; + fn sync( + &self, + descr: &origin::Descr, + ) -> util::BoxFuture<'static, Result<(), origin::SyncError>> { + // TODO this implementation is kind of cheating, as it's doing everything synchronously but + // then returning the result in an async box. But the git store is going to be + // re-implemented soon anyway, so it doesn't matter. + let res = (|| { + // attempt to lock this descr for syncing, doing so within a new scope so the mutex + // isn't actually being held for the whole method duration. + let is_already_syncing = { + self.sync_guard + .lock() + .unwrap() + .insert(descr.clone(), ()) + .is_some() + }; - if is_already_syncing { - return Err(origin::SyncError::AlreadyInProgress); - } + if is_already_syncing { + return Err(origin::SyncError::AlreadyInProgress); + } - let res = self.sync_inner(descr); + let res = self.sync_inner(&descr); - self.sync_guard.lock().unwrap().remove(descr); + self.sync_guard.lock().unwrap().remove(&descr); - let repo = match res { - Ok(repo) => repo, - Err(e) => return Err(e), - }; + let repo = match res { + Ok(repo) => repo, + Err(e) => return Err(e), + }; - // repo is synced at this point (though the sync lock is still held), just gotta create - // the RepoSnapshot and store it. - // - // TODO this is a bit of a memory leak, but by the time we get - // to that point this should all be backed by something which isn't local storage - // anyway. + // repo is synced at this point (though the sync lock is still held), just gotta create + // the RepoSnapshot and store it. + // + // TODO this is a bit of a memory leak, but by the time we get + // to that point this should all be backed by something which isn't local storage + // anyway. - // calling this while the sync lock is held isn't ideal, but it's convenient and - // shouldn't be too terrible generally - let repo_snapshot = self - .create_repo_snapshot(repo, descr) - .map_err(|e| match e { - CreateRepoSnapshotError::InvalidBranchName => origin::SyncError::InvalidBranchName, - CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e), - })?; + // calling this while the sync lock is held isn't ideal, but it's convenient and + // shouldn't be too terrible generally + let repo_snapshot = self + .create_repo_snapshot(repo, &descr) + .map_err(|e| match e { + CreateRepoSnapshotError::InvalidBranchName => { + origin::SyncError::InvalidBranchName + } + CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e), + })?; - let mut repo_snapshots = self.repo_snapshots.write().unwrap(); - (*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot)); + let mut repo_snapshots = self.repo_snapshots.write().unwrap(); + (*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot)); - Ok(()) + Ok(()) + })(); + + Box::pin(future::ready(res)) } fn all_descrs(&self) -> Result, origin::AllDescrsError> { @@ -374,8 +386,11 @@ mod tests { let store = super::FSStore::new(&config).expect("store created"); - store.sync(&descr).expect("sync should succeed"); - store.sync(&descr).expect("second sync should succeed"); + store.sync(&descr).await.expect("sync should succeed"); + store + .sync(&descr) + .await + .expect("second sync should succeed"); // RepoSnapshot doesn't exist match store.get_file(&other_descr, "DNE") { diff --git a/src/origin/mux.rs b/src/origin/mux.rs index fe3a2cf..b5ff449 100644 --- a/src/origin/mux.rs +++ b/src/origin/mux.rs @@ -3,7 +3,7 @@ use crate::{origin, util}; pub struct Store where - S: origin::Store + 'static, + S: origin::Store + Sync + Send + 'static, F: Fn(&origin::Descr) -> Option + Sync + Send, { mapping_fn: F, @@ -12,7 +12,7 @@ where impl Store where - S: origin::Store + 'static, + S: origin::Store + Sync + Send + 'static, F: Fn(&origin::Descr) -> Option + Sync + Send, { pub fn new(mapping_fn: F, stores: Vec) -> Store { @@ -22,13 +22,17 @@ where impl origin::Store for Store where - S: origin::Store + 'static, + S: origin::Store + Sync + Send + 'static, F: Fn(&origin::Descr) -> Option + Sync + Send, { - fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> { - (self.mapping_fn)(descr) - .or_unexpected_while(format!("mapping {:?} to store", &descr))? - .sync(descr) + fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> { + let descr = descr.clone(); + Box::pin(async move { + (self.mapping_fn)(&descr) + .or_unexpected_while(format!("mapping {:?} to store", &descr))? + .sync(&descr) + .await + }) } fn all_descrs(&self) -> Result, origin::AllDescrsError> { @@ -51,123 +55,3 @@ where .get_file(descr, path) } } - -#[cfg(test)] -mod tests { - use crate::origin; - use std::sync; - - struct Harness { - descr_a: origin::Descr, - descr_b: origin::Descr, - descr_unknown: origin::Descr, - store_a: sync::Arc>, - store_b: sync::Arc>, - store: Box, - } - - impl Harness { - fn new() -> Harness { - let descr_a = origin::Descr::Git { - url: "A".to_string(), - branch_name: "A".to_string(), - }; - - let descr_b = origin::Descr::Git { - url: "B".to_string(), - branch_name: "B".to_string(), - }; - - let store_a = origin::new_mock(); - let store_b = origin::new_mock(); - - Harness { - descr_a: descr_a.clone(), - descr_b: descr_b.clone(), - descr_unknown: origin::Descr::Git { - url: "X".to_string(), - branch_name: "X".to_string(), - }, - store_a: store_a.clone(), - store_b: store_b.clone(), - store: Box::from(super::Store::new( - { - let store_a = store_a.clone(); - let store_b = store_b.clone(); - move |descr| match descr { - &origin::Descr::Git { ref url, .. } if url == "A" => { - Some(store_a.clone()) - } - &origin::Descr::Git { ref url, .. } if url == "B" => { - Some(store_b.clone()) - } - _ => None, - } - }, - vec![store_a.clone(), store_b.clone()], - )), - } - } - } - - #[test] - fn sync() { - let h = Harness::new(); - - { - let descr_a = h.descr_a.clone(); - h.store_a - .lock() - .unwrap() - .expect_sync() - .withf(move |descr: &origin::Descr| descr == &descr_a) - .times(1) - .return_const(Ok::<(), origin::SyncError>(())); - } - - assert_eq!(Ok(()), h.store.sync(&h.descr_a)); - - { - let descr_b = h.descr_b.clone(); - h.store_b - .lock() - .unwrap() - .expect_sync() - .withf(move |descr: &origin::Descr| descr == &descr_b) - .times(1) - .return_const(Ok::<(), origin::SyncError>(())); - } - - assert_eq!(Ok(()), h.store.sync(&h.descr_b)); - - assert!(h.store.sync(&h.descr_unknown).is_err()); - } - - #[test] - fn all_descrs() { - let h = Harness::new(); - - h.store_a - .lock() - .unwrap() - .expect_all_descrs() - .times(1) - .return_const(Ok::, origin::AllDescrsError>(vec![h - .descr_a - .clone()])); - - h.store_b - .lock() - .unwrap() - .expect_all_descrs() - .times(1) - .return_const(Ok::, origin::AllDescrsError>(vec![h - .descr_b - .clone()])); - - assert_eq!( - Ok(vec![h.descr_a.clone(), h.descr_b.clone()]), - h.store.all_descrs(), - ) - } -}