parent
1e8f970486
commit
498a33533f
@ -1,7 +0,0 @@ |
|||||||
use serde::{Deserialize, Serialize}; |
|
||||||
use std::path; |
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)] |
|
||||||
pub struct Config { |
|
||||||
pub store_dir_path: path::PathBuf, |
|
||||||
} |
|
@ -1,334 +0,0 @@ |
|||||||
use crate::error::unexpected::{self, Intoable, Mappable}; |
|
||||||
use crate::{origin, util}; |
|
||||||
use std::{collections, sync}; |
|
||||||
|
|
||||||
#[derive(Clone)] |
|
||||||
struct DescrState { |
|
||||||
current_tree: gix_hash::ObjectId, |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(thiserror::Error, Clone, Debug, PartialEq)] |
|
||||||
enum GetObjectError { |
|
||||||
#[error("unavailable due to server-side issue")] |
|
||||||
Unavailable, |
|
||||||
|
|
||||||
#[error(transparent)] |
|
||||||
Unexpected(#[from] unexpected::Error), |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(Default)] |
|
||||||
pub struct Proxy { |
|
||||||
client: reqwest::Client, |
|
||||||
state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>, |
|
||||||
|
|
||||||
// to prevent syncing the same origin more than once at a time, but still allow hitting that
|
|
||||||
// origin in during a sync.
|
|
||||||
sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>, |
|
||||||
} |
|
||||||
|
|
||||||
impl Proxy { |
|
||||||
pub fn new() -> Proxy { |
|
||||||
Proxy::default() |
|
||||||
} |
|
||||||
|
|
||||||
fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) { |
|
||||||
let origin::Descr::Git { |
|
||||||
ref url, |
|
||||||
ref branch_name, |
|
||||||
} = descr; |
|
||||||
(url, branch_name) |
|
||||||
} |
|
||||||
|
|
||||||
fn construct_url( |
|
||||||
url: &origin::descr::GitUrl, |
|
||||||
sub_path: &str, |
|
||||||
) -> unexpected::Result<reqwest::Url> { |
|
||||||
let mut url: reqwest::Url = { |
|
||||||
url.parsed |
|
||||||
.to_string() |
|
||||||
.parse() |
|
||||||
.or_unexpected_while("parsing url as reqwest url")? |
|
||||||
}; |
|
||||||
|
|
||||||
let new_path = url |
|
||||||
.path() |
|
||||||
.parse::<std::path::PathBuf>() |
|
||||||
.or_unexpected_while("parsing url path")? |
|
||||||
.join(sub_path); |
|
||||||
|
|
||||||
url.set_path( |
|
||||||
new_path |
|
||||||
.to_str() |
|
||||||
.or_unexpected_while("converting new path to string")?, |
|
||||||
); |
|
||||||
|
|
||||||
Ok(url) |
|
||||||
} |
|
||||||
|
|
||||||
async fn get_tip_commit_hash( |
|
||||||
&self, |
|
||||||
descr: &origin::Descr, |
|
||||||
) -> Result<gix_hash::ObjectId, origin::SyncError> { |
|
||||||
let (url, branch_name) = Self::deconstruct_descr(descr); |
|
||||||
|
|
||||||
let refs_url = |
|
||||||
Self::construct_url(url, "info/refs").or_unexpected_while("constructing refs url")?; |
|
||||||
|
|
||||||
// when fetching refs we assume that any issue indicates that the origin itself
|
|
||||||
// (and therefore the URL) has some kind of issue.
|
|
||||||
let refs = self |
|
||||||
.client |
|
||||||
.get(refs_url) |
|
||||||
.send() |
|
||||||
.await |
|
||||||
.or(Err(origin::SyncError::InvalidURL))? |
|
||||||
.error_for_status() |
|
||||||
.or(Err(origin::SyncError::InvalidURL))? |
|
||||||
.text() |
|
||||||
.await |
|
||||||
.or(Err(origin::SyncError::InvalidURL))?; |
|
||||||
|
|
||||||
let full_ref = format!("refs/heads/{}", branch_name); |
|
||||||
for line in refs.lines() { |
|
||||||
if !line.ends_with(full_ref.as_str()) { |
|
||||||
continue; |
|
||||||
} |
|
||||||
|
|
||||||
return gix_hash::ObjectId::from_hex( |
|
||||||
line.split_ascii_whitespace() |
|
||||||
.next() |
|
||||||
.ok_or(origin::SyncError::InvalidURL)? |
|
||||||
.as_bytes(), |
|
||||||
) |
|
||||||
.or(Err(origin::SyncError::InvalidURL)); |
|
||||||
} |
|
||||||
|
|
||||||
Err(origin::SyncError::InvalidBranchName) |
|
||||||
} |
|
||||||
|
|
||||||
async fn get_object( |
|
||||||
&self, |
|
||||||
descr: &origin::Descr, |
|
||||||
oid: &gix_hash::ObjectId, |
|
||||||
expect_kind: gix_object::Kind, |
|
||||||
) -> Result<util::BoxByteStream, GetObjectError> { |
|
||||||
let hex = oid.to_string(); |
|
||||||
let (url, _) = Self::deconstruct_descr(descr); |
|
||||||
|
|
||||||
let object_url = |
|
||||||
Self::construct_url(url, format!("objects/{}/{}", &hex[..2], &hex[2..]).as_str()) |
|
||||||
.or_unexpected_while("constructing refs url")?; |
|
||||||
|
|
||||||
let mut loose_object = self |
|
||||||
.client |
|
||||||
.get(object_url) |
|
||||||
.send() |
|
||||||
.await |
|
||||||
.or(Err(GetObjectError::Unavailable))? |
|
||||||
.error_for_status() |
|
||||||
.map(|res| { |
|
||||||
use async_compression::tokio::bufread::ZlibDecoder; |
|
||||||
use futures::stream::TryStreamExt; |
|
||||||
use std::io; |
|
||||||
|
|
||||||
tokio::io::BufReader::new(ZlibDecoder::new(tokio_util::io::StreamReader::new( |
|
||||||
res.bytes_stream() |
|
||||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)), |
|
||||||
))) |
|
||||||
}) |
|
||||||
.or(Err(GetObjectError::Unavailable))?; |
|
||||||
|
|
||||||
use tokio::io::AsyncBufReadExt; |
|
||||||
let mut header = Vec::<u8>::new(); |
|
||||||
loose_object |
|
||||||
.read_until(0, &mut header) |
|
||||||
.await |
|
||||||
.or(Err(GetObjectError::Unavailable))?; |
|
||||||
|
|
||||||
let (kind, _, _) = |
|
||||||
gix_object::decode::loose_header(&header).or(Err(GetObjectError::Unavailable))?; |
|
||||||
|
|
||||||
if kind != expect_kind { |
|
||||||
return Err(GetObjectError::Unavailable); |
|
||||||
} |
|
||||||
|
|
||||||
Ok(util::BoxByteStream::from_async_read(loose_object)) |
|
||||||
} |
|
||||||
|
|
||||||
async fn get_commit_tree( |
|
||||||
&self, |
|
||||||
descr: &origin::Descr, |
|
||||||
commit_hash: &gix_hash::ObjectId, |
|
||||||
) -> Result<gix_hash::ObjectId, origin::SyncError> { |
|
||||||
let commit_object_bytes = self |
|
||||||
.get_object(descr, commit_hash, gix_object::Kind::Commit) |
|
||||||
.await |
|
||||||
.map_err(|e| match e { |
|
||||||
GetObjectError::Unavailable => origin::SyncError::Unavailable, |
|
||||||
GetObjectError::Unexpected(_) => e.into_unexpected().into(), |
|
||||||
})? |
|
||||||
.read_to_end() |
|
||||||
.await |
|
||||||
.or(Err(origin::SyncError::Unavailable))?; |
|
||||||
|
|
||||||
let commit_object = gix_object::CommitRef::from_bytes(commit_object_bytes.as_ref()) |
|
||||||
.or(Err(origin::SyncError::Unavailable))?; |
|
||||||
|
|
||||||
Ok(commit_object.tree()) |
|
||||||
} |
|
||||||
|
|
||||||
async fn get_tree_entry( |
|
||||||
&self, |
|
||||||
descr: &origin::Descr, |
|
||||||
tree_hash: &gix_hash::ObjectId, |
|
||||||
entry_name: &str, |
|
||||||
) -> Result<gix_object::tree::Entry, origin::GetFileError> { |
|
||||||
let tree_object_bytes = self |
|
||||||
.get_object(descr, tree_hash, gix_object::Kind::Tree) |
|
||||||
.await |
|
||||||
.map_err(|e| match e { |
|
||||||
GetObjectError::Unavailable => origin::GetFileError::Unavailable, |
|
||||||
GetObjectError::Unexpected(_) => e.into_unexpected().into(), |
|
||||||
})? |
|
||||||
.read_to_end() |
|
||||||
.await |
|
||||||
.or(Err(origin::GetFileError::Unavailable))?; |
|
||||||
|
|
||||||
let tree_object = gix_object::TreeRef::from_bytes(tree_object_bytes.as_ref()) |
|
||||||
.or(Err(origin::GetFileError::Unavailable))?; |
|
||||||
|
|
||||||
for entry in tree_object.entries { |
|
||||||
if entry.filename == entry_name { |
|
||||||
return Ok(entry.into()); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
Err(origin::GetFileError::FileNotFound) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl origin::Store for Proxy { |
|
||||||
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> { |
|
||||||
let descr = descr.clone(); |
|
||||||
Box::pin(async move { |
|
||||||
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
|
|
||||||
// isn't actually being held for the whole method duration.
|
|
||||||
let is_already_syncing = { |
|
||||||
self.sync_guard |
|
||||||
.lock() |
|
||||||
.unwrap() |
|
||||||
.insert(descr.clone(), ()) |
|
||||||
.is_some() |
|
||||||
}; |
|
||||||
|
|
||||||
if is_already_syncing { |
|
||||||
return Err(origin::SyncError::AlreadyInProgress); |
|
||||||
} |
|
||||||
|
|
||||||
// perform the rest of the work within this closure, so we can be sure that the guard
|
|
||||||
// lock is released no matter what.
|
|
||||||
let res = async { |
|
||||||
let commit_hash = self.get_tip_commit_hash(&descr).await?; |
|
||||||
let current_tree = self.get_commit_tree(&descr, &commit_hash).await?; |
|
||||||
self.state |
|
||||||
.write() |
|
||||||
.unwrap() |
|
||||||
.insert(descr.clone(), DescrState { current_tree }); |
|
||||||
Ok(()) |
|
||||||
} |
|
||||||
.await; |
|
||||||
|
|
||||||
self.sync_guard.lock().unwrap().remove(&descr); |
|
||||||
res |
|
||||||
}) |
|
||||||
} |
|
||||||
|
|
||||||
fn get_file( |
|
||||||
&self, |
|
||||||
descr: &origin::Descr, |
|
||||||
path: &str, |
|
||||||
) -> util::BoxFuture<'_, Result<util::BoxByteStream, origin::GetFileError>> { |
|
||||||
let descr = descr.clone(); |
|
||||||
let path = path.to_string(); |
|
||||||
Box::pin(async move { |
|
||||||
let current_state = self |
|
||||||
.state |
|
||||||
.read() |
|
||||||
.unwrap() |
|
||||||
.get(&descr) |
|
||||||
.ok_or(origin::GetFileError::DescrNotSynced)? |
|
||||||
.clone(); |
|
||||||
|
|
||||||
let path = path |
|
||||||
.as_str() |
|
||||||
.parse::<std::path::PathBuf>() |
|
||||||
.or_unexpected_while("parsing path")?; |
|
||||||
|
|
||||||
let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>(); |
|
||||||
|
|
||||||
let path_parts_len = path_parts.len(); |
|
||||||
|
|
||||||
if path_parts_len < 2 { |
|
||||||
return Err(unexpected::Error::from("path has fewer than 2 parts").into()); |
|
||||||
} else if path_parts[0] != std::path::MAIN_SEPARATOR_STR { |
|
||||||
return Err(unexpected::Error::from(format!( |
|
||||||
"expected first path part to be separator, found {:?}", |
|
||||||
path_parts[0] |
|
||||||
)) |
|
||||||
.into()); |
|
||||||
} |
|
||||||
|
|
||||||
let mut tree_hash = current_state.current_tree; |
|
||||||
|
|
||||||
// The first part is "/" (main separator), and the last is the file name itself.
|
|
||||||
// Everything in between (if any) should be directories, so navigate those.
|
|
||||||
for dir_name in path_parts[1..path_parts_len - 1].iter() { |
|
||||||
let entry = self |
|
||||||
.get_tree_entry( |
|
||||||
&descr, |
|
||||||
&tree_hash, |
|
||||||
dir_name |
|
||||||
.to_str() |
|
||||||
.map_unexpected_while(|| format!("decoding dir name {dir_name:?}"))?, |
|
||||||
) |
|
||||||
.await?; |
|
||||||
|
|
||||||
if !entry.mode.is_tree() { |
|
||||||
return Err(origin::GetFileError::FileNotFound); |
|
||||||
} |
|
||||||
|
|
||||||
tree_hash = entry.oid; |
|
||||||
} |
|
||||||
|
|
||||||
let file_name = { |
|
||||||
let file_name = path_parts[path_parts_len - 1]; |
|
||||||
file_name |
|
||||||
.to_str() |
|
||||||
.map_unexpected_while(|| format!("decoding file name {file_name:?}"))? |
|
||||||
}; |
|
||||||
|
|
||||||
let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?; |
|
||||||
|
|
||||||
// TODO handle symlinks
|
|
||||||
if entry.mode.is_tree() { |
|
||||||
return Err(origin::GetFileError::PathIsDirectory); |
|
||||||
} else if !entry.mode.is_blob() { |
|
||||||
return Err(unexpected::Error::from(format!( |
|
||||||
"can't handle entry {} of mode {}", |
|
||||||
entry.filename, |
|
||||||
entry.mode.as_str() |
|
||||||
)) |
|
||||||
.into()); |
|
||||||
} |
|
||||||
|
|
||||||
self.get_object(&descr, &entry.oid, gix_object::Kind::Blob) |
|
||||||
.await |
|
||||||
.map_err(|e| match e { |
|
||||||
GetObjectError::Unavailable => origin::GetFileError::Unavailable, |
|
||||||
GetObjectError::Unexpected(_) => e |
|
||||||
.into_unexpected_while(format!("getting object for entry {:?}", entry)) |
|
||||||
.into(), |
|
||||||
}) |
|
||||||
}) |
|
||||||
} |
|
||||||
} |
|
Loading…
Reference in new issue