domani/src/origin/git.rs

430 lines
15 KiB
Rust
Raw Normal View History

use crate::error::unexpected::{self, Intoable, Mappable};
use crate::origin;
2023-05-11 09:54:25 +00:00
use std::path::{Path, PathBuf};
use std::{collections, fs, io, sync};
use futures::stream;
2023-06-17 14:04:26 +00:00
#[derive(Clone)]
struct RepoSnapshot {
2023-06-17 14:04:26 +00:00
repo: sync::Arc<gix::ThreadSafeRepository>,
2023-05-11 09:54:25 +00:00
tree_object_id: gix::ObjectId,
}
#[derive(thiserror::Error, Debug)]
enum CreateRepoSnapshotError {
2023-05-11 09:54:25 +00:00
#[error("invalid branch name")]
InvalidBranchName,
#[error(transparent)]
Unexpected(#[from] unexpected::Error),
2023-05-11 09:54:25 +00:00
}
/// Implements the Store trait for Descr::Git, storing the git repos on disk. If any non-git Descrs
/// are used then this implementation will panic.
2023-06-29 14:54:55 +00:00
pub struct FSStore {
2023-05-11 09:54:25 +00:00
dir_path: PathBuf,
// to prevent against syncing the same origin more than once at a time, but still allowing
// more than one origin to be syncing at a time
sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
repo_snapshots: sync::RwLock<collections::HashMap<origin::Descr, sync::Arc<RepoSnapshot>>>,
2023-05-11 09:54:25 +00:00
}
2023-06-29 14:54:55 +00:00
impl FSStore {
2023-07-09 12:07:07 +00:00
pub fn new(config: &origin::Config) -> io::Result<Self> {
2023-07-09 14:09:00 +00:00
let dir_path = config.store_dir_path.join("git");
2023-06-29 14:54:55 +00:00
fs::create_dir_all(&dir_path)?;
Ok(Self {
dir_path,
sync_guard: sync::Mutex::new(collections::HashMap::new()),
repo_snapshots: sync::RwLock::new(collections::HashMap::new()),
2023-06-29 14:54:55 +00:00
})
}
2023-05-11 09:54:25 +00:00
/// Returns the directory in which the repo for `descr` is stored: the store directory joined
/// with the descr's id.
fn repo_path(&self, descr: &origin::Descr) -> PathBuf {
    let descr_id = descr.id();
    self.dir_path.join(descr_id)
}
/// Returns the path of the `descr.json` file inside the repo directory named `descr_id`.
fn descr_file_path(&self, descr_id: &str) -> PathBuf {
    let mut file_path = self.dir_path.join(descr_id);
    file_path.push("descr.json");
    file_path
}
/// Returns the name of the reference under which `branch_name` is looked up in a synced
/// repo, i.e. `branch_name` prefixed with `origin/`.
fn branch_ref(&self, branch_name: &str) -> String {
    ["origin/", branch_name].concat()
}
2023-07-16 13:10:02 +00:00
/// Splits a git descr into its `(url, branch_name)` pair.
///
/// # Panics
///
/// Panics if `descr` is not the `Git` variant.
fn deconstruct_descr(descr: &origin::Descr) -> (&str, &str) {
    match descr {
        origin::Descr::Git { url, branch_name } => (url.as_str(), branch_name.as_str()),
        _ => panic!("non git descr passed in"),
    }
}
fn create_repo_snapshot(
2023-05-11 09:54:25 +00:00
&self,
repo: gix::Repository,
descr: &origin::Descr,
) -> Result<RepoSnapshot, CreateRepoSnapshotError> {
2023-07-16 13:10:02 +00:00
let (_, branch_name) = Self::deconstruct_descr(descr);
let branch_ref = self.branch_ref(branch_name);
2023-05-11 09:54:25 +00:00
let commit_object_id = repo
.try_find_reference(&branch_ref)
.map_unexpected_while(|| format!("finding branch ref {branch_ref}"))?
.ok_or(CreateRepoSnapshotError::InvalidBranchName)?
2023-05-11 09:54:25 +00:00
.peel_to_id_in_place()
.or_unexpected_while("peeling id in place")?
2023-05-11 09:54:25 +00:00
.detach();
let tree_object_id = repo
.find_object(commit_object_id)
.map_unexpected_while(|| format!("finding commit object {commit_object_id}"))?
2023-05-11 09:54:25 +00:00
.try_to_commit_ref()
.map_unexpected_while(|| format!("parsing {commit_object_id} as commit"))?
2023-05-11 09:54:25 +00:00
.tree();
Ok(RepoSnapshot {
2023-06-17 14:04:26 +00:00
repo: sync::Arc::new(repo.into()),
2023-05-11 09:54:25 +00:00
tree_object_id,
2023-06-17 14:04:26 +00:00
})
2023-05-11 09:54:25 +00:00
}
fn get_repo_snapshot(
&self,
descr: &origin::Descr,
) -> Result<Option<sync::Arc<RepoSnapshot>>, unexpected::Error> {
{
let repo_snapshots = self.repo_snapshots.read().unwrap();
if let Some(repo_snapshot) = repo_snapshots.get(descr) {
return Ok(Some(repo_snapshot.clone()));
}
}
let repo_path = self.repo_path(descr);
match fs::read_dir(&repo_path) {
Ok(_) => (),
Err(e) => match e.kind() {
io::ErrorKind::NotFound => return Ok(None),
_ => {
return Err(e.into_unexpected_while(format!(
"checking if {} exists",
repo_path.display()
)))
}
},
}
let repo = gix::open(&repo_path)
.map_unexpected_while(|| format!("opening {} as git repo", repo_path.display()))?;
let repo_snapshot = self
.create_repo_snapshot(repo, descr)
.map_err(|e| match e {
// it's not expected that the branch name is invalid at this point, it must have
// existed for sync to have been successful.
CreateRepoSnapshotError::InvalidBranchName => e.into_unexpected().into(),
CreateRepoSnapshotError::Unexpected(e) => e,
})?;
let repo_snapshot = sync::Arc::new(repo_snapshot);
let mut repo_snapshots = self.repo_snapshots.write().unwrap();
(*repo_snapshots).insert(descr.clone(), repo_snapshot.clone());
Ok(Some(repo_snapshot))
}
fn sync_inner(&self, descr: &origin::Descr) -> Result<gix::Repository, origin::SyncError> {
2023-05-11 09:54:25 +00:00
use gix::clone::Error as gixCloneErr;
use gix::progress::Discard;
let should_interrupt = &core::sync::atomic::AtomicBool::new(false);
2023-05-16 14:20:01 +00:00
let repo_path = &self.repo_path(descr);
2023-05-11 09:54:25 +00:00
// if the path doesn't exist then use the gix clone feature to clone it into the
// directory.
if fs::read_dir(repo_path).is_err() {
fs::create_dir_all(repo_path)
.map_unexpected_while(|| format!("creating {}", repo_path.display()))?;
2023-05-11 09:54:25 +00:00
2023-07-16 13:10:02 +00:00
let (url, branch_name) = Self::deconstruct_descr(descr);
2023-05-11 09:54:25 +00:00
let (repo, _) = gix::prepare_clone_bare(url.clone(), repo_path)
.map_err(|e| match e {
gixCloneErr::Init(gix::init::Error::InvalidBranchName { .. }) => {
origin::SyncError::InvalidBranchName
2023-05-11 09:54:25 +00:00
}
gixCloneErr::UrlParse(_) | gixCloneErr::CanonicalizeUrl { .. } => {
origin::SyncError::InvalidURL
2023-05-11 09:54:25 +00:00
}
_ => e
.into_unexpected_while(format!(
"cloning {} into {}",
url,
repo_path.display()
))
.into(),
2023-05-11 09:54:25 +00:00
})?
.fetch_only(Discard, should_interrupt)
.map_err(|_| origin::SyncError::InvalidURL)?;
2023-05-11 09:54:25 +00:00
// Check to make sure the branch name exists
// TODO if this fails we should delete repo_path
let branch_ref = self.branch_ref(branch_name);
repo.try_find_reference(&branch_ref)
.map_unexpected_while(|| format!("finding branch ref {branch_ref}"))?
.ok_or(origin::SyncError::InvalidBranchName)?;
2023-05-11 09:54:25 +00:00
// Add the descr to the repo directory, so we can know the actual descr later
// TODO if this fails we should delete repo_path
let file_path = self.descr_file_path(descr.id().as_ref());
let descr_file = fs::File::create(&file_path)
.map_unexpected_while(|| format!("creating {}", file_path.display()))?;
2023-05-11 09:54:25 +00:00
serde_json::to_writer(descr_file, &descr)
.map_unexpected_while(|| format!("writing descr to {}", file_path.display()))?;
2023-05-11 09:54:25 +00:00
return Ok(repo);
}
let direction = gix::remote::Direction::Fetch;
let repo = gix::open(repo_path)
.map_unexpected_while(|| format!("opening repo at {}", repo_path.display()))?;
2023-05-11 09:54:25 +00:00
let remote = repo
.find_default_remote(direction)
.ok_or_else(|| unexpected::Error::from("no default configured"))?
.or_unexpected_while("finding default remote for fetching")?;
2023-05-11 09:54:25 +00:00
remote
.connect(direction)
.or_unexpected_while("connecting to remote")?
2023-05-11 09:54:25 +00:00
.prepare_fetch(Discard, Default::default())
.or_unexpected_while("preparing fetch")?
2023-05-11 09:54:25 +00:00
.receive(Discard, should_interrupt)
.or_unexpected_while("fetching from remote")?;
2023-05-11 09:54:25 +00:00
Ok(repo)
}
}
2023-06-29 14:54:55 +00:00
impl super::Store for FSStore {
fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
2023-05-11 09:54:25 +00:00
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress);
2023-05-11 09:54:25 +00:00
}
let res = self.sync_inner(descr);
2023-05-11 09:54:25 +00:00
2023-07-04 17:42:12 +00:00
self.sync_guard.lock().unwrap().remove(descr);
2023-05-11 09:54:25 +00:00
let repo = match res {
Ok(repo) => repo,
Err(e) => return Err(e),
};
// repo is synced at this point (though the sync lock is still held), just gotta create
// the RepoSnapshot and store it.
2023-05-11 09:54:25 +00:00
//
// TODO this is a bit of a memory leak, but by the time we get
// to that point this should all be backed by something which isn't local storage
// anyway.
// calling this while the sync lock is held isn't ideal, but it's convenient and
// shouldn't be too terrible generally
let repo_snapshot = self
.create_repo_snapshot(repo, descr)
.map_err(|e| match e {
CreateRepoSnapshotError::InvalidBranchName => origin::SyncError::InvalidBranchName,
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
})?;
2023-05-11 09:54:25 +00:00
let mut repo_snapshots = self.repo_snapshots.write().unwrap();
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
2023-05-11 09:54:25 +00:00
Ok(())
}
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
fs::read_dir(&self.dir_path).or_unexpected()?.map(
|dir_entry_res: io::Result<fs::DirEntry>| -> Result<origin::Descr, origin::AllDescrsError> {
let descr_id: String = dir_entry_res
.or_unexpected()?
.file_name()
.to_str()
.ok_or_else(|| {
unexpected::Error::from("couldn't convert os string to &str")
})?
.into();
let descr_file_path = self.descr_file_path(descr_id.as_ref());
// TODO it's possible that opening the file will fail if syncing is
// still ongoing, as writing the descr file is the last step after
// initial sync has succeeded.
let descr_file = fs::File::open(descr_file_path.as_path())
.map_unexpected_while(|| {
format!("opening descr file {}", descr_file_path.display())
})?;
let descr = serde_json::from_reader(descr_file).map_unexpected_while(|| {
format!("reading descr file {}", descr_file_path.display())
})?;
Ok(descr)
},
).try_collect()
2023-05-11 09:54:25 +00:00
}
/// Looks up `req.path` in the snapshot of `descr` and returns the contents of the object
/// found there as a single-item stream.
///
/// A single leading `/` on the path is ignored.
///
/// # Errors
///
/// - `GetFileError::DescrNotSynced` if no snapshot is available for `descr`.
/// - `GetFileError::FileNotFound` if the path has no entry in the snapshot's tree.
/// - An unexpected error for any other failure.
fn get_file<'req>(
    &self,
    descr: &'req origin::Descr,
    req: origin::GetFileRequest<'req>,
) -> Result<origin::GetFileResponse, origin::GetFileError> {
    let repo_snapshot = self
        .get_repo_snapshot(descr)?
        .ok_or(origin::GetFileError::DescrNotSynced)?;

    let req_path = Path::new(req.path);
    let clean_path = req_path.strip_prefix("/").unwrap_or(req_path);

    let repo = repo_snapshot.repo.to_thread_local();

    // NOTE(review): the kind of the object found is not checked; presumably a path which
    // names a directory would return the raw tree object — confirm.
    let mut file_object = repo
        .find_object(repo_snapshot.tree_object_id)
        .map_unexpected_while(|| {
            format!("finding tree object {}", repo_snapshot.tree_object_id)
        })?
        .peel_to_tree()
        .map_unexpected_while(|| {
            format!("peeling tree object {}", repo_snapshot.tree_object_id)
        })?
        .lookup_entry_by_path(clean_path)
        .map_unexpected_while(|| {
            format!(
                "looking up {} in tree object {}",
                clean_path.display(),
                repo_snapshot.tree_object_id
            )
        })?
        .ok_or(origin::GetFileError::FileNotFound)?
        .object()
        .or_unexpected()?;

    // TODO this is not ideal, the whole file is read into memory before being served. The
    // buffer is at least moved out of the object here, rather than cloned.
    let data = std::mem::take(&mut file_object.data);

    Ok(origin::GetFileResponse {
        body: Box::pin(stream::once(async move { Ok(data) })),
    })
}
2023-05-11 09:54:25 +00:00
}
#[cfg(test)]
mod tests {
    use crate::origin::{self, Config, Store};
    use futures::StreamExt;
    use std::{net, str::FromStr};
    use tempdir::TempDir;

