use crate::error::unexpected::{self, Intoable, Mappable}; use crate::{origin, util}; use std::path::{Path, PathBuf}; use std::{collections, fs, io, sync}; use futures::stream; #[derive(Clone)] struct RepoSnapshot { repo: sync::Arc, tree_object_id: gix::ObjectId, } #[derive(thiserror::Error, Debug)] enum CreateRepoSnapshotError { #[error("invalid branch name")] InvalidBranchName, #[error(transparent)] Unexpected(#[from] unexpected::Error), } /// Implements the Store trait for Descr::Git, storing the git repos on disk. If any non-git Descrs /// are used then this implementation will panic. pub struct FSStore { dir_path: PathBuf, // to prevent against syncing the same origin more than once at a time, but still allowing // more than one origin to be syncing at a time sync_guard: sync::Mutex>, repo_snapshots: sync::RwLock>>, } impl FSStore { pub fn new(dir_path: PathBuf) -> io::Result { fs::create_dir_all(&dir_path)?; Ok(Self { dir_path, sync_guard: sync::Mutex::new(collections::HashMap::new()), repo_snapshots: sync::RwLock::new(collections::HashMap::new()), }) } fn repo_path(&self, descr: &origin::Descr) -> PathBuf { self.dir_path.join(descr.id()) } fn descr_file_path(&self, descr_id: &str) -> PathBuf { self.dir_path.join(descr_id).join("descr.json") } fn branch_ref(&self, branch_name: &str) -> String { format!("origin/{branch_name}") } fn create_repo_snapshot( &self, repo: gix::Repository, descr: &origin::Descr, ) -> Result { let origin::Descr::Git { ref branch_name, .. } = descr; let branch_ref = self.branch_ref(branch_name); let commit_object_id = repo .try_find_reference(&branch_ref) .map_unexpected_while(|| format!("finding branch ref {branch_ref}"))? .ok_or(CreateRepoSnapshotError::InvalidBranchName)? .peel_to_id_in_place() .or_unexpected_while("peeling id in place")? .detach(); let tree_object_id = repo .find_object(commit_object_id) .map_unexpected_while(|| format!("finding commit object {commit_object_id}"))? .try_to_commit_ref() .map_unexpected_while(|| format!("parsing {commit_object_id} as commit"))? .tree(); Ok(RepoSnapshot { repo: sync::Arc::new(repo.into()), tree_object_id, }) } fn get_repo_snapshot( &self, descr: &origin::Descr, ) -> Result>, unexpected::Error> { { let repo_snapshots = self.repo_snapshots.read().unwrap(); if let Some(repo_snapshot) = repo_snapshots.get(descr) { return Ok(Some(repo_snapshot.clone())); } } let repo_path = self.repo_path(descr); match fs::read_dir(&repo_path) { Ok(_) => (), Err(e) => match e.kind() { io::ErrorKind::NotFound => return Ok(None), _ => { return Err(e.into_unexpected_while(format!( "checking if {} exists", repo_path.display() ))) } }, } let repo = gix::open(&repo_path) .map_unexpected_while(|| format!("opening {} as git repo", repo_path.display()))?; let repo_snapshot = self .create_repo_snapshot(repo, descr) .map_err(|e| match e { // it's not expected that the branch name is invalid at this point, it must have // existed for sync to have been successful. CreateRepoSnapshotError::InvalidBranchName => e.into_unexpected().into(), CreateRepoSnapshotError::Unexpected(e) => e, })?; let repo_snapshot = sync::Arc::new(repo_snapshot); let mut repo_snapshots = self.repo_snapshots.write().unwrap(); (*repo_snapshots).insert(descr.clone(), repo_snapshot.clone()); Ok(Some(repo_snapshot)) } fn sync_inner(&self, descr: &origin::Descr) -> Result { use gix::clone::Error as gixCloneErr; use gix::progress::Discard; let should_interrupt = &core::sync::atomic::AtomicBool::new(false); let repo_path = &self.repo_path(descr); // if the path doesn't exist then use the gix clone feature to clone it into the // directory. if fs::read_dir(repo_path).is_err() { fs::create_dir_all(repo_path) .map_unexpected_while(|| format!("creating {}", repo_path.display()))?; let origin::Descr::Git { ref url, ref branch_name, } = descr; let (repo, _) = gix::prepare_clone_bare(url.clone(), repo_path) .map_err(|e| match e { gixCloneErr::Init(gix::init::Error::InvalidBranchName { .. }) => { origin::SyncError::InvalidBranchName } gixCloneErr::UrlParse(_) | gixCloneErr::CanonicalizeUrl { .. } => { origin::SyncError::InvalidURL } _ => e .into_unexpected_while(format!( "cloning {} into {}", url, repo_path.display() )) .into(), })? .fetch_only(Discard, should_interrupt) .map_err(|_| origin::SyncError::InvalidURL)?; // Check to make sure the branch name exists // TODO if this fails we should delete repo_path let branch_ref = self.branch_ref(branch_name); repo.try_find_reference(&branch_ref) .map_unexpected_while(|| format!("finding branch ref {branch_ref}"))? .ok_or(origin::SyncError::InvalidBranchName)?; // Add the descr to the repo directory, so we can know the actual descr later // TODO if this fails we should delete repo_path let file_path = self.descr_file_path(descr.id().as_ref()); let descr_file = fs::File::create(&file_path) .map_unexpected_while(|| format!("creating {}", file_path.display()))?; serde_json::to_writer(descr_file, &descr) .map_unexpected_while(|| format!("writing descr to {}", file_path.display()))?; return Ok(repo); } let direction = gix::remote::Direction::Fetch; let repo = gix::open(repo_path) .map_unexpected_while(|| format!("opening repo at {}", repo_path.display()))?; let remote = repo .find_default_remote(direction) .ok_or_else(|| unexpected::Error::from("no default configured"))? .or_unexpected_while("finding default remote for fetching")?; remote .connect(direction) .or_unexpected_while("connecting to remote")? .prepare_fetch(Discard, Default::default()) .or_unexpected_while("preparing fetch")? .receive(Discard, should_interrupt) .or_unexpected_while("fetching from remote")?; Ok(repo) } } impl super::Store for FSStore { fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> { // attempt to lock this descr for syncing, doing so within a new scope so the mutex // isn't actually being held for the whole method duration. let is_already_syncing = { self.sync_guard .lock() .unwrap() .insert(descr.clone(), ()) .is_some() }; if is_already_syncing { return Err(origin::SyncError::AlreadyInProgress); } let res = self.sync_inner(descr); self.sync_guard.lock().unwrap().remove(descr); let repo = match res { Ok(repo) => repo, Err(e) => return Err(e), }; // repo is synced at this point (though the sync lock is still held), just gotta create // the RepoSnapshot and store it. // // TODO this is a bit of a memory leak, but by the time we get // to that point this should all be backed by something which isn't local storage // anyway. // calling this while the sync lock is held isn't ideal, but it's convenient and // shouldn't be too terrible generally let repo_snapshot = self .create_repo_snapshot(repo, descr) .map_err(|e| match e { CreateRepoSnapshotError::InvalidBranchName => origin::SyncError::InvalidBranchName, CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e), })?; let mut repo_snapshots = self.repo_snapshots.write().unwrap(); (*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot)); Ok(()) } fn all_descrs(&self) -> Result, origin::AllDescrsError> { fs::read_dir(&self.dir_path).or_unexpected()?.map( |dir_entry_res: io::Result| -> Result { let descr_id: String = dir_entry_res .or_unexpected()? .file_name() .to_str() .ok_or_else(|| { unexpected::Error::from("couldn't convert os string to &str") })? .into(); let descr_file_path = self.descr_file_path(descr_id.as_ref()); // TODO it's possible that opening the file will fail if syncing is // still ongoing, as writing the descr file is the last step after // initial sync has succeeded. let descr_file = fs::File::open(descr_file_path.as_path()) .map_unexpected_while(|| { format!("opening descr file {}", descr_file_path.display()) })?; let descr = serde_json::from_reader(descr_file).map_unexpected_while(|| { format!("reading descr file {}", descr_file_path.display()) })?; Ok(descr) }, ).try_collect() } fn read_file_into( &self, descr: &origin::Descr, path: &str, into: &mut dyn std::io::Write, ) -> Result<(), origin::ReadFileIntoError> { let repo_snapshot = match self.get_repo_snapshot(descr) { Ok(Some(repo_snapshot)) => repo_snapshot, Ok(None) => return Err(origin::ReadFileIntoError::DescrNotSynced), Err(e) => return Err(e.into()), }; let mut clean_path = Path::new(path); clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path); let repo = repo_snapshot.repo.to_thread_local(); let file_object = repo .find_object(repo_snapshot.tree_object_id) .map_unexpected_while(|| { format!("finding tree object {}", repo_snapshot.tree_object_id) })? .peel_to_tree() .map_unexpected_while(|| { format!("peeling tree object {}", repo_snapshot.tree_object_id) })? .lookup_entry_by_path(clean_path) .map_unexpected_while(|| { format!( "looking up {} in tree object {}", clean_path.display(), repo_snapshot.tree_object_id ) })? .ok_or(origin::ReadFileIntoError::FileNotFound)? .object() .or_unexpected()?; into.write_all(file_object.data.as_ref()) .or_unexpected_while("copying out file")?; Ok(()) } fn get_file<'store>( &'store self, descr: &origin::Descr, path: &str, ) -> Result { let repo_snapshot = match self.get_repo_snapshot(descr) { Ok(Some(repo_snapshot)) => repo_snapshot, Ok(None) => return Err(origin::GetFileError::DescrNotSynced), Err(e) => return Err(e.into()), }; let mut clean_path = Path::new(path); clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path); let repo = repo_snapshot.repo.to_thread_local(); let file_object = repo .find_object(repo_snapshot.tree_object_id) .map_unexpected_while(|| { format!("finding tree object {}", repo_snapshot.tree_object_id) })? .peel_to_tree() .map_unexpected_while(|| { format!("peeling tree object {}", repo_snapshot.tree_object_id) })? .lookup_entry_by_path(clean_path) .map_unexpected_while(|| { format!( "looking up {} in tree object {}", clean_path.display(), repo_snapshot.tree_object_id ) })? .ok_or(origin::GetFileError::FileNotFound)? .object() .or_unexpected()?; // TODO this is very not ideal, the whole file is first read totally into memory, and then // that is cloned. let data = file_object.data.clone(); Ok(Box::pin(stream::once(async move { Ok(data) }))) } } #[cfg(test)] mod tests { use crate::origin::{self, Store}; use futures::StreamExt; use tempdir::TempDir; #[tokio::test] async fn basic() { let tmp_dir = TempDir::new("origin_store_git").unwrap(); let curr_dir = format!("file://{}", std::env::current_dir().unwrap().display()); let descr = origin::Descr::Git { url: curr_dir.clone(), branch_name: String::from("main"), }; let other_descr = origin::Descr::Git { url: curr_dir.clone(), branch_name: String::from("some_other_branch"), }; let store = super::FSStore::new(tmp_dir.path().to_path_buf()).expect("store created"); store.sync(&descr).expect("sync should succeed"); store.sync(&descr).expect("second sync should succeed"); { // RepoSnapshot doesn't exist let mut into: Vec = vec![]; assert!(matches!( store.read_file_into(&other_descr, "DNE", &mut into), Err::<_, origin::ReadFileIntoError>(origin::ReadFileIntoError::DescrNotSynced), )); } let assert_file_dne = |path: &str| match store.get_file(&descr, path) { Err(origin::ReadFileIntoError::FileNotFound) => (), _ => assert!(false, "file should have not been found"), }; let assert_file_not_empty = |path: &str| { let f = store.get_file(&descr, path).expect("file not retrieved"); async move { let body = f.map(|r| r.unwrap()).concat().await; assert!(body.len() > 0); } }; assert_file_not_empty("src/lib.rs").await; assert_file_not_empty("/src/lib.rs").await; assert_file_dne("DNE"); assert_file_dne("src/../src/lib.rs"); let descrs = store.all_descrs().expect("all_descrs called"); assert_eq!(1, descrs.len()); assert_eq!(descr, descrs[0]); } }