Add locks around git store syncing and getting

This commit is contained in:
Brian Picciano 2023-05-10 15:12:34 +02:00
parent e5b64f4968
commit d1842943cd
3 changed files with 159 additions and 46 deletions

View File

@ -43,6 +43,9 @@ pub enum SyncError {
#[error("not found")] #[error("not found")]
NotFound, NotFound,
#[error("already in progress")]
AlreadyInProgress,
#[error(transparent)] #[error(transparent)]
Unexpected(Box<dyn Error>), Unexpected(Box<dyn Error>),
} }
@ -67,6 +70,9 @@ pub enum SyncWithConfigError {
#[error("invalid domain name")] #[error("invalid domain name")]
InvalidDomainName, InvalidDomainName,
#[error("already in progress")]
AlreadyInProgress,
#[error("target CNAME not set")] #[error("target CNAME not set")]
TargetCNAMENotSet, TargetCNAMENotSet,
@ -82,6 +88,7 @@ impl From<origin::store::SyncError> for SyncWithConfigError {
match e { match e {
origin::store::SyncError::InvalidURL => SyncWithConfigError::InvalidURL, origin::store::SyncError::InvalidURL => SyncWithConfigError::InvalidURL,
origin::store::SyncError::InvalidBranchName => SyncWithConfigError::InvalidBranchName, origin::store::SyncError::InvalidBranchName => SyncWithConfigError::InvalidBranchName,
origin::store::SyncError::AlreadyInProgress => SyncWithConfigError::AlreadyInProgress,
origin::store::SyncError::Unexpected(e) => SyncWithConfigError::Unexpected(e), origin::store::SyncError::Unexpected(e) => SyncWithConfigError::Unexpected(e),
} }
} }
@ -172,9 +179,11 @@ where
fn sync(&self, domain: &str) -> Result<(), SyncError> { fn sync(&self, domain: &str) -> Result<(), SyncError> {
let config = self.domain_config_store.get(domain)?; let config = self.domain_config_store.get(domain)?;
self.origin_store self.origin_store
.sync(&config.origin_descr, origin::store::Limits {}) .sync(config.origin_descr, origin::store::Limits {})
// if there's a config there should be an origin, any error here is unexpected .map_err(|e| match e {
.map_err(|e| SyncError::Unexpected(Box::from(e)))?; origin::store::SyncError::AlreadyInProgress => SyncError::AlreadyInProgress,
_ => SyncError::Unexpected(Box::from(e)),
})?;
Ok(()) Ok(())
} }
@ -190,7 +199,7 @@ where
self.domain_checker.check_domain(domain, &config_hash)?; self.domain_checker.check_domain(domain, &config_hash)?;
self.origin_store self.origin_store
.sync(&config.origin_descr, origin::store::Limits {})?; .sync(config.origin_descr.clone(), origin::store::Limits {})?;
self.domain_config_store.set(domain, config)?; self.domain_config_store.set(domain, config)?;

View File

