diff --git a/src/domain/manager.rs b/src/domain/manager.rs index 1ae4a55..c91dfb8 100644 --- a/src/domain/manager.rs +++ b/src/domain/manager.rs @@ -43,6 +43,9 @@ pub enum SyncError { #[error("not found")] NotFound, + #[error("already in progress")] + AlreadyInProgress, + #[error(transparent)] Unexpected(Box), } @@ -67,6 +70,9 @@ pub enum SyncWithConfigError { #[error("invalid domain name")] InvalidDomainName, + #[error("already in progress")] + AlreadyInProgress, + #[error("target CNAME not set")] TargetCNAMENotSet, @@ -82,6 +88,7 @@ impl From for SyncWithConfigError { match e { origin::store::SyncError::InvalidURL => SyncWithConfigError::InvalidURL, origin::store::SyncError::InvalidBranchName => SyncWithConfigError::InvalidBranchName, + origin::store::SyncError::AlreadyInProgress => SyncWithConfigError::AlreadyInProgress, origin::store::SyncError::Unexpected(e) => SyncWithConfigError::Unexpected(e), } } @@ -172,9 +179,11 @@ where fn sync(&self, domain: &str) -> Result<(), SyncError> { let config = self.domain_config_store.get(domain)?; self.origin_store - .sync(&config.origin_descr, origin::store::Limits {}) - // if there's a config there should be an origin, any error here is unexpected - .map_err(|e| SyncError::Unexpected(Box::from(e)))?; + .sync(config.origin_descr, origin::store::Limits {}) + .map_err(|e| match e { + origin::store::SyncError::AlreadyInProgress => SyncError::AlreadyInProgress, + _ => SyncError::Unexpected(Box::from(e)), + })?; Ok(()) } @@ -190,7 +199,7 @@ where self.domain_checker.check_domain(domain, &config_hash)?; self.origin_store - .sync(&config.origin_descr, origin::store::Limits {})?; + .sync(config.origin_descr.clone(), origin::store::Limits {})?; self.domain_config_store.set(domain, config)?; diff --git a/src/origin/descr.rs b/src/origin/descr.rs index 9d971cd..5b78c9f 100644 --- a/src/origin/descr.rs +++ b/src/origin/descr.rs @@ -2,7 +2,7 @@ use hex::ToHex; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] /// A unique description of an origin, from where a domain might be served. pub enum Descr { Git { url: String, branch_name: String }, diff --git a/src/origin/store.rs b/src/origin/store.rs index 9df3e4c..cf281b0 100644 --- a/src/origin/store.rs +++ b/src/origin/store.rs @@ -14,6 +14,9 @@ pub enum SyncError { #[error("invalid branch name")] InvalidBranchName, + #[error("already in progress")] + AlreadyInProgress, + #[error(transparent)] Unexpected(Box), } @@ -42,7 +45,7 @@ pub trait Store<'a> { /// If the origin is of a kind which can be updated, sync will pull down the latest version of /// the origin into the storage. - fn sync(&'a self, descr: &origin::Descr, limits: Limits) -> Result<(), SyncError>; + fn sync(&'a self, descr: origin::Descr, limits: Limits) -> Result<(), SyncError>; fn get(&'a self, descr: origin::Descr) -> Result, GetError>; fn all_descrs(&'a self) -> AllDescrsResult; @@ -50,7 +53,7 @@ pub trait Store<'a> { pub struct MockStore where - SyncFn: Fn(&origin::Descr, Limits) -> Result<(), SyncError>, + SyncFn: Fn(origin::Descr, Limits) -> Result<(), SyncError>, GetFn: Fn(origin::Descr) -> Result, GetError>, AllDescrsFn: Fn() -> AllDescrsResult>>, { @@ -61,13 +64,13 @@ where impl<'a, SyncFn, GetFn, AllDescrsFn> Store<'a> for MockStore where - SyncFn: Fn(&origin::Descr, Limits) -> Result<(), SyncError>, + SyncFn: Fn(origin::Descr, Limits) -> Result<(), SyncError>, GetFn: Fn(origin::Descr) -> Result, GetError>, AllDescrsFn: Fn() -> AllDescrsResult>>, { type AllDescrsIter = Vec>; - fn sync(&'a self, descr: &origin::Descr, limits: Limits) -> Result<(), SyncError> { + fn sync(&'a self, descr: origin::Descr, limits: Limits) -> Result<(), SyncError> { (self.sync_fn)(descr, limits) } @@ -85,16 +88,17 @@ pub mod git { use crate::origin; use crate::origin::store; + use std::error::Error; use std::path::{Path, PathBuf}; - use std::{fs, io}; + use std::{collections, fs, io, sync}; struct Origin { descr: origin::Descr, - repo: gix::Repository, + repo: gix::ThreadSafeRepository, tree_object_id: gix::ObjectId, } - impl origin::Origin for Origin { + impl origin::Origin for sync::Arc { fn descr(&self) -> &origin::Descr { &self.descr } @@ -107,8 +111,9 @@ pub mod git { let mut clean_path = Path::new(path); clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path); - let file_object = self - .repo + let repo = self.repo.to_thread_local(); + + let file_object = repo .find_object(self.tree_object_id) .map_err(|e| origin::ReadFileIntoError::Unexpected(Box::from(e)))? .peel_to_tree() @@ -126,16 +131,35 @@ pub mod git { } } + #[derive(thiserror::Error, Debug)] + enum GetOriginError { + #[error("invalid branch name")] + InvalidBranchName, + + #[error(transparent)] + Unexpected(Box), + } + /// git::Store implements the Store trait for any Descr::Git based Origins. If any non-git /// Descrs are used then this implementation will panic. pub struct Store { dir_path: PathBuf, + + // to prevent against syncing the same origin more than once at a time, but still allowing + // more than one origin to be syncing at a time + sync_guard: sync::Mutex>, + + origins: sync::RwLock>>, } impl Store { pub fn new(dir_path: PathBuf) -> io::Result { fs::create_dir_all(&dir_path)?; - Ok(Store { dir_path }) + Ok(Store { + dir_path, + sync_guard: sync::Mutex::new(collections::HashMap::new()), + origins: sync::RwLock::new(collections::HashMap::new()), + }) } fn repo_path(&self, descr: &origin::Descr) -> PathBuf { @@ -149,21 +173,48 @@ pub mod git { fn branch_ref(&self, branch_name: &str) -> String { format!("origin/{branch_name}") } - } - impl<'a> super::Store<'a> for Store { - type AllDescrsIter = Box> + 'a>; + fn get_origin( + &self, + repo: gix::Repository, + descr: origin::Descr, + ) -> Result, GetOriginError> { + let origin::Descr::Git { + ref branch_name, .. + } = descr; - fn sync( - &'a self, + let commit_object_id = repo + .try_find_reference(&self.branch_ref(branch_name)) + .map_err(|e| GetOriginError::Unexpected(Box::from(e)))? + .ok_or(GetOriginError::InvalidBranchName)? + .peel_to_id_in_place() + .map_err(|e| GetOriginError::Unexpected(Box::from(e)))? + .detach(); + + let tree_object_id = repo + .find_object(commit_object_id) + .map_err(|e| GetOriginError::Unexpected(Box::from(e)))? + .try_to_commit_ref() + .map_err(|e| GetOriginError::Unexpected(Box::from(e)))? + .tree(); + + return Ok(sync::Arc::from(Origin { + descr, + repo: repo.into(), + tree_object_id, + })); + } + + fn sync_inner( + &self, descr: &origin::Descr, _limits: store::Limits, - ) -> Result<(), store::SyncError> { + ) -> Result { use gix::clone::Error as gixCloneErr; use gix::progress::Discard; let should_interrupt = &core::sync::atomic::AtomicBool::new(false); - let repo_path = &self.repo_path(descr); + let repo_path = &self.repo_path(&descr); // if the path doesn't exist then use the gix clone feature to clone it into the // directory. @@ -171,7 +222,10 @@ pub mod git { fs::create_dir_all(repo_path) .map_err(|e| store::SyncError::Unexpected(Box::from(e)))?; - let origin::Descr::Git { url, branch_name } = descr; + let origin::Descr::Git { + ref url, + ref branch_name, + } = descr; let (repo, _) = gix::prepare_clone_bare(url.clone(), repo_path) .map_err(|e| match e { @@ -200,7 +254,7 @@ pub mod git { serde_json::to_writer(descr_file, &descr) .map_err(|e| store::SyncError::Unexpected(Box::from(e)))?; - return Ok(()); + return Ok(repo); } let direction = gix::remote::Direction::Fetch; @@ -221,14 +275,70 @@ pub mod git { .receive(Discard, should_interrupt) .map_err(|e| store::SyncError::Unexpected(Box::from(e)))?; + Ok(repo) + } + } + + impl<'a> super::Store<'a> for Store { + type AllDescrsIter = Box> + 'a>; + + fn sync( + &'a self, + descr: origin::Descr, + limits: store::Limits, + ) -> Result<(), store::SyncError> { + // attempt to lock this descr for syncing, doing so within a new scope so the mutex + // isn't actually being held for the whole method duration. + let is_already_syncing = { + self.sync_guard + .lock() + .unwrap() + .insert(descr.clone(), ()) + .is_some() + }; + + if is_already_syncing { + return Err(store::SyncError::AlreadyInProgress); + } + + let res = self.sync_inner(&descr, limits); + + self.sync_guard.lock().unwrap().remove(&descr); + + let repo = match res { + Ok(repo) => repo, + Err(e) => return Err(e), + }; + + // repo is synced at this point (though the sync lock is still held), just gotta create + // the origin and store it. + // + // TODO this is a bit of a memory leak, but by the time we get + // to that point this should all be backed by something which isn't local storage + // anyway. + + // calling this while the sync lock is held isn't ideal, but it's convenient and + // shouldn't be too terrible generally + let origin = self.get_origin(repo, descr.clone()).map_err(|e| match e { + GetOriginError::InvalidBranchName => store::SyncError::InvalidBranchName, + GetOriginError::Unexpected(e) => store::SyncError::Unexpected(e), + })?; + + let mut origins = self.origins.write().unwrap(); + (*origins).insert(descr, origin); + Ok(()) } fn get(&'a self, descr: origin::Descr) -> Result, store::GetError> { + { + let origins = self.origins.read().unwrap(); + if let Some(origin) = origins.get(&descr) { + return Ok(Box::from(origin.clone())); + } + } + let repo_path = self.repo_path(&descr); - let origin::Descr::Git { - ref branch_name, .. - } = descr; fs::read_dir(&repo_path).map_err(|e| match e.kind() { io::ErrorKind::NotFound => store::GetError::NotFound, @@ -238,25 +348,17 @@ pub mod git { let repo = gix::open(&repo_path).map_err(|e| store::GetError::Unexpected(Box::from(e)))?; - let commit_object_id = repo - .find_reference(&self.branch_ref(branch_name)) - .map_err(|e| store::GetError::Unexpected(Box::from(e)))? - .peel_to_id_in_place() - .map_err(|e| store::GetError::Unexpected(Box::from(e)))? - .detach(); + let origin = self.get_origin(repo, descr.clone()).map_err(|e| match e { + // it's not expected that the branch name is invalid at this point, it must have + // existed for sync to have been successful. + GetOriginError::InvalidBranchName => store::GetError::Unexpected(Box::from(e)), + GetOriginError::Unexpected(e) => store::GetError::Unexpected(e), + })?; - let tree_object_id = repo - .find_object(commit_object_id) - .map_err(|e| store::GetError::Unexpected(Box::from(e)))? - .try_to_commit_ref() - .map_err(|e| store::GetError::Unexpected(Box::from(e)))? - .tree(); + let mut origins = self.origins.write().unwrap(); + (*origins).insert(descr, origin.clone()); - return Ok(Box::from(Origin { - descr, - repo, - tree_object_id, - })); + Ok(Box::from(origin)) } fn all_descrs(&'a self) -> store::AllDescrsResult { @@ -318,9 +420,11 @@ pub mod git { let store = super::Store::new(tmp_dir.path().to_path_buf()).expect("store created"); - store.sync(&descr, limits).expect("sync should succeed"); store - .sync(&descr, limits) + .sync(descr.clone(), limits) + .expect("sync should succeed"); + store + .sync(descr.clone(), limits) .expect("second sync should succeed"); assert!(matches!(