Compare commits
No commits in common. "1e8f9704869fd4dfcbc8b1da45a5ab2e2479baef" and "2302e9ff647410a09c4ad11372eeb2009689b1ae" have entirely different histories.
1e8f970486
...
2302e9ff64
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -523,7 +523,6 @@ name = "domani"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"acme2",
|
"acme2",
|
||||||
"async-compression",
|
|
||||||
"bytes",
|
"bytes",
|
||||||
"clap",
|
"clap",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
|
@ -49,7 +49,6 @@ hyper-trust-dns = "0.5.0"
|
|||||||
gix-hash = "0.14.1"
|
gix-hash = "0.14.1"
|
||||||
reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] }
|
reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] }
|
||||||
gix-object = "0.41.0"
|
gix-object = "0.41.0"
|
||||||
async-compression = { version = "0.4.6", features = ["tokio", "deflate", "zlib"] }
|
|
||||||
|
|
||||||
[patch.crates-io]
|
[patch.crates-io]
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ impl Error {
|
|||||||
|
|
||||||
impl fmt::Display for Error {
|
impl fmt::Display for Error {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
write!(f, "{}", self.0)
|
write!(f, "Unexpected error occurred: {}", self.0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -322,9 +322,9 @@ impl super::Store for FSStore {
|
|||||||
// that is cloned.
|
// that is cloned.
|
||||||
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
|
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
|
||||||
|
|
||||||
Ok(util::BoxByteStream::from_stream(stream::once(async move {
|
Ok(util::into_box_byte_stream(stream::once(
|
||||||
Ok(data)
|
async move { Ok(data) },
|
||||||
})))
|
)))
|
||||||
}
|
}
|
||||||
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
|
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
|
||||||
format!("found object of kind {} in tree", file_object.kind).as_str(),
|
format!("found object of kind {} in tree", file_object.kind).as_str(),
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use crate::error::unexpected::{self, Intoable, Mappable};
|
use crate::error::unexpected::{self, Mappable};
|
||||||
use crate::{origin, util};
|
use crate::{origin, util};
|
||||||
use std::{collections, sync};
|
use std::{collections, sync};
|
||||||
|
|
||||||
@ -7,15 +7,6 @@ struct DescrState {
|
|||||||
current_tree: gix_hash::ObjectId,
|
current_tree: gix_hash::ObjectId,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
|
|
||||||
enum GetObjectError {
|
|
||||||
#[error("unavailable due to server-side issue")]
|
|
||||||
Unavailable,
|
|
||||||
|
|
||||||
#[error(transparent)]
|
|
||||||
Unexpected(#[from] unexpected::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct Proxy {
|
pub struct Proxy {
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
@ -50,11 +41,12 @@ impl Proxy {
|
|||||||
.or_unexpected_while("parsing url as reqwest url")?
|
.or_unexpected_while("parsing url as reqwest url")?
|
||||||
};
|
};
|
||||||
|
|
||||||
let new_path = url
|
let base_path = match url.to_file_path() {
|
||||||
.path()
|
Ok(path) => path,
|
||||||
.parse::<std::path::PathBuf>()
|
Err(()) => return Err(unexpected::Error::from("extracting path from url")),
|
||||||
.or_unexpected_while("parsing url path")?
|
};
|
||||||
.join(sub_path);
|
|
||||||
|
let new_path = base_path.join(sub_path);
|
||||||
|
|
||||||
url.set_path(
|
url.set_path(
|
||||||
new_path
|
new_path
|
||||||
@ -72,7 +64,7 @@ impl Proxy {
|
|||||||
let (url, branch_name) = Self::deconstruct_descr(descr);
|
let (url, branch_name) = Self::deconstruct_descr(descr);
|
||||||
|
|
||||||
let refs_url =
|
let refs_url =
|
||||||
Self::construct_url(url, "info/refs").or_unexpected_while("constructing refs url")?;
|
Self::construct_url(url, "/info/refs").or_unexpected_while("constructing refs url")?;
|
||||||
|
|
||||||
// when fetching refs we assume that any issue indicates that the origin itself
|
// when fetching refs we assume that any issue indicates that the origin itself
|
||||||
// (and therefore the URL) has some kind of issue.
|
// (and therefore the URL) has some kind of issue.
|
||||||
@ -110,49 +102,24 @@ impl Proxy {
|
|||||||
&self,
|
&self,
|
||||||
descr: &origin::Descr,
|
descr: &origin::Descr,
|
||||||
oid: &gix_hash::ObjectId,
|
oid: &gix_hash::ObjectId,
|
||||||
expect_kind: gix_object::Kind,
|
) -> unexpected::Result<Option<reqwest::Response>> {
|
||||||
) -> Result<util::BoxByteStream, GetObjectError> {
|
|
||||||
let hex = oid.to_string();
|
let hex = oid.to_string();
|
||||||
let (url, _) = Self::deconstruct_descr(descr);
|
let (url, _) = Self::deconstruct_descr(descr);
|
||||||
|
|
||||||
let object_url =
|
let object_url = Self::construct_url(
|
||||||
Self::construct_url(url, format!("objects/{}/{}", &hex[..2], &hex[2..]).as_str())
|
url,
|
||||||
|
format!("/objects/{}/{}", &hex[..2], &hex[2..]).as_str(),
|
||||||
|
)
|
||||||
.or_unexpected_while("constructing refs url")?;
|
.or_unexpected_while("constructing refs url")?;
|
||||||
|
|
||||||
let mut loose_object = self
|
Ok(self
|
||||||
.client
|
.client
|
||||||
.get(object_url)
|
.get(object_url)
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
.or(Err(GetObjectError::Unavailable))?
|
.or_unexpected_while("performing request")?
|
||||||
.error_for_status()
|
.error_for_status()
|
||||||
.map(|res| {
|
.ok())
|
||||||
use async_compression::tokio::bufread::ZlibDecoder;
|
|
||||||
use futures::stream::TryStreamExt;
|
|
||||||
use std::io;
|
|
||||||
|
|
||||||
tokio::io::BufReader::new(ZlibDecoder::new(tokio_util::io::StreamReader::new(
|
|
||||||
res.bytes_stream()
|
|
||||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
|
|
||||||
)))
|
|
||||||
})
|
|
||||||
.or(Err(GetObjectError::Unavailable))?;
|
|
||||||
|
|
||||||
use tokio::io::AsyncBufReadExt;
|
|
||||||
let mut header = Vec::<u8>::new();
|
|
||||||
loose_object
|
|
||||||
.read_until(0, &mut header)
|
|
||||||
.await
|
|
||||||
.or(Err(GetObjectError::Unavailable))?;
|
|
||||||
|
|
||||||
let (kind, _, _) =
|
|
||||||
gix_object::decode::loose_header(&header).or(Err(GetObjectError::Unavailable))?;
|
|
||||||
|
|
||||||
if kind != expect_kind {
|
|
||||||
return Err(GetObjectError::Unavailable);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(util::BoxByteStream::from_async_read(loose_object))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_commit_tree(
|
async fn get_commit_tree(
|
||||||
@ -161,18 +128,17 @@ impl Proxy {
|
|||||||
commit_hash: &gix_hash::ObjectId,
|
commit_hash: &gix_hash::ObjectId,
|
||||||
) -> Result<gix_hash::ObjectId, origin::SyncError> {
|
) -> Result<gix_hash::ObjectId, origin::SyncError> {
|
||||||
let commit_object_bytes = self
|
let commit_object_bytes = self
|
||||||
.get_object(descr, commit_hash, gix_object::Kind::Commit)
|
.get_object(descr, commit_hash)
|
||||||
.await
|
.await?
|
||||||
.map_err(|e| match e {
|
.ok_or(origin::SyncError::Unavailable)?
|
||||||
GetObjectError::Unavailable => origin::SyncError::Unavailable,
|
.bytes()
|
||||||
GetObjectError::Unexpected(_) => e.into_unexpected().into(),
|
|
||||||
})?
|
|
||||||
.read_to_end()
|
|
||||||
.await
|
.await
|
||||||
.or(Err(origin::SyncError::Unavailable))?;
|
.or(Err(origin::SyncError::Unavailable))?;
|
||||||
|
|
||||||
let commit_object = gix_object::CommitRef::from_bytes(commit_object_bytes.as_ref())
|
let commit_object = gix_object::ObjectRef::from_loose(commit_object_bytes.as_ref())
|
||||||
.or(Err(origin::SyncError::Unavailable))?;
|
.or(Err(origin::SyncError::Unavailable))?
|
||||||
|
.into_commit()
|
||||||
|
.ok_or(origin::SyncError::Unavailable)?;
|
||||||
|
|
||||||
Ok(commit_object.tree())
|
Ok(commit_object.tree())
|
||||||
}
|
}
|
||||||
@ -184,18 +150,17 @@ impl Proxy {
|
|||||||
entry_name: &str,
|
entry_name: &str,
|
||||||
) -> Result<gix_object::tree::Entry, origin::GetFileError> {
|
) -> Result<gix_object::tree::Entry, origin::GetFileError> {
|
||||||
let tree_object_bytes = self
|
let tree_object_bytes = self
|
||||||
.get_object(descr, tree_hash, gix_object::Kind::Tree)
|
.get_object(descr, tree_hash)
|
||||||
.await
|
.await?
|
||||||
.map_err(|e| match e {
|
.ok_or(origin::GetFileError::Unavailable)?
|
||||||
GetObjectError::Unavailable => origin::GetFileError::Unavailable,
|
.bytes()
|
||||||
GetObjectError::Unexpected(_) => e.into_unexpected().into(),
|
|
||||||
})?
|
|
||||||
.read_to_end()
|
|
||||||
.await
|
.await
|
||||||
.or(Err(origin::GetFileError::Unavailable))?;
|
.or(Err(origin::GetFileError::Unavailable))?;
|
||||||
|
|
||||||
let tree_object = gix_object::TreeRef::from_bytes(tree_object_bytes.as_ref())
|
let tree_object = gix_object::ObjectRef::from_loose(tree_object_bytes.as_ref())
|
||||||
.or(Err(origin::GetFileError::Unavailable))?;
|
.or(Err(origin::GetFileError::Unavailable))?
|
||||||
|
.into_tree()
|
||||||
|
.ok_or(origin::GetFileError::Unavailable)?;
|
||||||
|
|
||||||
for entry in tree_object.entries {
|
for entry in tree_object.entries {
|
||||||
if entry.filename == entry_name {
|
if entry.filename == entry_name {
|
||||||
@ -262,7 +227,9 @@ impl origin::Store for Proxy {
|
|||||||
let path = path
|
let path = path
|
||||||
.as_str()
|
.as_str()
|
||||||
.parse::<std::path::PathBuf>()
|
.parse::<std::path::PathBuf>()
|
||||||
.or_unexpected_while("parsing path")?;
|
.or_unexpected_while("parsing path")?
|
||||||
|
.canonicalize()
|
||||||
|
.or_unexpected_while("canonicalizing path")?;
|
||||||
|
|
||||||
let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>();
|
let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>();
|
||||||
|
|
||||||
@ -321,14 +288,19 @@ impl origin::Store for Proxy {
|
|||||||
.into());
|
.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
self.get_object(&descr, &entry.oid, gix_object::Kind::Blob)
|
let res = self
|
||||||
.await
|
.get_object(&descr, &entry.oid)
|
||||||
.map_err(|e| match e {
|
.await?
|
||||||
GetObjectError::Unavailable => origin::GetFileError::Unavailable,
|
.map_unexpected_while(|| format!("object for entry {:?} not found", entry))?
|
||||||
GetObjectError::Unexpected(_) => e
|
.bytes_stream();
|
||||||
.into_unexpected_while(format!("getting object for entry {:?}", entry))
|
|
||||||
.into(),
|
use futures::StreamExt;
|
||||||
})
|
Ok(util::into_box_byte_stream(res.map(|r| {
|
||||||
|
use std::io::{Error, ErrorKind};
|
||||||
|
r.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
|
||||||
|
})))
|
||||||
|
|
||||||
|
// TODO this is still not correct, as it will include the git object header
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -62,7 +62,7 @@ impl Service {
|
|||||||
w.write_all("\r\n".as_bytes()).await.or_unexpected()?;
|
w.write_all("\r\n".as_bytes()).await.or_unexpected()?;
|
||||||
|
|
||||||
if let Some(body) = body {
|
if let Some(body) = body {
|
||||||
let mut body = body.into_async_read();
|
let mut body = tokio_util::io::StreamReader::new(body);
|
||||||
copy(&mut body, &mut w).await.or_unexpected()?;
|
copy(&mut body, &mut w).await.or_unexpected()?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,7 +219,7 @@ impl Service {
|
|||||||
|
|
||||||
use domain::manager::GetFileError;
|
use domain::manager::GetFileError;
|
||||||
match self.domain_manager.get_file(&settings, &path).await {
|
match self.domain_manager.get_file(&settings, &path).await {
|
||||||
Ok(f) => self.serve(200, &path, Body::wrap_stream(f.into_stream())),
|
Ok(f) => self.serve(200, &path, Body::wrap_stream(f)),
|
||||||
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
|
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
|
||||||
Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"),
|
Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"),
|
||||||
Err(GetFileError::DescrNotSynced) => self.internal_error(
|
Err(GetFileError::DescrNotSynced) => self.internal_error(
|
||||||
|
39
src/util.rs
39
src/util.rs
@ -1,4 +1,3 @@
|
|||||||
use futures::stream::BoxStream;
|
|
||||||
use std::{fs, io, path, pin};
|
use std::{fs, io, path, pin};
|
||||||
|
|
||||||
pub fn open_file(path: &path::Path) -> io::Result<Option<fs::File>> {
|
pub fn open_file(path: &path::Path) -> io::Result<Option<fs::File>> {
|
||||||
@ -32,39 +31,13 @@ pub fn parse_file<T: std::str::FromStr>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BoxByteStream(BoxStream<'static, io::Result<bytes::Bytes>>);
|
pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>;
|
||||||
|
|
||||||
impl BoxByteStream {
|
pub fn into_box_byte_stream<T>(v: T) -> BoxByteStream
|
||||||
pub fn from_stream<S>(s: S) -> Self
|
where
|
||||||
where
|
T: futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
|
||||||
S: futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
|
{
|
||||||
{
|
Box::into_pin(Box::new(v))
|
||||||
Self(Box::into_pin(Box::new(s)))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn from_async_read<R>(r: R) -> Self
|
|
||||||
where
|
|
||||||
R: tokio::io::AsyncRead + Send + 'static,
|
|
||||||
{
|
|
||||||
Self::from_stream(tokio_util::io::ReaderStream::new(r))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn into_stream(
|
|
||||||
self,
|
|
||||||
) -> impl futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static {
|
|
||||||
self.0
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn into_async_read(self) -> impl tokio::io::AsyncRead + Send + 'static {
|
|
||||||
tokio_util::io::StreamReader::new(self.into_stream())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn read_to_end(self) -> io::Result<Vec<u8>> {
|
|
||||||
use tokio::io::AsyncReadExt;
|
|
||||||
let mut buf = Vec::<u8>::new();
|
|
||||||
self.into_async_read().read_to_end(&mut buf).await?;
|
|
||||||
Ok(buf)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;
|
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;
|
||||||
|
Loading…
Reference in New Issue
Block a user