@ -2,7 +2,7 @@ use hex::ToHex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
/// A unique description of an origin, from where a domain might be served. /// A unique description of an origin, from where a domain might be served.
pub enum Descr { pub enum Descr {
Git { url: String, branch_name: String }, Git { url: String, branch_name: String },

View File

@ -14,6 +14,9 @@ pub enum SyncError {
#[error("invalid branch name")] #[error("invalid branch name")]
InvalidBranchName, InvalidBranchName,
#[error("already in progress")]
AlreadyInProgress,
#[error(transparent)] #[error(transparent)]
Unexpected(Box<dyn Error>), Unexpected(Box<dyn Error>),
} }
@ -42,7 +45,7 @@ pub trait Store<'a> {
/// If the origin is of a kind which can be updated, sync will pull down the latest version of /// If the origin is of a kind which can be updated, sync will pull down the latest version of
/// the origin into the storage. /// the origin into the storage.
fn sync(&'a self, descr: &origin::Descr, limits: Limits) -> Result<(), SyncError>; fn sync(&'a self, descr: origin::Descr, limits: Limits) -> Result<(), SyncError>;
fn get(&'a self, descr: origin::Descr) -> Result<Box<dyn origin::Origin>, GetError>; fn get(&'a self, descr: origin::Descr) -> Result<Box<dyn origin::Origin>, GetError>;
fn all_descrs(&'a self) -> AllDescrsResult<Self::AllDescrsIter>; fn all_descrs(&'a self) -> AllDescrsResult<Self::AllDescrsIter>;
@ -50,7 +53,7 @@ pub trait Store<'a> {
pub struct MockStore<SyncFn, GetFn, AllDescrsFn> pub struct MockStore<SyncFn, GetFn, AllDescrsFn>
where where
SyncFn: Fn(&origin::Descr, Limits) -> Result<(), SyncError>, SyncFn: Fn(origin::Descr, Limits) -> Result<(), SyncError>,
GetFn: Fn(origin::Descr) -> Result<Box<dyn origin::Origin>, GetError>, GetFn: Fn(origin::Descr) -> Result<Box<dyn origin::Origin>, GetError>,
AllDescrsFn: Fn() -> AllDescrsResult<Vec<AllDescrsResult<origin::Descr>>>, AllDescrsFn: Fn() -> AllDescrsResult<Vec<AllDescrsResult<origin::Descr>>>,
{ {
@ -61,13 +64,13 @@ where
impl<'a, SyncFn, GetFn, AllDescrsFn> Store<'a> for MockStore<SyncFn, GetFn, AllDescrsFn> impl<'a, SyncFn, GetFn, AllDescrsFn> Store<'a> for MockStore<SyncFn, GetFn, AllDescrsFn>
where where
SyncFn: Fn(&origin::Descr, Limits) -> Result<(), SyncError>, SyncFn: Fn(origin::Descr, Limits) -> Result<(), SyncError>,
GetFn: Fn(origin::Descr) -> Result<Box<dyn origin::Origin>, GetError>, GetFn: Fn(origin::Descr) -> Result<Box<dyn origin::Origin>, GetError>,
AllDescrsFn: Fn() -> AllDescrsResult<Vec<AllDescrsResult<origin::Descr>>>, AllDescrsFn: Fn() -> AllDescrsResult<Vec<AllDescrsResult<origin::Descr>>>,
{ {
type AllDescrsIter = Vec<AllDescrsResult<origin::Descr>>; type AllDescrsIter = Vec<AllDescrsResult<origin::Descr>>;
fn sync(&'a self, descr: &origin::Descr, limits: Limits) -> Result<(), SyncError> { fn sync(&'a self, descr: origin::Descr, limits: Limits) -> Result<(), SyncError> {
(self.sync_fn)(descr, limits) (self.sync_fn)(descr, limits)
} }
@ -85,16 +88,17 @@ pub mod git {
use crate::origin; use crate::origin;
use crate::origin::store; use crate::origin::store;
use std::error::Error;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::{fs, io}; use std::{collections, fs, io, sync};
struct Origin { struct Origin {
descr: origin::Descr, descr: origin::Descr,
repo: gix::Repository, repo: gix::ThreadSafeRepository,
tree_object_id: gix::ObjectId, tree_object_id: gix::ObjectId,
} }
impl origin::Origin for Origin { impl origin::Origin for sync::Arc<Origin> {
fn descr(&self) -> &origin::Descr { fn descr(&self) -> &origin::Descr {
&self.descr &self.descr
} }
@ -107,8 +111,9 @@ pub mod git {
let mut clean_path = Path::new(path); let mut clean_path = Path::new(path);
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path); clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
let file_object = self let repo = self.repo.to_thread_local();
.repo
let file_object = repo
.find_object(self.tree_object_id) .find_object(self.tree_object_id)
.map_err(|e| origin::ReadFileIntoError::Unexpected(Box::from(e)))? .map_err(|e| origin::ReadFileIntoError::Unexpected(Box::from(e)))?
.peel_to_tree() .peel_to_tree()
@ -126,16 +131,35 @@ pub mod git {
} }
} }
#[derive(thiserror::Error, Debug)]
enum GetOriginError {
#[error("invalid branch name")]
InvalidBranchName,
#[error(transparent)]
Unexpected(Box<dyn Error>),
}
/// git::Store implements the Store trait for any Descr::Git based Origins. If any non-git /// git::Store implements the Store trait for any Descr::Git based Origins. If any non-git
/// Descrs are used then this implementation will panic. /// Descrs are used then this implementation will panic.
pub struct Store { pub struct Store {
dir_path: PathBuf, dir_path: PathBuf,
// to prevent against syncing the same origin more than once at a time, but still allowing
// more than one origin to be syncing at a time
sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
origins: sync::RwLock<collections::HashMap<origin::Descr, sync::Arc<Origin>>>,
} }
impl Store { impl Store {
pub fn new(dir_path: PathBuf) -> io::Result<Store> { pub fn new(dir_path: PathBuf) -> io::Result<Store> {
fs::create_dir_all(&dir_path)?; fs::create_dir_all(&dir_path)?;
Ok(Store { dir_path }) Ok(Store {
dir_path,
sync_guard: sync::Mutex::new(collections::HashMap::new()),
origins: sync::RwLock::new(collections::HashMap::new()),
})
} }
fn repo_path(&self, descr: &origin::Descr) -> PathBuf { fn repo_path(&self, descr: &origin::Descr) -> PathBuf {
@ -149,21 +173,48 @@ pub mod git {
fn branch_ref(&self, branch_name: &str) -> String { fn branch_ref(&self, branch_name: &str) -> String {
format!("origin/{branch_name}") format!("origin/{branch_name}")
} }
fn get_origin(
&self,
repo: gix::Repository,
descr: origin::Descr,
) -> Result<sync::Arc<Origin>, GetOriginError> {
let origin::Descr::Git {
ref branch_name, ..
} = descr;
let commit_object_id = repo
.try_find_reference(&self.branch_ref(branch_name))
.map_err(|e| GetOriginError::Unexpected(Box::from(e)))?
.ok_or(GetOriginError::InvalidBranchName)?
.peel_to_id_in_place()
.map_err(|e| GetOriginError::Unexpected(Box::from(e)))?
.detach();
let tree_object_id = repo
.find_object(commit_object_id)
.map_err(|e| GetOriginError::Unexpected(Box::from(e)))?
.try_to_commit_ref()
.map_err(|e| GetOriginError::Unexpected(Box::from(e)))?
.tree();
return Ok(sync::Arc::from(Origin {
descr,
repo: repo.into(),
tree_object_id,
}));
} }
impl<'a> super::Store<'a> for Store { fn sync_inner(
type AllDescrsIter = Box<dyn Iterator<Item = store::AllDescrsResult<origin::Descr>> + 'a>; &self,
fn sync(
&'a self,
descr: &origin::Descr, descr: &origin::Descr,
_limits: store::Limits, _limits: store::Limits,
) -> Result<(), store::SyncError> { ) -> Result<gix::Repository, store::SyncError> {
use gix::clone::Error as gixCloneErr; use gix::clone::Error as gixCloneErr;
use gix::progress::Discard; use gix::progress::Discard;
let should_interrupt = &core::sync::atomic::AtomicBool::new(false); let should_interrupt = &core::sync::atomic::AtomicBool::new(false);
let repo_path = &self.repo_path(descr); let repo_path = &self.repo_path(&descr);
// if the path doesn't exist then use the gix clone feature to clone it into the // if the path doesn't exist then use the gix clone feature to clone it into the
// directory. // directory.
@ -171,7 +222,10 @@ pub mod git {
fs::create_dir_all(repo_path) fs::create_dir_all(repo_path)
.map_err(|e| store::SyncError::Unexpected(Box::from(e)))?; .map_err(|e| store::SyncError::Unexpected(Box::from(e)))?;
let origin::Descr::Git { url, branch_name } = descr; let origin::Descr::Git {
ref url,
ref branch_name,
} = descr;
let (repo, _) = gix::prepare_clone_bare(url.clone(), repo_path) let (repo, _) = gix::prepare_clone_bare(url.clone(), repo_path)
.map_err(|e| match e { .map_err(|e| match e {
@ -200,7 +254,7 @@ pub mod git {
serde_json::to_writer(descr_file, &descr) serde_json::to_writer(descr_file, &descr)
.map_err(|e| store::SyncError::Unexpected(Box::from(e)))?; .map_err(|e| store::SyncError::Unexpected(Box::from(e)))?;
return Ok(()); return Ok(repo);
} }
let direction = gix::remote::Direction::Fetch; let direction = gix::remote::Direction::Fetch;
@ -221,14 +275,70 @@ pub mod git {
.receive(Discard, should_interrupt) .receive(Discard, should_interrupt)
.map_err(|e| store::SyncError::Unexpected(Box::from(e)))?; .map_err(|e| store::SyncError::Unexpected(Box::from(e)))?;
Ok(repo)
}
}
impl<'a> super::Store<'a> for Store {
type AllDescrsIter = Box<dyn Iterator<Item = store::AllDescrsResult<origin::Descr>> + 'a>;
fn sync(
&'a self,
descr: origin::Descr,
limits: store::Limits,
) -> Result<(), store::SyncError> {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
if is_already_syncing {
return Err(store::SyncError::AlreadyInProgress);
}
let res = self.sync_inner(&descr, limits);
self.sync_guard.lock().unwrap().remove(&descr);
let repo = match res {
Ok(repo) => repo,
Err(e) => return Err(e),
};
// repo is synced at this point (though the sync lock is still held), just gotta create
// the origin and store it.
//
// TODO this is a bit of a memory leak, but by the time we get
// to that point this should all be backed by something which isn't local storage
// anyway.
// calling this while the sync lock is held isn't ideal, but it's convenient and
// shouldn't be too terrible generally
let origin = self.get_origin(repo, descr.clone()).map_err(|e| match e {
GetOriginError::InvalidBranchName => store::SyncError::InvalidBranchName,
GetOriginError::Unexpected(e) => store::SyncError::Unexpected(e),
})?;
let mut origins = self.origins.write().unwrap();
(*origins).insert(descr, origin);
Ok(()) Ok(())
} }
fn get(&'a self, descr: origin::Descr) -> Result<Box<dyn origin::Origin>, store::GetError> { fn get(&'a self, descr: origin::Descr) -> Result<Box<dyn origin::Origin>, store::GetError> {
{
let origins = self.origins.read().unwrap();
if let Some(origin) = origins.get(&descr) {
return Ok(Box::from(origin.clone()));
}
}
let repo_path = self.repo_path(&descr); let repo_path = self.repo_path(&descr);
let origin::Descr::Git {
ref branch_name, ..
} = descr;
fs::read_dir(&repo_path).map_err(|e| match e.kind() { fs::read_dir(&repo_path).map_err(|e| match e.kind() {
io::ErrorKind::NotFound => store::GetError::NotFound, io::ErrorKind::NotFound => store::GetError::NotFound,
@ -238,25 +348,17 @@ pub mod git {
let repo = let repo =
gix::open(&repo_path).map_err(|e| store::GetError::Unexpected(Box::from(e)))?; gix::open(&repo_path).map_err(|e| store::GetError::Unexpected(Box::from(e)))?;
let commit_object_id = repo let origin = self.get_origin(repo, descr.clone()).map_err(|e| match e {
.find_reference(&self.branch_ref(branch_name)) // it's not expected that the branch name is invalid at this point, it must have
.map_err(|e| store::GetError::Unexpected(Box::from(e)))? // existed for sync to have been successful.
.peel_to_id_in_place() GetOriginError::InvalidBranchName => store::GetError::Unexpected(Box::from(e)),
.map_err(|e| store::GetError::Unexpected(Box::from(e)))? GetOriginError::Unexpected(e) => store::GetError::Unexpected(e),
.detach(); })?;
let tree_object_id = repo let mut origins = self.origins.write().unwrap();
.find_object(commit_object_id) (*origins).insert(descr, origin.clone());
.map_err(|e| store::GetError::Unexpected(Box::from(e)))?
.try_to_commit_ref()
.map_err(|e| store::GetError::Unexpected(Box::from(e)))?
.tree();
return Ok(Box::from(Origin { Ok(Box::from(origin))
descr,
repo,
tree_object_id,
}));
} }
fn all_descrs(&'a self) -> store::AllDescrsResult<Self::AllDescrsIter> { fn all_descrs(&'a self) -> store::AllDescrsResult<Self::AllDescrsIter> {
@ -318,9 +420,11 @@ pub mod git {
let store = super::Store::new(tmp_dir.path().to_path_buf()).expect("store created"); let store = super::Store::new(tmp_dir.path().to_path_buf()).expect("store created");
store.sync(&descr, limits).expect("sync should succeed");
store store
.sync(&descr, limits) .sync(descr.clone(), limits)
.expect("sync should succeed");
store
.sync(descr.clone(), limits)
.expect("second sync should succeed"); .expect("second sync should succeed");
assert!(matches!( assert!(matches!(