domani/src/origin/git.rs

357 lines
12 KiB
Rust

use crate::error::unexpected::{self, Intoable, Mappable};
use crate::{origin, util};
use std::{collections, sync};
/// State tracked for a single origin descr: the root tree found by the most
/// recent sync, plus the tree objects which have been fetched so far.
#[derive(Clone)]
struct DescrState {
    /// Root tree of the commit found by the most recent sync.
    current_tree: gix_hash::ObjectId,
    /// Decoded tree objects, keyed by their object id.
    tree_cache: collections::HashMap<gix_hash::ObjectId, gix_object::Tree>,
}

impl DescrState {
    /// Creates the state for `current_tree`, starting with an empty tree cache.
    fn new(current_tree: gix_hash::ObjectId) -> Self {
        Self {
            tree_cache: collections::HashMap::new(),
            current_tree,
        }
    }
}
/// Errors which can be returned when fetching a single git object.
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
enum GetObjectError {
/// The object could not be fetched, or what was fetched could not be
/// decoded as a loose object of the expected kind.
#[error("unavailable due to server-side issue")]
Unavailable,
/// Wraps an `unexpected::Error`; `#[from]` lets `?` perform the conversion.
#[error(transparent)]
Unexpected(#[from] unexpected::Error),
}
/// An `origin::Store` implementation which fetches git objects from the
/// origin over HTTP as they are needed.
#[derive(Default)]
pub struct Proxy {
/// HTTP client used for every request made to an origin.
client: reqwest::Client,
/// State of every descr which has been synced, keyed by the descr itself.
state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>,
}
impl Proxy {
pub fn new() -> Proxy {
Proxy::default()
}
/// Splits a git descr into its repository URL and its branch name.
fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) {
    // `descr` is a reference, so both fields are bound by reference here.
    let origin::Descr::Git { url, branch_name } = descr;
    (url, branch_name)
}
/// Builds the URL of `sub_path` within the repository located at `url`.
///
/// `sub_path` is joined onto the path of the repository URL; the rest of the
/// URL is left as it was.
///
/// # Errors
///
/// Returns an unexpected error if the repository URL can't be re-parsed or if
/// the joined path can't be represented as a string.
fn construct_url(
    url: &origin::descr::GitUrl,
    sub_path: &str,
) -> unexpected::Result<reqwest::Url> {
    // Re-parse the descr's URL from its string form as a `reqwest::Url`.
    let mut full_url = url
        .parsed
        .to_string()
        .parse::<reqwest::Url>()
        .or_unexpected_while("parsing url as reqwest url")?;

    let base_path: std::path::PathBuf = full_url
        .path()
        .parse()
        .or_unexpected_while("parsing url path")?;
    let joined_path = base_path.join(sub_path);
    let joined_path = joined_path
        .to_str()
        .or_unexpected_while("converting new path to string")?;

    full_url.set_path(joined_path);
    Ok(full_url)
}
/// Looks up the hash of the commit which the descr's branch points to.
///
/// Fetches `info/refs` from the origin and searches it for the record whose
/// ref name is exactly `refs/heads/<branch_name>`. Each record is expected to
/// be an object id followed by whitespace and the ref name.
///
/// # Errors
///
/// * `SyncError::InvalidURL` if `info/refs` can't be fetched, or if the
///   matching record doesn't carry a valid hex object id.
/// * `SyncError::InvalidBranchName` if no record names the branch.
/// * An unexpected error if the refs URL can't be constructed.
async fn get_tip_commit_hash(
    &self,
    descr: &origin::Descr,
) -> Result<gix_hash::ObjectId, origin::SyncError> {
    let (url, branch_name) = Self::deconstruct_descr(descr);
    let refs_url =
        Self::construct_url(url, "info/refs").or_unexpected_while("constructing refs url")?;

    // when fetching refs we assume that any issue indicates that the origin itself
    // (and therefore the URL) has some kind of issue.
    let refs = self
        .client
        .get(refs_url)
        .send()
        .await
        .or(Err(origin::SyncError::InvalidURL))?
        .error_for_status()
        .or(Err(origin::SyncError::InvalidURL))?
        .text()
        .await
        .or(Err(origin::SyncError::InvalidURL))?;

    let full_ref = format!("refs/heads/{}", branch_name);

    for line in refs.lines() {
        let mut fields = line.split_ascii_whitespace();
        let (Some(hash_hex), Some(ref_name)) = (fields.next(), fields.next()) else {
            continue;
        };

        // Compare the whole ref name. A suffix match would also accept unrelated
        // refs such as `refs/tags/refs/heads/<branch_name>`.
        if ref_name != full_ref.as_str() {
            continue;
        }

        return gix_hash::ObjectId::from_hex(hash_hex.as_bytes())
            .or(Err(origin::SyncError::InvalidURL));
    }

    Err(origin::SyncError::InvalidBranchName)
}
/// Fetches the loose object `oid` from the origin and returns a stream of its
/// contents, i.e. everything which follows the loose object header.
///
/// The object is requested from `objects/<first 2 hex chars>/<remaining hex chars>`
/// relative to the repository URL, and the response body is zlib-decoded as
/// it is read.
///
/// # Errors
///
/// * `GetObjectError::Unavailable` if the request fails or returns an error
///   status, if the header can't be read or decoded, or if the object's kind
///   is not `expect_kind`.
/// * `GetObjectError::Unexpected` if the object URL can't be constructed.
async fn get_object(
    &self,
    descr: &origin::Descr,
    oid: &gix_hash::ObjectId,
    expect_kind: gix_object::Kind,
) -> Result<util::BoxByteStream, GetObjectError> {
    use async_compression::tokio::bufread::ZlibDecoder;
    use futures::stream::TryStreamExt;
    use std::io;
    use tokio::io::AsyncBufReadExt;

    let hex = oid.to_string();
    let (url, _) = Self::deconstruct_descr(descr);
    let object_url =
        Self::construct_url(url, format!("objects/{}/{}", &hex[..2], &hex[2..]).as_str())
            .or_unexpected_while("constructing object url")?;

    let mut loose_object = self
        .client
        .get(object_url)
        .send()
        .await
        .or(Err(GetObjectError::Unavailable))?
        .error_for_status()
        .map(|res| {
            // Adapt the body from a stream of chunks into a buffered reader
            // which inflates the data as it is consumed.
            tokio::io::BufReader::new(ZlibDecoder::new(tokio_util::io::StreamReader::new(
                res.bytes_stream()
                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
            )))
        })
        .or(Err(GetObjectError::Unavailable))?;

    // The header runs up to and including the first NUL byte; reading it here
    // leaves the reader positioned at the start of the object's contents.
    let mut header = Vec::<u8>::new();
    loose_object
        .read_until(0, &mut header)
        .await
        .or(Err(GetObjectError::Unavailable))?;

    let (kind, _, _) =
        gix_object::decode::loose_header(&header).or(Err(GetObjectError::Unavailable))?;

    if kind != expect_kind {
        return Err(GetObjectError::Unavailable);
    }

    Ok(util::BoxByteStream::from_async_read(loose_object))
}
/// Fetches the commit `commit_hash` and returns the id of its root tree.
///
/// # Errors
///
/// Returns `SyncError::Unavailable` if the commit can't be fetched, read in
/// full, or decoded. Unexpected errors from fetching are passed through.
async fn get_commit_tree(
    &self,
    descr: &origin::Descr,
    commit_hash: &gix_hash::ObjectId,
) -> Result<gix_hash::ObjectId, origin::SyncError> {
    let raw_commit = self
        .get_object(descr, commit_hash, gix_object::Kind::Commit)
        .await
        .map_err(|err| match err {
            GetObjectError::Unavailable => origin::SyncError::Unavailable,
            GetObjectError::Unexpected(_) => err.into_unexpected().into(),
        })?
        .read_to_end()
        .await
        .map_err(|_| origin::SyncError::Unavailable)?;

    let commit = gix_object::CommitRef::from_bytes(raw_commit.as_ref())
        .map_err(|_| origin::SyncError::Unavailable)?;

    let tree_id = commit.tree();
    Ok(tree_id)
}
/// Returns the entry called `entry_name` from the tree `tree_hash`.
///
/// The tree is taken from the descr's tree cache when present. Otherwise it
/// is fetched from the origin, decoded, and stored in the cache.
///
/// # Errors
///
/// * `GetFileError::Unavailable` if the tree can't be fetched or decoded.
/// * `GetFileError::FileNotFound` if the tree has no entry with that name.
///
/// # Panics
///
/// Panics if `descr` has no state, i.e. if it has never been synced.
async fn get_tree_entry(
    &self,
    descr: &origin::Descr,
    tree_hash: &gix_hash::ObjectId,
    entry_name: &str,
) -> Result<gix_object::tree::Entry, origin::GetFileError> {
    // The read guard is confined to this block so that it is released before
    // any await below.
    let cached_tree = {
        let states = self.state.read().unwrap();
        let descr_state = states
            .get(descr)
            .expect("get_tree_entry called with unknown descr");
        descr_state.tree_cache.get(tree_hash).cloned()
    };

    let tree = if let Some(tree) = cached_tree {
        tree
    } else {
        let raw_tree = self
            .get_object(descr, tree_hash, gix_object::Kind::Tree)
            .await
            .map_err(|err| match err {
                GetObjectError::Unavailable => origin::GetFileError::Unavailable,
                GetObjectError::Unexpected(_) => err.into_unexpected().into(),
            })?
            .read_to_end()
            .await
            .map_err(|_| origin::GetFileError::Unavailable)?;

        let fetched: gix_object::Tree = gix_object::TreeRef::from_bytes(raw_tree.as_ref())
            .map_err(|_| origin::GetFileError::Unavailable)?
            .into();

        // Remember the decoded tree for later lookups.
        self.state
            .write()
            .unwrap()
            .get_mut(descr)
            .expect("get_tree_entry called with unknown descr")
            .tree_cache
            .insert(*tree_hash, fetched.clone());

        fetched
    };

    tree.entries
        .iter()
        .find(|entry| entry.filename == entry_name)
        .cloned()
        .ok_or(origin::GetFileError::FileNotFound)
}
}
impl origin::Store for Proxy {
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
let descr = descr.clone();
Box::pin(async move {
let commit_hash = self.get_tip_commit_hash(&descr).await?;
let current_tree = self.get_commit_tree(&descr, &commit_hash).await?;
use std::collections::hash_map::Entry;
match self.state.write().unwrap().entry(descr) {
Entry::Vacant(entry) => {
entry.insert(DescrState::new(current_tree));
}
Entry::Occupied(mut entry) => {
// if this is not the first sync then we don't want to overwrite the entry
// unless the tree has actually changed, in order to preserve the cache.
if entry.get().current_tree != current_tree {
entry.insert(DescrState::new(current_tree));
}
}
}
Ok(())
})
}
/// Returns a stream of the contents of the file at `path` within the descr's
/// current tree.
///
/// `path` must begin with the main path separator and name at least one
/// further component. Every component but the last is resolved as a
/// directory, and the last as the file itself.
///
/// # Errors
///
/// * `GetFileError::DescrNotSynced` if the descr has no state yet.
/// * `GetFileError::FileNotFound` if a component is missing, or if a
///   non-final component is not a directory.
/// * `GetFileError::PathIsDirectory` if the final component is a directory.
/// * `GetFileError::Unavailable` if an object can't be fetched.
/// * An unexpected error if the path is malformed or if the final entry is
///   neither a tree nor a blob.
fn get_file(
    &self,
    descr: &origin::Descr,
    path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, origin::GetFileError>> {
    let descr = descr.clone();
    let path = path.to_string();
    Box::pin(async move {
        // Only the root tree id is needed here. Copying it out avoids cloning
        // the descr's whole tree cache on every call.
        let current_tree = self
            .state
            .read()
            .unwrap()
            .get(&descr)
            .map(|state| state.current_tree)
            .ok_or(origin::GetFileError::DescrNotSynced)?;

        let path = path
            .as_str()
            .parse::<std::path::PathBuf>()
            .or_unexpected_while("parsing path")?;

        let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>();
        let path_parts_len = path_parts.len();

        if path_parts_len < 2 {
            return Err(unexpected::Error::from("path has fewer than 2 parts").into());
        } else if path_parts[0] != std::path::MAIN_SEPARATOR_STR {
            return Err(unexpected::Error::from(format!(
                "expected first path part to be separator, found {:?}",
                path_parts[0]
            ))
            .into());
        }

        let mut tree_hash = current_tree;

        // The first part is "/" (main separator), and the last is the file name itself.
        // Everything in between (if any) should be directories, so navigate those.
        for dir_name in path_parts[1..path_parts_len - 1].iter() {
            let entry = self
                .get_tree_entry(
                    &descr,
                    &tree_hash,
                    dir_name
                        .to_str()
                        .map_unexpected_while(|| format!("decoding dir name {dir_name:?}"))?,
                )
                .await?;

            if !entry.mode.is_tree() {
                return Err(origin::GetFileError::FileNotFound);
            }

            tree_hash = entry.oid;
        }

        let file_name = {
            let file_name = path_parts[path_parts_len - 1];
            file_name
                .to_str()
                .map_unexpected_while(|| format!("decoding file name {file_name:?}"))?
        };

        let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?;

        // TODO handle symlinks
        if entry.mode.is_tree() {
            return Err(origin::GetFileError::PathIsDirectory);
        } else if !entry.mode.is_blob() {
            return Err(unexpected::Error::from(format!(
                "can't handle entry {} of mode {}",
                entry.filename,
                entry.mode.as_str()
            ))
            .into());
        }

        self.get_object(&descr, &entry.oid, gix_object::Kind::Blob)
            .await
            .map_err(|e| match e {
                GetObjectError::Unavailable => origin::GetFileError::Unavailable,
                GetObjectError::Unexpected(_) => e
                    .into_unexpected_while(format!("getting object for entry {:?}", entry))
                    .into(),
            })
    })
}
}