    /// Syncs the repo found in the current working directory into a temporary store, then
    /// checks file lookups and descr listing against it.
    #[tokio::test]
    async fn basic() {
        let client_ip = net::IpAddr::from_str("127.0.0.1").unwrap();

        let tmp_dir = TempDir::new("origin_store_git").unwrap();
        let config = Config {
            store_dir_path: tmp_dir.path().to_path_buf(),
        };

        let curr_dir = format!("file://{}", std::env::current_dir().unwrap().display());

        let descr = origin::Descr::Git {
            url: curr_dir.clone(),
            branch_name: String::from("main"),
        };

        // never synced, so the store must not know about it.
        let other_descr = origin::Descr::Git {
            url: curr_dir,
            branch_name: String::from("some_other_branch"),
        };

        let store = super::FSStore::new(&config).expect("store created");

        store.sync(&descr).expect("sync should succeed");
        store.sync(&descr).expect("second sync should succeed");

        // RepoSnapshot doesn't exist
        assert!(
            matches!(
                store.get_file(
                    &other_descr,
                    origin::GetFileRequest {
                        path: "DNE",
                        client_ip: &client_ip,
                    },
                ),
                Err(origin::GetFileError::DescrNotSynced)
            ),
            "descr should have not been found"
        );

        let assert_file_dne = |path: &str| {
            let res = store.get_file(
                &descr,
                origin::GetFileRequest {
                    path,
                    client_ip: &client_ip,
                },
            );

            assert!(
                matches!(res, Err(origin::GetFileError::FileNotFound)),
                "file should have not been found"
            );
        };

        let assert_file_not_empty = |path: &str| {
            let origin::GetFileResponse { body } = store
                .get_file(
                    &descr,
                    origin::GetFileRequest {
                        path,
                        client_ip: &client_ip,
                    },
                )
                .expect("file not retrieved");

            async move {
                let body = body.map(|r| r.unwrap()).concat().await;
                assert!(!body.is_empty());
            }
        };

        assert_file_not_empty("src/lib.rs").await;
        assert_file_not_empty("/src/lib.rs").await;
        assert_file_dne("DNE");
        assert_file_dne("src/../src/lib.rs");

        let descrs = store.all_descrs().expect("all_descrs called");

        assert_eq!(1, descrs.len());
        assert_eq!(descr, descrs[0]);
    }
}