Make origin::Store::sync async

In order for this method to make sense as an async, the returned box
must have its lifetime tied to the store. This did not jive well with
the mock implementation of origin::Store, so I ditched that, along with
the mux tests which depended on it. Oh well.
main
Brian Picciano 4 months ago
parent 6dd53f96d8
commit 78b67a02ad
  1. 8
      src/domain/manager.rs
  2. 22
      src/origin.rs
  3. 93
      src/origin/git.rs
  4. 138
      src/origin/mux.rs

@ -206,13 +206,13 @@ impl ManagerImpl {
manager
}
fn sync_domain_origin(
async fn sync_domain_origin(
&self,
domain: &domain::Name,
origin_descr: &origin::Descr,
) -> Result<(), origin::SyncError> {
log::info!("Syncing origin {:?} for domain {domain}", origin_descr,);
self.origin_store.sync(origin_descr)
self.origin_store.sync(origin_descr).await
}
async fn sync_origins(&self) -> unexpected::Result<()> {
@ -232,6 +232,7 @@ impl ManagerImpl {
};
self.sync_domain_origin(&domain, &settings.origin_descr)
.await
.map_unexpected_while(|| {
format!(
"syncing origin {:?} for domain {domain}",
@ -423,7 +424,8 @@ impl Manager for ManagerImpl {
self.domain_checker.check_domain(&domain, &hash).await?;
self.sync_domain_origin(&domain, &settings.origin_descr)?;
self.sync_domain_origin(&domain, &settings.origin_descr)
.await?;
if self.can_sync_gemini_cert() {
self.sync_domain_gemini_cert(&domain)

@ -8,7 +8,6 @@ pub use descr::Descr;
use crate::error::unexpected;
use crate::util;
use std::sync;
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
pub enum SyncError {
@ -46,32 +45,13 @@ pub enum GetFileError {
Unexpected(#[from] unexpected::Error),
}
#[mockall::automock]
/// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr.
pub trait Store {
/// If the origin is of a kind which can be updated, sync will pull down the latest version of
/// the origin into the storage.
fn sync(&self, descr: &Descr) -> Result<(), SyncError>;
fn sync(&self, descr: &Descr) -> util::BoxFuture<'_, Result<(), SyncError>>;
fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError>;
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError>;
}
pub fn new_mock() -> sync::Arc<sync::Mutex<MockStore>> {
sync::Arc::new(sync::Mutex::new(MockStore::new()))
}
impl Store for sync::Arc<sync::Mutex<MockStore>> {
fn sync(&self, descr: &Descr) -> Result<(), SyncError> {
self.lock().unwrap().sync(descr)
}
fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError> {
self.lock().unwrap().all_descrs()
}
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError> {
self.lock().unwrap().get_file(descr, path)
}
}

@ -2,7 +2,7 @@ use crate::error::unexpected::{self, Intoable, Mappable};
use crate::{origin, util};
use std::path::{Path, PathBuf};
use std::{collections, fs, io, sync};
use std::{collections, fs, future, io, sync};
use futures::stream;
@ -216,50 +216,62 @@ impl FSStore {
}
impl super::Store for FSStore {
fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
fn sync(
&self,
descr: &origin::Descr,
) -> util::BoxFuture<'static, Result<(), origin::SyncError>> {
// TODO this implementation is kind of cheating, as it's doing everything synchronously but
// then returning the result in an async box. But the git store is going to be
// re-implemented soon anyway, so it doesn't matter.
let res = (|| {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress);
}
if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress);
}
let res = self.sync_inner(&descr);
let res = self.sync_inner(descr);
self.sync_guard.lock().unwrap().remove(&descr);
self.sync_guard.lock().unwrap().remove(descr);
let repo = match res {
Ok(repo) => repo,
Err(e) => return Err(e),
};
let repo = match res {
Ok(repo) => repo,
Err(e) => return Err(e),
};
// repo is synced at this point (though the sync lock is still held), just gotta create
// the RepoSnapshot and store it.
//
// TODO this is a bit of a memory leak, but by the time we get
// to that point this should all be backed by something which isn't local storage
// anyway.
// repo is synced at this point (though the sync lock is still held), just gotta create
// the RepoSnapshot and store it.
//
// TODO this is a bit of a memory leak, but by the time we get
// to that point this should all be backed by something which isn't local storage
// anyway.
// calling this while the sync lock is held isn't ideal, but it's convenient and
// shouldn't be too terrible generally
let repo_snapshot = self
.create_repo_snapshot(repo, &descr)
.map_err(|e| match e {
CreateRepoSnapshotError::InvalidBranchName => {
origin::SyncError::InvalidBranchName
}
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
})?;
// calling this while the sync lock is held isn't ideal, but it's convenient and
// shouldn't be too terrible generally
let repo_snapshot = self
.create_repo_snapshot(repo, descr)
.map_err(|e| match e {
CreateRepoSnapshotError::InvalidBranchName => origin::SyncError::InvalidBranchName,
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
})?;
let mut repo_snapshots = self.repo_snapshots.write().unwrap();
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
let mut repo_snapshots = self.repo_snapshots.write().unwrap();
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
Ok(())
})();
Ok(())
Box::pin(future::ready(res))
}
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
@ -374,8 +386,11 @@ mod tests {
let store = super::FSStore::new(&config).expect("store created");
store.sync(&descr).expect("sync should succeed");
store.sync(&descr).expect("second sync should succeed");
store.sync(&descr).await.expect("sync should succeed");
store
.sync(&descr)
.await
.expect("second sync should succeed");
// RepoSnapshot doesn't exist
match store.get_file(&other_descr, "DNE") {

@ -3,7 +3,7 @@ use crate::{origin, util};
pub struct Store<F, S>
where
S: origin::Store + 'static,
S: origin::Store + Sync + Send + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{
mapping_fn: F,
@ -12,7 +12,7 @@ where
impl<F, S> Store<F, S>
where
S: origin::Store + 'static,
S: origin::Store + Sync + Send + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{
pub fn new(mapping_fn: F, stores: Vec<S>) -> Store<F, S> {
@ -22,13 +22,17 @@ where
impl<F, S> origin::Store for Store<F, S>
where
S: origin::Store + 'static,
S: origin::Store + Sync + Send + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{
fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
(self.mapping_fn)(descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.sync(descr)
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
let descr = descr.clone();
Box::pin(async move {
(self.mapping_fn)(&descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.sync(&descr)
.await
})
}
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
@ -51,123 +55,3 @@ where
.get_file(descr, path)
}
}
#[cfg(test)]
mod tests {
use crate::origin;
use std::sync;
struct Harness {
descr_a: origin::Descr,
descr_b: origin::Descr,
descr_unknown: origin::Descr,
store_a: sync::Arc<sync::Mutex<origin::MockStore>>,
store_b: sync::Arc<sync::Mutex<origin::MockStore>>,
store: Box<dyn origin::Store>,
}
impl Harness {
fn new() -> Harness {
let descr_a = origin::Descr::Git {
url: "A".to_string(),
branch_name: "A".to_string(),
};
let descr_b = origin::Descr::Git {
url: "B".to_string(),
branch_name: "B".to_string(),
};
let store_a = origin::new_mock();
let store_b = origin::new_mock();
Harness {
descr_a: descr_a.clone(),
descr_b: descr_b.clone(),
descr_unknown: origin::Descr::Git {
url: "X".to_string(),
branch_name: "X".to_string(),
},
store_a: store_a.clone(),
store_b: store_b.clone(),
store: Box::from(super::Store::new(
{
let store_a = store_a.clone();
let store_b = store_b.clone();
move |descr| match descr {
&origin::Descr::Git { ref url, .. } if url == "A" => {
Some(store_a.clone())
}
&origin::Descr::Git { ref url, .. } if url == "B" => {
Some(store_b.clone())
}
_ => None,
}
},
vec![store_a.clone(), store_b.clone()],
)),
}
}
}
#[test]
fn sync() {
let h = Harness::new();
{
let descr_a = h.descr_a.clone();
h.store_a
.lock()
.unwrap()
.expect_sync()
.withf(move |descr: &origin::Descr| descr == &descr_a)
.times(1)
.return_const(Ok::<(), origin::SyncError>(()));
}
assert_eq!(Ok(()), h.store.sync(&h.descr_a));
{
let descr_b = h.descr_b.clone();
h.store_b
.lock()
.unwrap()
.expect_sync()
.withf(move |descr: &origin::Descr| descr == &descr_b)
.times(1)
.return_const(Ok::<(), origin::SyncError>(()));
}
assert_eq!(Ok(()), h.store.sync(&h.descr_b));
assert!(h.store.sync(&h.descr_unknown).is_err());
}
#[test]
fn all_descrs() {
let h = Harness::new();
h.store_a
.lock()
.unwrap()
.expect_all_descrs()
.times(1)
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
.descr_a
.clone()]));
h.store_b
.lock()
.unwrap()
.expect_all_descrs()
.times(1)
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
.descr_b
.clone()]));
assert_eq!(
Ok(vec![h.descr_a.clone(), h.descr_b.clone()]),
h.store.all_descrs(),
)
}
}

Loading…
Cancel
Save