Implement most of new git implementation's get_file method

This includes a refactoring of get_file to be completely async, as well
as to add a new error case.
main
Brian Picciano 3 months ago
parent 142fc14916
commit 2302e9ff64
  1. 14
      Cargo.lock
  2. 2
      Cargo.toml
  3. 8
      src/domain/manager.rs
  4. 2
      src/main.rs
  5. 13
      src/origin.rs
  6. 95
      src/origin/git.rs
  7. 160
      src/origin/git_proxy.rs
  8. 13
      src/origin/mux.rs
  9. 7
      src/service/gemini.rs
  10. 8
      src/service/http.rs
  11. 7
      src/util.rs

14
Cargo.lock generated

@ -2731,6 +2731,7 @@ dependencies = [
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 0.25.3",
"winreg 0.50.0",
@ -3713,6 +3714,19 @@ version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d"
[[package]]
name = "wasm-streams"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.61"

@ -47,7 +47,7 @@ gemini = "0.0.5"
bytes = "1.4.0"
hyper-trust-dns = "0.5.0"
gix-hash = "0.14.1"
reqwest = { version = "0.11.23", features = ["gzip", "deflate"] }
reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] }
gix-object = "0.41.0"
[patch.crates-io]

@ -40,6 +40,9 @@ pub enum SyncWithSettingsError {
#[error("invalid url")]
InvalidURL,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("invalid branch name")]
InvalidBranchName,
@ -60,6 +63,7 @@ impl From<origin::SyncError> for SyncWithSettingsError {
fn from(e: origin::SyncError) -> SyncWithSettingsError {
match e {
origin::SyncError::InvalidURL => SyncWithSettingsError::InvalidURL,
origin::SyncError::Unavailable => SyncWithSettingsError::Unavailable,
origin::SyncError::InvalidBranchName => SyncWithSettingsError::InvalidBranchName,
origin::SyncError::AlreadyInProgress => SyncWithSettingsError::AlreadyInProgress,
origin::SyncError::Unexpected(e) => SyncWithSettingsError::Unexpected(e),
@ -96,7 +100,7 @@ pub trait Manager: Sync + Send {
&self,
settings: &domain::Settings,
path: &str,
) -> Result<util::BoxByteStream, GetFileError>;
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>;
fn sync_with_settings(
&self,
@ -397,7 +401,7 @@ impl Manager for ManagerImpl {
&self,
settings: &domain::Settings,
path: &str,
) -> Result<util::BoxByteStream, GetFileError> {
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>> {
let path = settings.process_path(path);
self.origin_store
.get_file(&settings.origin_descr, path.as_ref())

@ -90,6 +90,8 @@ async fn main() {
let origin_store = domani::origin::git::FSStore::new(&config.origin)
.expect("git origin store initialization failed");
//let origin_store = domani::origin::git_proxy::Proxy::new();
let domain_checker = domani::domain::checker::DNSChecker::new(
domani::token::MemStore::new(),
&config.domain.dns,

@ -15,6 +15,9 @@ pub enum SyncError {
#[error("invalid url")]
InvalidURL,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("invalid branch name")]
InvalidBranchName,
@ -33,6 +36,9 @@ pub enum GetFileError {
#[error("file not found")]
FileNotFound,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("path is directory")]
PathIsDirectory,
@ -46,5 +52,10 @@ pub trait Store {
/// the origin into the storage.
fn sync(&self, descr: &Descr) -> util::BoxFuture<'_, Result<(), SyncError>>;
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError>;
/// Returns the body of the descr's given path, where path must be absolute.
fn get_file(
&self,
descr: &Descr,
path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>;
}

@ -278,53 +278,60 @@ impl super::Store for FSStore {
&self,
descr: &origin::Descr,
path: &str,
) -> Result<util::BoxByteStream, origin::GetFileError> {
let repo_snapshot = match self.get_repo_snapshot(descr) {
Ok(Some(repo_snapshot)) => repo_snapshot,
Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
Err(e) => return Err(e.into()),
};
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> {
let descr = descr.clone();
let path = path.to_string();
Box::pin(async move {
let repo_snapshot = match self.get_repo_snapshot(&descr) {
Ok(Some(repo_snapshot)) => repo_snapshot,
Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
Err(e) => return Err(e.into()),
};
let mut clean_path = Path::new(path.as_str());
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
let mut clean_path = Path::new(path);
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
let repo = repo_snapshot.repo.to_thread_local();
let file_object = repo
.find_object(repo_snapshot.tree_object_id)
.map_unexpected_while(|| {
format!("finding tree object {}", repo_snapshot.tree_object_id)
})?
.peel_to_tree()
.map_unexpected_while(|| {
format!("peeling tree object {}", repo_snapshot.tree_object_id)
})?
.lookup_entry_by_path(clean_path)
.map_unexpected_while(|| {
format!(
"looking up {} in tree object {}",
clean_path.display(),
repo_snapshot.tree_object_id
let repo = repo_snapshot.repo.to_thread_local();
let file_object = repo
.find_object(repo_snapshot.tree_object_id)
.map_unexpected_while(|| {
format!("finding tree object {}", repo_snapshot.tree_object_id)
})?
.peel_to_tree()
.map_unexpected_while(|| {
format!("peeling tree object {}", repo_snapshot.tree_object_id)
})?
.lookup_entry_by_path(clean_path)
.map_unexpected_while(|| {
format!(
"looking up {} in tree object {}",
clean_path.display(),
repo_snapshot.tree_object_id
)
})?
.ok_or(origin::GetFileError::FileNotFound)?
.object()
.or_unexpected()?;
use gix::object::Kind;
match file_object.kind {
Kind::Tree => Err(origin::GetFileError::PathIsDirectory),
Kind::Blob => {
// TODO this is very not ideal, the whole file is first read totally into memory, and then
// that is cloned.
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(util::into_box_byte_stream(stream::once(
async move { Ok(data) },
)))
}
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
format!("found object of kind {} in tree", file_object.kind).as_str(),
)
})?
.ok_or(origin::GetFileError::FileNotFound)?
.object()
.or_unexpected()?;
use gix::object::Kind;
match file_object.kind {
Kind::Tree => Err(origin::GetFileError::PathIsDirectory),
Kind::Blob => {
// TODO this is very not ideal, the whole file is first read totally into memory, and then
// that is cloned.
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(Box::pin(stream::once(async move { Ok(data) })))
.into()),
}
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
format!("found object of kind {} in tree", file_object.kind).as_str(),
)
.into()),
}
})
}
}

@ -2,6 +2,7 @@ use crate::error::unexpected::{self, Mappable};
use crate::{origin, util};
use std::{collections, sync};
#[derive(Clone)]
struct DescrState {
current_tree: gix_hash::ObjectId,
}
@ -97,39 +98,78 @@ impl Proxy {
Err(origin::SyncError::InvalidBranchName)
}
async fn get_commit_tree(
async fn get_object(
&self,
descr: &origin::Descr,
commit_hash: &gix_hash::ObjectId,
) -> Result<gix_hash::ObjectId, origin::SyncError> {
let hex = commit_hash.to_string();
oid: &gix_hash::ObjectId,
) -> unexpected::Result<Option<reqwest::Response>> {
let hex = oid.to_string();
let (url, _) = Self::deconstruct_descr(descr);
let commit_object_url = Self::construct_url(
let object_url = Self::construct_url(
url,
format!("/objects/{}/{}", &hex[..2], &hex[2..]).as_str(),
)
.or_unexpected_while("constructing refs url")?;
let commit_object_bytes = self
Ok(self
.client
.get(commit_object_url)
.get(object_url)
.send()
.await
.or(Err(origin::SyncError::InvalidURL))?
.or_unexpected_while("performing request")?
.error_for_status()
.or(Err(origin::SyncError::InvalidURL))?
.ok())
}
async fn get_commit_tree(
&self,
descr: &origin::Descr,
commit_hash: &gix_hash::ObjectId,
) -> Result<gix_hash::ObjectId, origin::SyncError> {
let commit_object_bytes = self
.get_object(descr, commit_hash)
.await?
.ok_or(origin::SyncError::Unavailable)?
.bytes()
.await
.or(Err(origin::SyncError::InvalidURL))?;
.or(Err(origin::SyncError::Unavailable))?;
let commit_object = gix_object::ObjectRef::from_loose(commit_object_bytes.as_ref())
.or(Err(origin::SyncError::InvalidURL))?
.or(Err(origin::SyncError::Unavailable))?
.into_commit()
.ok_or(origin::SyncError::InvalidURL)?;
.ok_or(origin::SyncError::Unavailable)?;
Ok(commit_object.tree())
}
async fn get_tree_entry(
&self,
descr: &origin::Descr,
tree_hash: &gix_hash::ObjectId,
entry_name: &str,
) -> Result<gix_object::tree::Entry, origin::GetFileError> {
let tree_object_bytes = self
.get_object(descr, tree_hash)
.await?
.ok_or(origin::GetFileError::Unavailable)?
.bytes()
.await
.or(Err(origin::GetFileError::Unavailable))?;
let tree_object = gix_object::ObjectRef::from_loose(tree_object_bytes.as_ref())
.or(Err(origin::GetFileError::Unavailable))?
.into_tree()
.ok_or(origin::GetFileError::Unavailable)?;
for entry in tree_object.entries {
if entry.filename == entry_name {
return Ok(entry.into());
}
}
Err(origin::GetFileError::FileNotFound)
}
}
impl origin::Store for Proxy {
@ -170,9 +210,97 @@ impl origin::Store for Proxy {
fn get_file(
&self,
_descr: &origin::Descr,
_path: &str,
) -> Result<util::BoxByteStream, origin::GetFileError> {
panic!("TODO")
descr: &origin::Descr,
path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, origin::GetFileError>> {
let descr = descr.clone();
let path = path.to_string();
Box::pin(async move {
let current_state = self
.state
.read()
.unwrap()
.get(&descr)
.ok_or(origin::GetFileError::DescrNotSynced)?
.clone();
let path = path
.as_str()
.parse::<std::path::PathBuf>()
.or_unexpected_while("parsing path")?
.canonicalize()
.or_unexpected_while("canonicalizing path")?;
let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>();
let path_parts_len = path_parts.len();
if path_parts_len < 2 {
return Err(unexpected::Error::from("path has fewer than 2 parts").into());
} else if path_parts[0] != std::path::MAIN_SEPARATOR_STR {
return Err(unexpected::Error::from(format!(
"expected first path part to be separator, found {:?}",
path_parts[0]
))
.into());
}
let mut tree_hash = current_state.current_tree;
// The first part is "/" (main separator), and the last is the file name itself.
// Everything in between (if any) should be directories, so navigate those.
for dir_name in path_parts[1..path_parts_len - 1].iter() {
let entry = self
.get_tree_entry(
&descr,
&tree_hash,
dir_name
.to_str()
.map_unexpected_while(|| format!("decoding dir name {dir_name:?}"))?,
)
.await?;
if !entry.mode.is_tree() {
return Err(origin::GetFileError::FileNotFound);
}
tree_hash = entry.oid;
}
let file_name = {
let file_name = path_parts[path_parts_len - 1];
file_name
.to_str()
.map_unexpected_while(|| format!("decoding file name {file_name:?}"))?
};
let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?;
// TODO handle symlinks
if entry.mode.is_tree() {
return Err(origin::GetFileError::PathIsDirectory);
} else if !entry.mode.is_blob() {
return Err(unexpected::Error::from(format!(
"can't handle entry {} of mode {}",
entry.filename,
entry.mode.as_str()
))
.into());
}
let res = self
.get_object(&descr, &entry.oid)
.await?
.map_unexpected_while(|| format!("object for entry {:?} not found", entry))?
.bytes_stream();
use futures::StreamExt;
Ok(util::into_box_byte_stream(res.map(|r| {
use std::io::{Error, ErrorKind};
r.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
})))
// TODO this is still not correct, as it will include the git object header
})
}
}

@ -37,9 +37,14 @@ where
&self,
descr: &origin::Descr,
path: &str,
) -> Result<util::BoxByteStream, origin::GetFileError> {
(self.mapping_fn)(descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.get_file(descr, path)
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> {
let descr = descr.clone();
let path = path.to_string();
Box::pin(async move {
(self.mapping_fn)(&descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.get_file(&descr, &path)
.await
})
}
}

@ -98,11 +98,16 @@ impl Service {
let path = service::append_index_to_path(req.path(), "index.gmi");
use domain::manager::GetFileError;
let f = match self.domain_manager.get_file(settings, &path) {
let f = match self.domain_manager.get_file(settings, &path).await {
Ok(f) => f,
Err(GetFileError::FileNotFound) => {
return Ok(self.respond_conn(w, "51", "File not found", None).await?)
}
Err(GetFileError::Unavailable) => {
return Ok(self
.respond_conn(w, "43", "Content unavailable", None)
.await?)
}
Err(GetFileError::DescrNotSynced) => {
return Err(unexpected::Error::from(
format!(

@ -218,9 +218,10 @@ impl Service {
let path = service::append_index_to_path(req.uri().path(), "index.html");
use domain::manager::GetFileError;
match self.domain_manager.get_file(&settings, &path) {
match self.domain_manager.get_file(&settings, &path).await {
Ok(f) => self.serve(200, &path, Body::wrap_stream(f)),
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"),
Err(GetFileError::DescrNotSynced) => self.internal_error(
format!(
"Backend for {:?} has not yet been synced",
@ -408,6 +409,11 @@ impl Service {
.to_string(),
), false),
Err(domain::manager::SyncWithSettingsError::Unavailable) => (Some(
"Fetching the git repository failed; the server is not available or is not corectly serving the repository."
.to_string(),
), false),
Err(domain::manager::SyncWithSettingsError::InvalidBranchName) => (Some(
"The git repository does not have a branch of the given name; please double check
that you input the correct name."

@ -33,4 +33,11 @@ pub fn parse_file<T: std::str::FromStr>(
pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>;
pub fn into_box_byte_stream<T>(v: T) -> BoxByteStream
where
T: futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
{
Box::into_pin(Box::new(v))
}
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;

Loading…
Cancel
Save