Compare commits

...

3 Commits

  1. 1
      Cargo.lock
  2. 1
      Cargo.toml
  3. 2
      src/error/unexpected.rs
  4. 6
      src/origin/git.rs
  5. 126
      src/origin/git_proxy.rs
  6. 2
      src/service/gemini.rs
  7. 2
      src/service/http.rs
  8. 39
      src/util.rs

1
Cargo.lock generated

@ -523,6 +523,7 @@ name = "domani"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"acme2", "acme2",
"async-compression",
"bytes", "bytes",
"clap", "clap",
"env_logger", "env_logger",

@ -49,6 +49,7 @@ hyper-trust-dns = "0.5.0"
gix-hash = "0.14.1" gix-hash = "0.14.1"
reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] } reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] }
gix-object = "0.41.0" gix-object = "0.41.0"
async-compression = { version = "0.4.6", features = ["tokio", "deflate", "zlib"] }
[patch.crates-io] [patch.crates-io]

@ -52,7 +52,7 @@ impl Error {
impl fmt::Display for Error { impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Unexpected error occurred: {}", self.0) write!(f, "{}", self.0)
} }
} }

@ -322,9 +322,9 @@ impl super::Store for FSStore {
// that is cloned. // that is cloned.
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice()); let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(util::into_box_byte_stream(stream::once( Ok(util::BoxByteStream::from_stream(stream::once(async move {
async move { Ok(data) }, Ok(data)
))) })))
} }
Kind::Commit | Kind::Tag => Err(unexpected::Error::from( Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
format!("found object of kind {} in tree", file_object.kind).as_str(), format!("found object of kind {} in tree", file_object.kind).as_str(),

@ -1,4 +1,4 @@
use crate::error::unexpected::{self, Mappable}; use crate::error::unexpected::{self, Intoable, Mappable};
use crate::{origin, util}; use crate::{origin, util};
use std::{collections, sync}; use std::{collections, sync};
@ -7,6 +7,15 @@ struct DescrState {
current_tree: gix_hash::ObjectId, current_tree: gix_hash::ObjectId,
} }
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
enum GetObjectError {
#[error("unavailable due to server-side issue")]
Unavailable,
#[error(transparent)]
Unexpected(#[from] unexpected::Error),
}
#[derive(Default)] #[derive(Default)]
pub struct Proxy { pub struct Proxy {
client: reqwest::Client, client: reqwest::Client,
@ -41,12 +50,11 @@ impl Proxy {
.or_unexpected_while("parsing url as reqwest url")? .or_unexpected_while("parsing url as reqwest url")?
}; };
let base_path = match url.to_file_path() { let new_path = url
Ok(path) => path, .path()
Err(()) => return Err(unexpected::Error::from("extracting path from url")), .parse::<std::path::PathBuf>()
}; .or_unexpected_while("parsing url path")?
.join(sub_path);
let new_path = base_path.join(sub_path);
url.set_path( url.set_path(
new_path new_path
@ -64,7 +72,7 @@ impl Proxy {
let (url, branch_name) = Self::deconstruct_descr(descr); let (url, branch_name) = Self::deconstruct_descr(descr);
let refs_url = let refs_url =
Self::construct_url(url, "/info/refs").or_unexpected_while("constructing refs url")?; Self::construct_url(url, "info/refs").or_unexpected_while("constructing refs url")?;
// when fetching refs we assume that any issue indicates that the origin itself // when fetching refs we assume that any issue indicates that the origin itself
// (and therefore the URL) has some kind of issue. // (and therefore the URL) has some kind of issue.
@ -102,24 +110,49 @@ impl Proxy {
&self, &self,
descr: &origin::Descr, descr: &origin::Descr,
oid: &gix_hash::ObjectId, oid: &gix_hash::ObjectId,
) -> unexpected::Result<Option<reqwest::Response>> { expect_kind: gix_object::Kind,
) -> Result<util::BoxByteStream, GetObjectError> {
let hex = oid.to_string(); let hex = oid.to_string();
let (url, _) = Self::deconstruct_descr(descr); let (url, _) = Self::deconstruct_descr(descr);
let object_url = Self::construct_url( let object_url =
url, Self::construct_url(url, format!("objects/{}/{}", &hex[..2], &hex[2..]).as_str())
format!("/objects/{}/{}", &hex[..2], &hex[2..]).as_str(), .or_unexpected_while("constructing refs url")?;
)
.or_unexpected_while("constructing refs url")?;
Ok(self let mut loose_object = self
.client .client
.get(object_url) .get(object_url)
.send() .send()
.await .await
.or_unexpected_while("performing request")? .or(Err(GetObjectError::Unavailable))?
.error_for_status() .error_for_status()
.ok()) .map(|res| {
use async_compression::tokio::bufread::ZlibDecoder;
use futures::stream::TryStreamExt;
use std::io;
tokio::io::BufReader::new(ZlibDecoder::new(tokio_util::io::StreamReader::new(
res.bytes_stream()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
)))
})
.or(Err(GetObjectError::Unavailable))?;
use tokio::io::AsyncBufReadExt;
let mut header = Vec::<u8>::new();
loose_object
.read_until(0, &mut header)
.await
.or(Err(GetObjectError::Unavailable))?;
let (kind, _, _) =
gix_object::decode::loose_header(&header).or(Err(GetObjectError::Unavailable))?;
if kind != expect_kind {
return Err(GetObjectError::Unavailable);
}
Ok(util::BoxByteStream::from_async_read(loose_object))
} }
async fn get_commit_tree( async fn get_commit_tree(
@ -128,17 +161,18 @@ impl Proxy {
commit_hash: &gix_hash::ObjectId, commit_hash: &gix_hash::ObjectId,
) -> Result<gix_hash::ObjectId, origin::SyncError> { ) -> Result<gix_hash::ObjectId, origin::SyncError> {
let commit_object_bytes = self let commit_object_bytes = self
.get_object(descr, commit_hash) .get_object(descr, commit_hash, gix_object::Kind::Commit)
.await? .await
.ok_or(origin::SyncError::Unavailable)? .map_err(|e| match e {
.bytes() GetObjectError::Unavailable => origin::SyncError::Unavailable,
GetObjectError::Unexpected(_) => e.into_unexpected().into(),
})?
.read_to_end()
.await .await
.or(Err(origin::SyncError::Unavailable))?; .or(Err(origin::SyncError::Unavailable))?;
let commit_object = gix_object::ObjectRef::from_loose(commit_object_bytes.as_ref()) let commit_object = gix_object::CommitRef::from_bytes(commit_object_bytes.as_ref())
.or(Err(origin::SyncError::Unavailable))? .or(Err(origin::SyncError::Unavailable))?;
.into_commit()
.ok_or(origin::SyncError::Unavailable)?;
Ok(commit_object.tree()) Ok(commit_object.tree())
} }
@ -150,17 +184,18 @@ impl Proxy {
entry_name: &str, entry_name: &str,
) -> Result<gix_object::tree::Entry, origin::GetFileError> { ) -> Result<gix_object::tree::Entry, origin::GetFileError> {
let tree_object_bytes = self let tree_object_bytes = self
.get_object(descr, tree_hash) .get_object(descr, tree_hash, gix_object::Kind::Tree)
.await? .await
.ok_or(origin::GetFileError::Unavailable)? .map_err(|e| match e {
.bytes() GetObjectError::Unavailable => origin::GetFileError::Unavailable,
GetObjectError::Unexpected(_) => e.into_unexpected().into(),
})?
.read_to_end()
.await .await
.or(Err(origin::GetFileError::Unavailable))?; .or(Err(origin::GetFileError::Unavailable))?;
let tree_object = gix_object::ObjectRef::from_loose(tree_object_bytes.as_ref()) let tree_object = gix_object::TreeRef::from_bytes(tree_object_bytes.as_ref())
.or(Err(origin::GetFileError::Unavailable))? .or(Err(origin::GetFileError::Unavailable))?;
.into_tree()
.ok_or(origin::GetFileError::Unavailable)?;
for entry in tree_object.entries { for entry in tree_object.entries {
if entry.filename == entry_name { if entry.filename == entry_name {
@ -227,9 +262,7 @@ impl origin::Store for Proxy {
let path = path let path = path
.as_str() .as_str()
.parse::<std::path::PathBuf>() .parse::<std::path::PathBuf>()
.or_unexpected_while("parsing path")? .or_unexpected_while("parsing path")?;
.canonicalize()
.or_unexpected_while("canonicalizing path")?;
let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>(); let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>();
@ -288,19 +321,14 @@ impl origin::Store for Proxy {
.into()); .into());
} }
let res = self self.get_object(&descr, &entry.oid, gix_object::Kind::Blob)
.get_object(&descr, &entry.oid) .await
.await? .map_err(|e| match e {
.map_unexpected_while(|| format!("object for entry {:?} not found", entry))? GetObjectError::Unavailable => origin::GetFileError::Unavailable,
.bytes_stream(); GetObjectError::Unexpected(_) => e
.into_unexpected_while(format!("getting object for entry {:?}", entry))
use futures::StreamExt; .into(),
Ok(util::into_box_byte_stream(res.map(|r| { })
use std::io::{Error, ErrorKind};
r.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
})))
// TODO this is still not correct, as it will include the git object header
}) })
} }
} }

@ -62,7 +62,7 @@ impl Service {
w.write_all("\r\n".as_bytes()).await.or_unexpected()?; w.write_all("\r\n".as_bytes()).await.or_unexpected()?;
if let Some(body) = body { if let Some(body) = body {
let mut body = tokio_util::io::StreamReader::new(body); let mut body = body.into_async_read();
copy(&mut body, &mut w).await.or_unexpected()?; copy(&mut body, &mut w).await.or_unexpected()?;
} }

@ -219,7 +219,7 @@ impl Service {
use domain::manager::GetFileError; use domain::manager::GetFileError;
match self.domain_manager.get_file(&settings, &path).await { match self.domain_manager.get_file(&settings, &path).await {
Ok(f) => self.serve(200, &path, Body::wrap_stream(f)), Ok(f) => self.serve(200, &path, Body::wrap_stream(f.into_stream())),
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"), Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"), Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"),
Err(GetFileError::DescrNotSynced) => self.internal_error( Err(GetFileError::DescrNotSynced) => self.internal_error(

@ -1,3 +1,4 @@
use futures::stream::BoxStream;
use std::{fs, io, path, pin}; use std::{fs, io, path, pin};
pub fn open_file(path: &path::Path) -> io::Result<Option<fs::File>> { pub fn open_file(path: &path::Path) -> io::Result<Option<fs::File>> {
@ -31,13 +32,39 @@ pub fn parse_file<T: std::str::FromStr>(
} }
} }
pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>; pub struct BoxByteStream(BoxStream<'static, io::Result<bytes::Bytes>>);
pub fn into_box_byte_stream<T>(v: T) -> BoxByteStream impl BoxByteStream {
where pub fn from_stream<S>(s: S) -> Self
T: futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static, where
{ S: futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
Box::into_pin(Box::new(v)) {
Self(Box::into_pin(Box::new(s)))
}
pub fn from_async_read<R>(r: R) -> Self
where
R: tokio::io::AsyncRead + Send + 'static,
{
Self::from_stream(tokio_util::io::ReaderStream::new(r))
}
pub fn into_stream(
self,
) -> impl futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static {
self.0
}
pub fn into_async_read(self) -> impl tokio::io::AsyncRead + Send + 'static {
tokio_util::io::StreamReader::new(self.into_stream())
}
pub async fn read_to_end(self) -> io::Result<Vec<u8>> {
use tokio::io::AsyncReadExt;
let mut buf = Vec::<u8>::new();
self.into_async_read().read_to_end(&mut buf).await?;
Ok(buf)
}
} }
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>; pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;

Loading…
Cancel
Save