From 498a33533f1a1cf00639e99cabcc11c25c330fd7 Mon Sep 17 00:00:00 2001
From: Brian Picciano
Date: Fri, 16 Feb 2024 21:22:39 +0100
Subject: [PATCH] Remove old git implementation completely

---
 config-dev.yml.tpl      |   2 -
 config.yml              |  10 -
 src/config.rs           |   1 -
 src/main.rs             |   5 +-
 src/origin.rs           |   3 -
 src/origin/config.rs    |   7 -
 src/origin/git.rs       | 580 ++++++++++++++++++----------------
 src/origin/git_proxy.rs | 334 -----------------------
 8 files changed, 254 insertions(+), 688 deletions(-)
 delete mode 100644 src/origin/config.rs
 delete mode 100644 src/origin/git_proxy.rs

diff --git a/config-dev.yml.tpl b/config-dev.yml.tpl
index 5fcb262..d161f2e 100644
--- a/config-dev.yml.tpl
+++ b/config-dev.yml.tpl
@@ -1,7 +1,5 @@
 # This is an example configuration file intended for use in development.
 
-origin:
-  store_dir_path: /tmp/domani_dev_env/origin
 domain:
   store_dir_path: /tmp/domani_dev_env/domain
   builtin_domains:
diff --git a/config.yml b/config.yml
index c1c0595..9dcb733 100644
--- a/config.yml
+++ b/config.yml
@@ -1,17 +1,7 @@
-origin:
-
-  # Path under which all origin data (i.e. git repositories, file caches,
-  # etc...) will be stored.
-  #
-  # This should be different than any other store_dir_paths.
-  #store_dir_path: REQUIRED
-
 domain:
 
   # Path under which all domain data (i.e. domains configured by users, HTTPS
   # certificates, etc...) will be stored.
-  #
-  # This should be different than any other store_dir_paths.
   #store_dir_path: REQUIRED
 
   #dns:
diff --git a/src/config.rs b/src/config.rs
index 5bbb1b9..5eaba2b 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -2,7 +2,6 @@ use serde::{Deserialize, Serialize};
 
 #[derive(Deserialize, Serialize)]
 pub struct Config {
-    pub origin: crate::origin::Config,
     pub domain: crate::domain::Config,
     pub service: crate::service::Config,
 }
diff --git a/src/main.rs b/src/main.rs
index 98823e8..a4067dd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -87,10 +87,7 @@ async fn main() {
     let gemini_enabled = config.service.gemini.gemini_addr.is_some();
     let external_domains_enabled = !config.domain.external_domains.is_empty();
 
-    let origin_store = domani::origin::git::FSStore::new(&config.origin)
-        .expect("git origin store initialization failed");
-
-    //let origin_store = domani::origin::git_proxy::Proxy::new();
+    let origin_store = domani::origin::git::Proxy::new();
 
     let domain_checker = domani::domain::checker::DNSChecker::new(
         domani::token::MemStore::new(),
diff --git a/src/origin.rs b/src/origin.rs
index 6aa5b70..e0ec013 100644
--- a/src/origin.rs
+++ b/src/origin.rs
@@ -1,10 +1,7 @@
-mod config;
 pub mod descr;
 pub mod git;
-pub mod git_proxy;
 pub mod mux;
 
-pub use config::*;
 pub use descr::Descr;
 
 use crate::error::unexpected;
diff --git a/src/origin/config.rs b/src/origin/config.rs
deleted file mode 100644
index 45c52fa..0000000
--- a/src/origin/config.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-use serde::{Deserialize, Serialize};
-use std::path;
-
-#[derive(Deserialize, Serialize)]
-pub struct Config {
-    pub store_dir_path: path::PathBuf,
-}
diff --git a/src/origin/git.rs b/src/origin/git.rs
index 746c66e..2f1d054 100644
--- a/src/origin/git.rs
+++ b/src/origin/git.rs
@@ -1,229 +1,216 @@
 use crate::error::unexpected::{self, Intoable, Mappable};
 use crate::{origin, util};
-
-use std::path::{Path, PathBuf};
-use std::{collections, fs, future, io, sync};
-
-use futures::stream;
+use std::{collections, sync};
 
 #[derive(Clone)]
-struct RepoSnapshot {
-    repo: sync::Arc<gix::ThreadSafeRepository>,
-    tree_object_id: gix::ObjectId,
+struct DescrState {
+    current_tree: gix_hash::ObjectId,
 }
 
-#[derive(thiserror::Error, Debug)]
-enum CreateRepoSnapshotError {
-    #[error("invalid branch name")]
-    InvalidBranchName,
+#[derive(thiserror::Error, Clone, Debug, PartialEq)]
+enum GetObjectError {
+    #[error("unavailable due to server-side issue")]
+    Unavailable,
 
     #[error(transparent)]
     Unexpected(#[from] unexpected::Error),
 }
 
-/// Implements the Store trait for Descr::Git, storing the git repos on disk. If any non-git Descrs
-/// are used then this implementation will panic.
-pub struct FSStore {
-    dir_path: PathBuf,
+#[derive(Default)]
+pub struct Proxy {
+    client: reqwest::Client,
+    state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>,
 
-    // to prevent against syncing the same origin more than once at a time, but still allowing
-    // more than one origin to be syncing at a time
+    // to prevent syncing the same origin more than once at a time, but still allow hitting that
+    // origin during a sync.
     sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
-
-    repo_snapshots: sync::RwLock<collections::HashMap<origin::Descr, sync::Arc<RepoSnapshot>>>,
 }
 
-impl FSStore {
-    pub fn new(config: &origin::Config) -> io::Result<Self> {
-        let dir_path = config.store_dir_path.join("git");
-        fs::create_dir_all(&dir_path)?;
-        Ok(Self {
-            dir_path,
-            sync_guard: sync::Mutex::new(collections::HashMap::new()),
-            repo_snapshots: sync::RwLock::new(collections::HashMap::new()),
-        })
-    }
-
-    fn repo_path(&self, descr: &origin::Descr) -> PathBuf {
-        self.dir_path.join(descr.id())
+impl Proxy {
+    pub fn new() -> Proxy {
+        Proxy::default()
     }
 
-    fn descr_file_path(&self, descr_id: &str) -> PathBuf {
-        self.dir_path.join(descr_id).join("descr.json")
-    }
-
-    fn branch_ref(&self, branch_name: &str) -> String {
-        format!("origin/{branch_name}")
-    }
-
-    fn deconstruct_descr(descr: &origin::Descr) -> (String, &str) {
+    fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) {
         let origin::Descr::Git {
             ref url,
             ref branch_name,
         } = descr;
-        (url.parsed.to_string(), branch_name)
+        (url, branch_name)
     }
 
-    fn create_repo_snapshot(
-        &self,
-        repo: gix::Repository,
-        descr: &origin::Descr,
-    ) -> Result<RepoSnapshot, CreateRepoSnapshotError> {
-        let (_, branch_name) = Self::deconstruct_descr(descr);
-        let branch_ref = self.branch_ref(branch_name);
-
-        let commit_object_id = repo
-            .try_find_reference(&branch_ref)
-            .map_unexpected_while(|| format!("finding branch ref {branch_ref}"))?
-            .ok_or(CreateRepoSnapshotError::InvalidBranchName)?
-            .peel_to_id_in_place()
-            .or_unexpected_while("peeling id in place")?
-            .detach();
-
-        let tree_object_id = repo
-            .find_object(commit_object_id)
-            .map_unexpected_while(|| format!("finding commit object {commit_object_id}"))?
-            .try_to_commit_ref()
-            .map_unexpected_while(|| format!("parsing {commit_object_id} as commit"))?
-            .tree();
-
-        Ok(RepoSnapshot {
-            repo: sync::Arc::new(repo.into()),
-            tree_object_id,
-        })
+    fn construct_url(
+        url: &origin::descr::GitUrl,
+        sub_path: &str,
+    ) -> unexpected::Result<reqwest::Url> {
+        let mut url: reqwest::Url = {
+            url.parsed
+                .to_string()
+                .parse()
+                .or_unexpected_while("parsing url as reqwest url")?
+        };
+
+        let new_path = url
+            .path()
+            .parse::<std::path::PathBuf>()
+            .or_unexpected_while("parsing url path")?
+            .join(sub_path);
+
+        url.set_path(
+            new_path
+                .to_str()
+                .or_unexpected_while("converting new path to string")?,
+        );
+
+        Ok(url)
     }
 
-    fn get_repo_snapshot(
+    async fn get_tip_commit_hash(
         &self,
         descr: &origin::Descr,
-    ) -> Result<Option<sync::Arc<RepoSnapshot>>, unexpected::Error> {
-        {
-            let repo_snapshots = self.repo_snapshots.read().unwrap();
-            if let Some(repo_snapshot) = repo_snapshots.get(descr) {
-                return Ok(Some(repo_snapshot.clone()));
+    ) -> Result<gix_hash::ObjectId, origin::SyncError> {
+        let (url, branch_name) = Self::deconstruct_descr(descr);
+
+        let refs_url =
+            Self::construct_url(url, "info/refs").or_unexpected_while("constructing refs url")?;
+
+        // when fetching refs we assume that any issue indicates that the origin itself
+        // (and therefore the URL) has some kind of issue.
+        let refs = self
+            .client
+            .get(refs_url)
+            .send()
+            .await
+            .or(Err(origin::SyncError::InvalidURL))?
+            .error_for_status()
+            .or(Err(origin::SyncError::InvalidURL))?
+            .text()
+            .await
+            .or(Err(origin::SyncError::InvalidURL))?;
+
+        let full_ref = format!("refs/heads/{}", branch_name);
+        for line in refs.lines() {
+            if !line.ends_with(full_ref.as_str()) {
+                continue;
             }
-        }
 
-        let repo_path = self.repo_path(descr);
-
-        match fs::read_dir(&repo_path) {
-            Ok(_) => (),
-            Err(e) => match e.kind() {
-                io::ErrorKind::NotFound => return Ok(None),
-                _ => {
-                    return Err(e.into_unexpected_while(format!(
-                        "checking if {} exists",
-                        repo_path.display()
-                    )))
-                }
-            },
+            return gix_hash::ObjectId::from_hex(
+                line.split_ascii_whitespace()
+                    .next()
+                    .ok_or(origin::SyncError::InvalidURL)?
+                    .as_bytes(),
+            )
+            .or(Err(origin::SyncError::InvalidURL));
         }
 
-        let repo = gix::open(&repo_path)
-            .map_unexpected_while(|| format!("opening {} as git repo", repo_path.display()))?;
-
-        let repo_snapshot = self
-            .create_repo_snapshot(repo, descr)
-            .map_err(|e| match e {
-                // it's not expected that the branch name is invalid at this point, it must have
-                // existed for sync to have been successful.
-                CreateRepoSnapshotError::InvalidBranchName => e.into_unexpected(),
-                CreateRepoSnapshotError::Unexpected(e) => e,
-            })?;
+        Err(origin::SyncError::InvalidBranchName)
+    }
 
-        let repo_snapshot = sync::Arc::new(repo_snapshot);
+    async fn get_object(
+        &self,
+        descr: &origin::Descr,
+        oid: &gix_hash::ObjectId,
+        expect_kind: gix_object::Kind,
+    ) -> Result<util::BoxByteStream, GetObjectError> {
+        let hex = oid.to_string();
+        let (url, _) = Self::deconstruct_descr(descr);
+
+        let object_url =
+            Self::construct_url(url, format!("objects/{}/{}", &hex[..2], &hex[2..]).as_str())
+                .or_unexpected_while("constructing object url")?;
+
+        let mut loose_object = self
+            .client
+            .get(object_url)
+            .send()
+            .await
+            .or(Err(GetObjectError::Unavailable))?
+            .error_for_status()
+            .map(|res| {
+                use async_compression::tokio::bufread::ZlibDecoder;
+                use futures::stream::TryStreamExt;
+                use std::io;
+
+                tokio::io::BufReader::new(ZlibDecoder::new(tokio_util::io::StreamReader::new(
+                    res.bytes_stream()
+                        .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
+                )))
+            })
+            .or(Err(GetObjectError::Unavailable))?;
+
+        use tokio::io::AsyncBufReadExt;
+        let mut header = Vec::<u8>::new();
+        loose_object
+            .read_until(0, &mut header)
+            .await
+            .or(Err(GetObjectError::Unavailable))?;
 
-        let mut repo_snapshots = self.repo_snapshots.write().unwrap();
+        let (kind, _, _) =
+            gix_object::decode::loose_header(&header).or(Err(GetObjectError::Unavailable))?;
 
-        (*repo_snapshots).insert(descr.clone(), repo_snapshot.clone());
+        if kind != expect_kind {
+            return Err(GetObjectError::Unavailable);
+        }
 
-        Ok(Some(repo_snapshot))
+        Ok(util::BoxByteStream::from_async_read(loose_object))
     }
 
-    fn sync_inner(&self, descr: &origin::Descr) -> Result<gix::Repository, origin::SyncError> {
-        use gix::clone::Error as gixCloneErr;
-        use gix::progress::Discard;
-
-        let should_interrupt = &core::sync::atomic::AtomicBool::new(false);
-        let repo_path = &self.repo_path(descr);
-
-        // if the path doesn't exist then use the gix clone feature to clone it into the
-        // directory.
-        if fs::read_dir(repo_path).is_err() {
-            fs::create_dir_all(repo_path)
-                .map_unexpected_while(|| format!("creating {}", repo_path.display()))?;
-
-            let (url, branch_name) = Self::deconstruct_descr(descr);
+    async fn get_commit_tree(
+        &self,
+        descr: &origin::Descr,
+        commit_hash: &gix_hash::ObjectId,
+    ) -> Result<gix_hash::ObjectId, origin::SyncError> {
+        let commit_object_bytes = self
+            .get_object(descr, commit_hash, gix_object::Kind::Commit)
+            .await
+            .map_err(|e| match e {
+                GetObjectError::Unavailable => origin::SyncError::Unavailable,
+                GetObjectError::Unexpected(_) => e.into_unexpected().into(),
+            })?
+            .read_to_end()
+            .await
+            .or(Err(origin::SyncError::Unavailable))?;
 
-            let (repo, _) = gix::prepare_clone_bare(url.clone(), repo_path)
-                .map_err(|e| match e {
-                    gixCloneErr::Init(gix::init::Error::InvalidBranchName { .. }) => {
-                        origin::SyncError::InvalidBranchName
-                    }
-                    gixCloneErr::UrlParse(_) | gixCloneErr::CanonicalizeUrl { .. } => {
-                        origin::SyncError::InvalidURL
-                    }
-                    _ => e
-                        .into_unexpected_while(format!(
-                            "cloning {} into {}",
-                            url,
-                            repo_path.display()
-                        ))
-                        .into(),
-                })?
-                .fetch_only(Discard, should_interrupt)
-                .map_err(|_| origin::SyncError::InvalidURL)?;
-
-            // Check to make sure the branch name exists
-            // TODO if this fails we should delete repo_path
-            let branch_ref = self.branch_ref(branch_name);
-            repo.try_find_reference(&branch_ref)
-                .map_unexpected_while(|| format!("finding branch ref {branch_ref}"))?
-                .ok_or(origin::SyncError::InvalidBranchName)?;
-
-            // Add the descr to the repo directory, so we can know the actual descr later
-            // TODO if this fails we should delete repo_path
-            let file_path = self.descr_file_path(descr.id().as_ref());
-            let descr_file = fs::File::create(&file_path)
-                .map_unexpected_while(|| format!("creating {}", file_path.display()))?;
-
-            serde_json::to_writer(descr_file, &descr)
-                .map_unexpected_while(|| format!("writing descr to {}", file_path.display()))?;
-
-            return Ok(repo);
-        }
 
+        let commit_object = gix_object::CommitRef::from_bytes(commit_object_bytes.as_ref())
+            .or(Err(origin::SyncError::Unavailable))?;
 
-        let direction = gix::remote::Direction::Fetch;
+        Ok(commit_object.tree())
+    }
 
-        let repo = gix::open(repo_path)
-            .map_unexpected_while(|| format!("opening repo at {}", repo_path.display()))?;
+    async fn get_tree_entry(
+        &self,
+        descr: &origin::Descr,
+        tree_hash: &gix_hash::ObjectId,
+        entry_name: &str,
+    ) -> Result<gix_object::tree::Entry, origin::GetFileError> {
+        let tree_object_bytes = self
+            .get_object(descr, tree_hash, gix_object::Kind::Tree)
+            .await
+            .map_err(|e| match e {
+                GetObjectError::Unavailable => origin::GetFileError::Unavailable,
+                GetObjectError::Unexpected(_) => e.into_unexpected().into(),
+            })?
+            .read_to_end()
+            .await
+            .or(Err(origin::GetFileError::Unavailable))?;
 
-        let remote = repo
-            .find_default_remote(direction)
-            .ok_or_else(|| unexpected::Error::from("no default configured"))?
-            .or_unexpected_while("finding default remote for fetching")?;
+        let tree_object = gix_object::TreeRef::from_bytes(tree_object_bytes.as_ref())
+            .or(Err(origin::GetFileError::Unavailable))?;
 
-        remote
-            .connect(direction)
-            .or_unexpected_while("connecting to remote")?
-            .prepare_fetch(Discard, Default::default())
-            .or_unexpected_while("preparing fetch")?
-            .receive(Discard, should_interrupt)
-            .or_unexpected_while("fetching from remote")?;
+        for entry in tree_object.entries {
+            if entry.filename == entry_name {
+                return Ok(entry.into());
+            }
+        }
 
-        Ok(repo)
+        Err(origin::GetFileError::FileNotFound)
     }
 }
 
-impl super::Store for FSStore {
-    fn sync(
-        &self,
-        descr: &origin::Descr,
-    ) -> util::BoxFuture<'static, Result<(), origin::SyncError>> {
-        // TODO this implementation is kind of cheating, as it's doing everything synchronously but
-        // then returning the result in an async box. But the git store is going to be
-        // re-implemented soon anyway, so it doesn't matter.
-        let res = (|| {
+impl origin::Store for Proxy {
+    fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
+        let descr = descr.clone();
+        Box::pin(async move {
             // attempt to lock this descr for syncing, doing so within a new scope so the mutex
             // isn't actually being held for the whole method duration.
             let is_already_syncing = {
@@ -238,171 +225,110 @@ impl super::Store for FSStore {
                 return Err(origin::SyncError::AlreadyInProgress);
             }
 
-            let res = self.sync_inner(&descr);
+            // perform the rest of the work within this async block, so we can be sure that the
+            // guard lock is released no matter what.
+            let res = async {
+                let commit_hash = self.get_tip_commit_hash(&descr).await?;
+                let current_tree = self.get_commit_tree(&descr, &commit_hash).await?;
+                self.state
+                    .write()
+                    .unwrap()
+                    .insert(descr.clone(), DescrState { current_tree });
+                Ok(())
+            }
+            .await;
 
             self.sync_guard.lock().unwrap().remove(&descr);
-
-            let repo = match res {
-                Ok(repo) => repo,
-                Err(e) => return Err(e),
-            };
-
-            // repo is synced at this point (though the sync lock is still held), just gotta create
-            // the RepoSnapshot and store it.
-            //
-            // TODO this is a bit of a memory leak, but by the time we get
-            // to that point this should all be backed by something which isn't local storage
-            // anyway.
-
-            // calling this while the sync lock is held isn't ideal, but it's convenient and
-            // shouldn't be too terrible generally
-            let repo_snapshot = self
-                .create_repo_snapshot(repo, &descr)
-                .map_err(|e| match e {
-                    CreateRepoSnapshotError::InvalidBranchName => {
-                        origin::SyncError::InvalidBranchName
-                    }
-                    CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
-                })?;
-
-            let mut repo_snapshots = self.repo_snapshots.write().unwrap();
-            (*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
-
-            Ok(())
-        })();
-
-        Box::pin(future::ready(res))
+            res
+        })
     }
 
     fn get_file(
         &self,
         descr: &origin::Descr,
         path: &str,
-    ) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> {
+    ) -> util::BoxFuture<'_, Result<util::BoxByteStream, origin::GetFileError>> {
         let descr = descr.clone();
         let path = path.to_string();
         Box::pin(async move {
-            let repo_snapshot = match self.get_repo_snapshot(&descr) {
-                Ok(Some(repo_snapshot)) => repo_snapshot,
-                Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
-                Err(e) => return Err(e.into()),
-            };
-
-            let mut clean_path = Path::new(path.as_str());
-            clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
-
-            let repo = repo_snapshot.repo.to_thread_local();
-
-            let file_object = repo
-                .find_object(repo_snapshot.tree_object_id)
-                .map_unexpected_while(|| {
-                    format!("finding tree object {}", repo_snapshot.tree_object_id)
-                })?
-                .peel_to_tree()
-                .map_unexpected_while(|| {
-                    format!("peeling tree object {}", repo_snapshot.tree_object_id)
-                })?
-                .lookup_entry_by_path(clean_path)
-                .map_unexpected_while(|| {
-                    format!(
-                        "looking up {} in tree object {}",
-                        clean_path.display(),
-                        repo_snapshot.tree_object_id
-                    )
-                })?
-                .ok_or(origin::GetFileError::FileNotFound)?
-                .object()
-                .or_unexpected()?;
-
-            use gix::object::Kind;
-            match file_object.kind {
-                Kind::Tree => Err(origin::GetFileError::PathIsDirectory),
-                Kind::Blob => {
-                    // TODO this is very not ideal, the whole file is first read totally into memory, and then
-                    // that is cloned.
-                    let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
-
-                    Ok(util::BoxByteStream::from_stream(stream::once(async move {
-                        Ok(data)
-                    })))
-                }
-                Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
-                    format!("found object of kind {} in tree", file_object.kind).as_str(),
-                )
-                .into()),
+            let current_state = self
+                .state
+                .read()
+                .unwrap()
+                .get(&descr)
+                .ok_or(origin::GetFileError::DescrNotSynced)?
+                .clone();
+
+            let path = path
+                .as_str()
+                .parse::<std::path::PathBuf>()
+                .or_unexpected_while("parsing path")?;
+
+            let path_parts = path.iter().collect::<Vec<_>>();
+
+            let path_parts_len = path_parts.len();
+
+            if path_parts_len < 2 {
+                return Err(unexpected::Error::from("path has fewer than 2 parts").into());
+            } else if path_parts[0] != std::path::MAIN_SEPARATOR_STR {
+                return Err(unexpected::Error::from(format!(
+                    "expected first path part to be separator, found {:?}",
+                    path_parts[0]
+                ))
+                .into());
             }
-        })
-    }
-}
 
-/*
-#[cfg(test)]
-mod tests {
-    use crate::origin::{self, Config, Store};
-    use futures::StreamExt;
-    use tempdir::TempDir;
-
-    #[tokio::test]
-    async fn basic() {
-        let tmp_dir = TempDir::new("origin_store_git").unwrap();
-        let config = Config {
-            store_dir_path: tmp_dir.path().to_path_buf(),
-        };
-
-        let curr_dir = format!("file://{}", std::env::current_dir().unwrap().display());
-
-        let descr = origin::Descr::Git {
-            url: curr_dir.clone(),
-            branch_name: String::from("main"),
-        };
-
-        let other_descr = origin::Descr::Git {
-            url: curr_dir.clone(),
-            branch_name: String::from("some_other_branch"),
-        };
-
-        let store = super::FSStore::new(&config).expect("store created");
-
-        store.sync(&descr).await.expect("sync should succeed");
-        store
-            .sync(&descr)
-            .await
-            .expect("second sync should succeed");
-
-        // RepoSnapshot doesn't exist
-        match store.get_file(&other_descr, "DNE") {
-            Err(origin::GetFileError::DescrNotSynced) => (),
-            _ => assert!(false, "descr should have not been found"),
-        };
-
-        let assert_file_dne = |path: &str| match store.get_file(&descr, path) {
-            Err(origin::GetFileError::FileNotFound) => (),
-            _ => assert!(false, "file should have not been found"),
-        };
-
-        let assert_file_not_empty = |path: &str| {
-            use bytes::BufMut;
-
-            let mut f = store.get_file(&descr, path).expect("file not retrieved");
-            let mut body = bytes::BytesMut::new();
+            let mut tree_hash = current_state.current_tree;
+
+            // The first part is "/" (main separator), and the last is the file name itself.
+            // Everything in between (if any) should be directories, so navigate those.
+            for dir_name in path_parts[1..path_parts_len - 1].iter() {
+                let entry = self
+                    .get_tree_entry(
+                        &descr,
+                        &tree_hash,
+                        dir_name
+                            .to_str()
+                            .map_unexpected_while(|| format!("decoding dir name {dir_name:?}"))?,
+                    )
+                    .await?;
 
-            async move {
-                while let Some(chunk) = f.next().await {
-                    body.put(chunk.unwrap())
+                if !entry.mode.is_tree() {
+                    return Err(origin::GetFileError::FileNotFound);
                 }
-                assert!(body.len() > 0);
+
+                tree_hash = entry.oid;
             }
-        };
 
-        assert_file_not_empty("src/lib.rs").await;
-        assert_file_not_empty("/src/lib.rs").await;
-        assert_file_dne("DNE");
-        assert_file_dne("src/../src/lib.rs");
+            let file_name = {
+                let file_name = path_parts[path_parts_len - 1];
+                file_name
+                    .to_str()
+                    .map_unexpected_while(|| format!("decoding file name {file_name:?}"))?
+            };
 
-        let descrs = store.all_descrs().expect("all_descrs called");
+            let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?;
+
+            // TODO handle symlinks
+            if entry.mode.is_tree() {
+                return Err(origin::GetFileError::PathIsDirectory);
+            } else if !entry.mode.is_blob() {
+                return Err(unexpected::Error::from(format!(
+                    "can't handle entry {} of mode {}",
+                    entry.filename,
+                    entry.mode.as_str()
+                ))
+                .into());
+            }
 
-        assert_eq!(1, descrs.len());
-        assert_eq!(descr, descrs[0]);
+            self.get_object(&descr, &entry.oid, gix_object::Kind::Blob)
+                .await
+                .map_err(|e| match e {
+                    GetObjectError::Unavailable => origin::GetFileError::Unavailable,
+                    GetObjectError::Unexpected(_) => e
+                        .into_unexpected_while(format!("getting object for entry {:?}", entry))
+                        .into(),
+                })
+        })
     }
 }
-*/
diff --git a/src/origin/git_proxy.rs b/src/origin/git_proxy.rs
deleted file mode 100644
index 2f1d054..0000000
--- a/src/origin/git_proxy.rs
+++ /dev/null
@@ -1,334 +0,0 @@
-use crate::error::unexpected::{self, Intoable, Mappable};
-use crate::{origin, util};
-use std::{collections, sync};
-
-#[derive(Clone)]
-struct DescrState {
-    current_tree: gix_hash::ObjectId,
-}
-
-#[derive(thiserror::Error, Clone, Debug, PartialEq)]
-enum GetObjectError {
-    #[error("unavailable due to server-side issue")]
-    Unavailable,
-
-    #[error(transparent)]
-    Unexpected(#[from] unexpected::Error),
-}
-
-#[derive(Default)]
-pub struct Proxy {
-    client: reqwest::Client,
-    state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>,
-
-    // to prevent syncing the same origin more than once at a time, but still allow hitting that
-    // origin in during a sync.
-    sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
-}
-
-impl Proxy {
-    pub fn new() -> Proxy {
-        Proxy::default()
-    }
-
-    fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) {
-        let origin::Descr::Git {
-            ref url,
-            ref branch_name,
-        } = descr;
-        (url, branch_name)
-    }
-
-    fn construct_url(
-        url: &origin::descr::GitUrl,
-        sub_path: &str,
-    ) -> unexpected::Result<reqwest::Url> {
-        let mut url: reqwest::Url = {
-            url.parsed
-                .to_string()
-                .parse()
-                .or_unexpected_while("parsing url as reqwest url")?
-        };
-
-        let new_path = url
-            .path()
-            .parse::<std::path::PathBuf>()
-            .or_unexpected_while("parsing url path")?
-            .join(sub_path);
-
-        url.set_path(
-            new_path
-                .to_str()
-                .or_unexpected_while("converting new path to string")?,
-        );
-
-        Ok(url)
-    }
-
-    async fn get_tip_commit_hash(
-        &self,
-        descr: &origin::Descr,
-    ) -> Result<gix_hash::ObjectId, origin::SyncError> {
-        let (url, branch_name) = Self::deconstruct_descr(descr);
-
-        let refs_url =
-            Self::construct_url(url, "info/refs").or_unexpected_while("constructing refs url")?;
-
-        // when fetching refs we assume that any issue indicates that the origin itself
-        // (and therefore the URL) has some kind of issue.
-        let refs = self
-            .client
-            .get(refs_url)
-            .send()
-            .await
-            .or(Err(origin::SyncError::InvalidURL))?
-            .error_for_status()
-            .or(Err(origin::SyncError::InvalidURL))?
-            .text()
-            .await
-            .or(Err(origin::SyncError::InvalidURL))?;
-
-        let full_ref = format!("refs/heads/{}", branch_name);
-        for line in refs.lines() {
-            if !line.ends_with(full_ref.as_str()) {
-                continue;
-            }
-
-            return gix_hash::ObjectId::from_hex(
-                line.split_ascii_whitespace()
-                    .next()
-                    .ok_or(origin::SyncError::InvalidURL)?
-                    .as_bytes(),
-            )
-            .or(Err(origin::SyncError::InvalidURL));
-        }
-
-        Err(origin::SyncError::InvalidBranchName)
-    }
-
-    async fn get_object(
-        &self,
-        descr: &origin::Descr,
-        oid: &gix_hash::ObjectId,
-        expect_kind: gix_object::Kind,
-    ) -> Result<util::BoxByteStream, GetObjectError> {
-        let hex = oid.to_string();
-        let (url, _) = Self::deconstruct_descr(descr);
-
-        let object_url =
-            Self::construct_url(url, format!("objects/{}/{}", &hex[..2], &hex[2..]).as_str())
-                .or_unexpected_while("constructing refs url")?;
-
-        let mut loose_object = self
-            .client
-            .get(object_url)
-            .send()
-            .await
-            .or(Err(GetObjectError::Unavailable))?
-            .error_for_status()
-            .map(|res| {
-                use async_compression::tokio::bufread::ZlibDecoder;
-                use futures::stream::TryStreamExt;
-                use std::io;
-
-                tokio::io::BufReader::new(ZlibDecoder::new(tokio_util::io::StreamReader::new(
-                    res.bytes_stream()
-                        .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
-                )))
-            })
-            .or(Err(GetObjectError::Unavailable))?;
-
-        use tokio::io::AsyncBufReadExt;
-        let mut header = Vec::<u8>::new();
-        loose_object
-            .read_until(0, &mut header)
-            .await
-            .or(Err(GetObjectError::Unavailable))?;
-
-        let (kind, _, _) =
-            gix_object::decode::loose_header(&header).or(Err(GetObjectError::Unavailable))?;
-
-        if kind != expect_kind {
-            return Err(GetObjectError::Unavailable);
-        }
-
-        Ok(util::BoxByteStream::from_async_read(loose_object))
-    }
-
-    async fn get_commit_tree(
-        &self,
-        descr: &origin::Descr,
-        commit_hash: &gix_hash::ObjectId,
-    ) -> Result<gix_hash::ObjectId, origin::SyncError> {
-        let commit_object_bytes = self
-            .get_object(descr, commit_hash, gix_object::Kind::Commit)
-            .await
-            .map_err(|e| match e {
-                GetObjectError::Unavailable => origin::SyncError::Unavailable,
-                GetObjectError::Unexpected(_) => e.into_unexpected().into(),
-            })?
-            .read_to_end()
-            .await
-            .or(Err(origin::SyncError::Unavailable))?;
-
-        let commit_object = gix_object::CommitRef::from_bytes(commit_object_bytes.as_ref())
-            .or(Err(origin::SyncError::Unavailable))?;
-
-        Ok(commit_object.tree())
-    }
-
-    async fn get_tree_entry(
-        &self,
-        descr: &origin::Descr,
-        tree_hash: &gix_hash::ObjectId,
-        entry_name: &str,
-    ) -> Result<gix_object::tree::Entry, origin::GetFileError> {
-        let tree_object_bytes = self
-            .get_object(descr, tree_hash, gix_object::Kind::Tree)
-            .await
-            .map_err(|e| match e {
-                GetObjectError::Unavailable => origin::GetFileError::Unavailable,
-                GetObjectError::Unexpected(_) => e.into_unexpected().into(),
-            })?
-            .read_to_end()
-            .await
-            .or(Err(origin::GetFileError::Unavailable))?;
-
-        let tree_object = gix_object::TreeRef::from_bytes(tree_object_bytes.as_ref())
-            .or(Err(origin::GetFileError::Unavailable))?;
-
-        for entry in tree_object.entries {
-            if entry.filename == entry_name {
-                return Ok(entry.into());
-            }
-        }
-
-        Err(origin::GetFileError::FileNotFound)
-    }
-}
-
-impl origin::Store for Proxy {
-    fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
-        let descr = descr.clone();
-        Box::pin(async move {
-            // attempt to lock this descr for syncing, doing so within a new scope so the mutex
-            // isn't actually being held for the whole method duration.
-            let is_already_syncing = {
-                self.sync_guard
-                    .lock()
-                    .unwrap()
-                    .insert(descr.clone(), ())
-                    .is_some()
-            };
-
-            if is_already_syncing {
-                return Err(origin::SyncError::AlreadyInProgress);
-            }
-
-            // perform the rest of the work within this closure, so we can be sure that the guard
-            // lock is released no matter what.
-            let res = async {
-                let commit_hash = self.get_tip_commit_hash(&descr).await?;
-                let current_tree = self.get_commit_tree(&descr, &commit_hash).await?;
-                self.state
-                    .write()
-                    .unwrap()
-                    .insert(descr.clone(), DescrState { current_tree });
-                Ok(())
-            }
-            .await;
-
-            self.sync_guard.lock().unwrap().remove(&descr);
-            res
-        })
-    }
-
-    fn get_file(
-        &self,
-        descr: &origin::Descr,
-        path: &str,
-    ) -> util::BoxFuture<'_, Result<util::BoxByteStream, origin::GetFileError>> {
-        let descr = descr.clone();
-        let path = path.to_string();
-        Box::pin(async move {
-            let current_state = self
-                .state
-                .read()
-                .unwrap()
-                .get(&descr)
-                .ok_or(origin::GetFileError::DescrNotSynced)?
-                .clone();
-
-            let path = path
-                .as_str()
-                .parse::<std::path::PathBuf>()
-                .or_unexpected_while("parsing path")?;
-
-            let path_parts = path.iter().collect::<Vec<_>>();
-
-            let path_parts_len = path_parts.len();
-
-            if path_parts_len < 2 {
-                return Err(unexpected::Error::from("path has fewer than 2 parts").into());
-            } else if path_parts[0] != std::path::MAIN_SEPARATOR_STR {
-                return Err(unexpected::Error::from(format!(
-                    "expected first path part to be separator, found {:?}",
-                    path_parts[0]
-                ))
-                .into());
-            }
-
-            let mut tree_hash = current_state.current_tree;
-
-            // The first part is "/" (main separator), and the last is the file name itself.
-            // Everything in between (if any) should be directories, so navigate those.
-            for dir_name in path_parts[1..path_parts_len - 1].iter() {
-                let entry = self
-                    .get_tree_entry(
-                        &descr,
-                        &tree_hash,
-                        dir_name
-                            .to_str()
-                            .map_unexpected_while(|| format!("decoding dir name {dir_name:?}"))?,
-                    )
-                    .await?;
-
-                if !entry.mode.is_tree() {
-                    return Err(origin::GetFileError::FileNotFound);
-                }
-
-                tree_hash = entry.oid;
-            }
-
-            let file_name = {
-                let file_name = path_parts[path_parts_len - 1];
-                file_name
-                    .to_str()
-                    .map_unexpected_while(|| format!("decoding file name {file_name:?}"))?
-            };
-
-            let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?;
-
-            // TODO handle symlinks
-            if entry.mode.is_tree() {
-                return Err(origin::GetFileError::PathIsDirectory);
-            } else if !entry.mode.is_blob() {
-                return Err(unexpected::Error::from(format!(
-                    "can't handle entry {} of mode {}",
-                    entry.filename,
-                    entry.mode.as_str()
-                ))
-                .into());
-            }
-
-            self.get_object(&descr, &entry.oid, gix_object::Kind::Blob)
-                .await
-                .map_err(|e| match e {
-                    GetObjectError::Unavailable => origin::GetFileError::Unavailable,
-                    GetObjectError::Unexpected(_) => e
-                        .into_unexpected_while(format!("getting object for entry {:?}", entry))
-                        .into(),
-                })
-        })
-    }
-}
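
The sketches below illustrate the techniques the new Proxy store relies on; they are not part of the patch.

The proxy never clones a repo. It uses git's "dumb" HTTP transport: a plain GET of info/refs yields one "<hex oid><whitespace><ref name>" pair per line, and an individual object can then be fetched as a zlib-compressed loose object under objects/<first two hex chars>/<remaining hex>. A minimal std-only sketch of the derivations that get_tip_commit_hash and get_object perform (function names here are illustrative; a real origin may also keep objects only in packfiles, which the GET above would surface as Unavailable):

    /// A loose object is served at objects/<first 2 hex chars>/<remaining hex>.
    fn loose_object_path(hex_oid: &str) -> String {
        format!("objects/{}/{}", &hex_oid[..2], &hex_oid[2..])
    }

    /// Mirrors the line scan in get_tip_commit_hash over an info/refs payload.
    fn find_branch_tip<'a>(info_refs: &'a str, branch_name: &str) -> Option<&'a str> {
        let full_ref = format!("refs/heads/{branch_name}");
        info_refs
            .lines()
            .find(|line| line.ends_with(full_ref.as_str()))?
            .split_ascii_whitespace()
            .next()
    }

    fn main() {
        // hypothetical payload, reusing this patch's commit hash as example data
        let refs = "498a33533f1a1cf00639e99cabcc11c25c330fd7\trefs/heads/main\n";
        let tip = find_branch_tip(refs, "main").expect("branch should be listed");
        assert_eq!(
            loose_object_path(tip),
            "objects/49/8a33533f1a1cf00639e99cabcc11c25c330fd7"
        );
    }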
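
get_object verifies what it downloaded by reading the decompressed stream up to the first NUL byte and parsing the loose-object header: "<kind> <size>\0" followed by the raw object bytes. The patch delegates that parse to gix_object::decode::loose_header; the std-only sketch below (parse_loose_header is an illustrative name) just makes the framing concrete:

    fn parse_loose_header(header: &[u8]) -> Option<(&str, usize)> {
        // read_until(0, ..) in get_object keeps the trailing NUL, so strip it first
        let header = header.strip_suffix(&[0u8])?;
        let text = std::str::from_utf8(header).ok()?;
        let (kind, size) = text.split_once(' ')?;
        Some((kind, size.parse().ok()?))
    }

    fn main() {
        assert_eq!(parse_loose_header(b"blob 1234\0"), Some(("blob", 1234)));
        assert_eq!(parse_loose_header(b"commit 241\0").unwrap().0, "commit");
    }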
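
Proxy::sync serializes syncs per descr through the sync_guard map: claim the key under a briefly held mutex, do the real work with the lock released, then always remove the claim so a failed sync cannot wedge the descr. A compact synchronous sketch of the same pattern (Guard and do_work are hypothetical stand-ins; the patch awaits an async block where do_work is called here):

    use std::collections::HashMap;
    use std::sync::Mutex;

    struct Guard {
        in_progress: Mutex<HashMap<String, ()>>,
    }

    impl Guard {
        fn sync(&self, key: &str) -> Result<(), &'static str> {
            // hold the mutex only long enough to claim the key
            let is_already_syncing = self
                .in_progress
                .lock()
                .unwrap()
                .insert(key.to_owned(), ())
                .is_some();
            if is_already_syncing {
                return Err("already in progress");
            }

            // capture the result instead of `?`-ing it, so the removal below
            // runs whether the work succeeded or failed
            let res = do_work(key);

            self.in_progress.lock().unwrap().remove(key);
            res
        }
    }

    fn do_work(_key: &str) -> Result<(), &'static str> {
        Ok(())
    }

    fn main() {
        let guard = Guard {
            in_progress: Mutex::new(HashMap::new()),
        };
        assert!(guard.sync("some-descr").is_ok());
    }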
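
get_file turns a request path into a walk over tree objects: every component between the leading separator and the final one must resolve to a tree (directory) entry, and the final component is then looked up as a blob. A std-only sketch of just the component split (split_for_walk is an illustrative name, not in the patch):

    use std::path::{Component, Path};

    fn split_for_walk(path: &str) -> Option<(Vec<&str>, &str)> {
        let mut parts: Vec<&str> = Path::new(path)
            .components()
            .filter_map(|c| match c {
                Component::Normal(name) => name.to_str(),
                _ => None, // the leading "/" carries no name to look up
            })
            .collect();
        let file_name = parts.pop()?;
        Some((parts, file_name))
    }

    fn main() {
        let (dirs, file) = split_for_walk("/src/origin/git.rs").unwrap();
        assert_eq!(dirs, ["src", "origin"]);
        assert_eq!(file, "git.rs");
    }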