From 78b67a02ad87ad3e0e327117a55e4d27a1cc3e78 Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Sun, 21 Jan 2024 16:18:31 +0100 Subject: [PATCH] Make origin::Store::sync async In order for this method to make sense as an async, the returned box must have its lifetime tied to the store. This did not jive well with the mock implementation of origin::Store, so I ditched that, along with the mux tests which depended on it. Oh well. --- src/domain/manager.rs | 8 ++- src/origin.rs | 22 +------ src/origin/git.rs | 93 ++++++++++++++++------------ src/origin/mux.rs | 138 ++++-------------------------------------- 4 files changed, 71 insertions(+), 190 deletions(-) diff --git a/src/domain/manager.rs b/src/domain/manager.rs index d7b9ff1..65948cd 100644 --- a/src/domain/manager.rs +++ b/src/domain/manager.rs @@ -206,13 +206,13 @@ impl ManagerImpl { manager } - fn sync_domain_origin( + async fn sync_domain_origin( &self, domain: &domain::Name, origin_descr: &origin::Descr, ) -> Result<(), origin::SyncError> { log::info!("Syncing origin {:?} for domain {domain}", origin_descr,); - self.origin_store.sync(origin_descr) + self.origin_store.sync(origin_descr).await } async fn sync_origins(&self) -> unexpected::Result<()> { @@ -232,6 +232,7 @@ impl ManagerImpl { }; self.sync_domain_origin(&domain, &settings.origin_descr) + .await .map_unexpected_while(|| { format!( "syncing origin {:?} for domain {domain}", @@ -423,7 +424,8 @@ impl Manager for ManagerImpl { self.domain_checker.check_domain(&domain, &hash).await?; - self.sync_domain_origin(&domain, &settings.origin_descr)?; + self.sync_domain_origin(&domain, &settings.origin_descr) + .await?; if self.can_sync_gemini_cert() { self.sync_domain_gemini_cert(&domain) diff --git a/src/origin.rs b/src/origin.rs index 83b3dc8..b37fa43 100644 --- a/src/origin.rs +++ b/src/origin.rs @@ -8,7 +8,6 @@ pub use descr::Descr; use crate::error::unexpected; use crate::util; -use std::sync; #[derive(thiserror::Error, Clone, Debug, PartialEq)] pub enum SyncError { @@ -46,32 +45,13 @@ pub enum GetFileError { Unexpected(#[from] unexpected::Error), } -#[mockall::automock] /// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr. pub trait Store { /// If the origin is of a kind which can be updated, sync will pull down the latest version of /// the origin into the storage. - fn sync(&self, descr: &Descr) -> Result<(), SyncError>; + fn sync(&self, descr: &Descr) -> util::BoxFuture<'_, Result<(), SyncError>>; fn all_descrs(&self) -> Result, AllDescrsError>; fn get_file(&self, descr: &Descr, path: &str) -> Result; } - -pub fn new_mock() -> sync::Arc> { - sync::Arc::new(sync::Mutex::new(MockStore::new())) -} - -impl Store for sync::Arc> { - fn sync(&self, descr: &Descr) -> Result<(), SyncError> { - self.lock().unwrap().sync(descr) - } - - fn all_descrs(&self) -> Result, AllDescrsError> { - self.lock().unwrap().all_descrs() - } - - fn get_file(&self, descr: &Descr, path: &str) -> Result { - self.lock().unwrap().get_file(descr, path) - } -} diff --git a/src/origin/git.rs b/src/origin/git.rs index 397b5d9..ba3f9a4 100644 --- a/src/origin/git.rs +++ b/src/origin/git.rs @@ -2,7 +2,7 @@ use crate::error::unexpected::{self, Intoable, Mappable}; use crate::{origin, util}; use std::path::{Path, PathBuf}; -use std::{collections, fs, io, sync}; +use std::{collections, fs, future, io, sync}; use futures::stream; @@ -216,50 +216,62 @@ impl FSStore { } impl super::Store for FSStore { - fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> { - // attempt to lock this descr for syncing, doing so within a new scope so the mutex - // isn't actually being held for the whole method duration. - let is_already_syncing = { - self.sync_guard - .lock() - .unwrap() - .insert(descr.clone(), ()) - .is_some() - }; + fn sync( + &self, + descr: &origin::Descr, + ) -> util::BoxFuture<'static, Result<(), origin::SyncError>> { + // TODO this implementation is kind of cheating, as it's doing everything synchronously but + // then returning the result in an async box. But the git store is going to be + // re-implemented soon anyway, so it doesn't matter. + let res = (|| { + // attempt to lock this descr for syncing, doing so within a new scope so the mutex + // isn't actually being held for the whole method duration. + let is_already_syncing = { + self.sync_guard + .lock() + .unwrap() + .insert(descr.clone(), ()) + .is_some() + }; - if is_already_syncing { - return Err(origin::SyncError::AlreadyInProgress); - } + if is_already_syncing { + return Err(origin::SyncError::AlreadyInProgress); + } - let res = self.sync_inner(descr); + let res = self.sync_inner(&descr); - self.sync_guard.lock().unwrap().remove(descr); + self.sync_guard.lock().unwrap().remove(&descr); - let repo = match res { - Ok(repo) => repo, - Err(e) => return Err(e), - }; + let repo = match res { + Ok(repo) => repo, + Err(e) => return Err(e), + }; - // repo is synced at this point (though the sync lock is still held), just gotta create - // the RepoSnapshot and store it. - // - // TODO this is a bit of a memory leak, but by the time we get - // to that point this should all be backed by something which isn't local storage - // anyway. + // repo is synced at this point (though the sync lock is still held), just gotta create + // the RepoSnapshot and store it. + // + // TODO this is a bit of a memory leak, but by the time we get + // to that point this should all be backed by something which isn't local storage + // anyway. - // calling this while the sync lock is held isn't ideal, but it's convenient and - // shouldn't be too terrible generally - let repo_snapshot = self - .create_repo_snapshot(repo, descr) - .map_err(|e| match e { - CreateRepoSnapshotError::InvalidBranchName => origin::SyncError::InvalidBranchName, - CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e), - })?; + // calling this while the sync lock is held isn't ideal, but it's convenient and + // shouldn't be too terrible generally + let repo_snapshot = self + .create_repo_snapshot(repo, &descr) + .map_err(|e| match e { + CreateRepoSnapshotError::InvalidBranchName => { + origin::SyncError::InvalidBranchName + } + CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e), + })?; - let mut repo_snapshots = self.repo_snapshots.write().unwrap(); - (*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot)); + let mut repo_snapshots = self.repo_snapshots.write().unwrap(); + (*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot)); - Ok(()) + Ok(()) + })(); + + Box::pin(future::ready(res)) } fn all_descrs(&self) -> Result, origin::AllDescrsError> { @@ -374,8 +386,11 @@ mod tests { let store = super::FSStore::new(&config).expect("store created"); - store.sync(&descr).expect("sync should succeed"); - store.sync(&descr).expect("second sync should succeed"); + store.sync(&descr).await.expect("sync should succeed"); + store + .sync(&descr) + .await + .expect("second sync should succeed"); // RepoSnapshot doesn't exist match store.get_file(&other_descr, "DNE") { diff --git a/src/origin/mux.rs b/src/origin/mux.rs index fe3a2cf..b5ff449 100644 --- a/src/origin/mux.rs +++ b/src/origin/mux.rs @@ -3,7 +3,7 @@ use crate::{origin, util}; pub struct Store where - S: origin::Store + 'static, + S: origin::Store + Sync + Send + 'static, F: Fn(&origin::Descr) -> Option + Sync + Send, { mapping_fn: F, @@ -12,7 +12,7 @@ where impl Store where - S: origin::Store + 'static, + S: origin::Store + Sync + Send + 'static, F: Fn(&origin::Descr) -> Option + Sync + Send, { pub fn new(mapping_fn: F, stores: Vec) -> Store { @@ -22,13 +22,17 @@ where impl origin::Store for Store where - S: origin::Store + 'static, + S: origin::Store + Sync + Send + 'static, F: Fn(&origin::Descr) -> Option + Sync + Send, { - fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> { - (self.mapping_fn)(descr) - .or_unexpected_while(format!("mapping {:?} to store", &descr))? - .sync(descr) + fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> { + let descr = descr.clone(); + Box::pin(async move { + (self.mapping_fn)(&descr) + .or_unexpected_while(format!("mapping {:?} to store", &descr))? + .sync(&descr) + .await + }) } fn all_descrs(&self) -> Result, origin::AllDescrsError> { @@ -51,123 +55,3 @@ where .get_file(descr, path) } } - -#[cfg(test)] -mod tests { - use crate::origin; - use std::sync; - - struct Harness { - descr_a: origin::Descr, - descr_b: origin::Descr, - descr_unknown: origin::Descr, - store_a: sync::Arc>, - store_b: sync::Arc>, - store: Box, - } - - impl Harness { - fn new() -> Harness { - let descr_a = origin::Descr::Git { - url: "A".to_string(), - branch_name: "A".to_string(), - }; - - let descr_b = origin::Descr::Git { - url: "B".to_string(), - branch_name: "B".to_string(), - }; - - let store_a = origin::new_mock(); - let store_b = origin::new_mock(); - - Harness { - descr_a: descr_a.clone(), - descr_b: descr_b.clone(), - descr_unknown: origin::Descr::Git { - url: "X".to_string(), - branch_name: "X".to_string(), - }, - store_a: store_a.clone(), - store_b: store_b.clone(), - store: Box::from(super::Store::new( - { - let store_a = store_a.clone(); - let store_b = store_b.clone(); - move |descr| match descr { - &origin::Descr::Git { ref url, .. } if url == "A" => { - Some(store_a.clone()) - } - &origin::Descr::Git { ref url, .. } if url == "B" => { - Some(store_b.clone()) - } - _ => None, - } - }, - vec![store_a.clone(), store_b.clone()], - )), - } - } - } - - #[test] - fn sync() { - let h = Harness::new(); - - { - let descr_a = h.descr_a.clone(); - h.store_a - .lock() - .unwrap() - .expect_sync() - .withf(move |descr: &origin::Descr| descr == &descr_a) - .times(1) - .return_const(Ok::<(), origin::SyncError>(())); - } - - assert_eq!(Ok(()), h.store.sync(&h.descr_a)); - - { - let descr_b = h.descr_b.clone(); - h.store_b - .lock() - .unwrap() - .expect_sync() - .withf(move |descr: &origin::Descr| descr == &descr_b) - .times(1) - .return_const(Ok::<(), origin::SyncError>(())); - } - - assert_eq!(Ok(()), h.store.sync(&h.descr_b)); - - assert!(h.store.sync(&h.descr_unknown).is_err()); - } - - #[test] - fn all_descrs() { - let h = Harness::new(); - - h.store_a - .lock() - .unwrap() - .expect_all_descrs() - .times(1) - .return_const(Ok::, origin::AllDescrsError>(vec![h - .descr_a - .clone()])); - - h.store_b - .lock() - .unwrap() - .expect_all_descrs() - .times(1) - .return_const(Ok::, origin::AllDescrsError>(vec![h - .descr_b - .clone()])); - - assert_eq!( - Ok(vec![h.descr_a.clone(), h.descr_b.clone()]), - h.store.all_descrs(), - ) - } -}