From 1667bac39cd4e5bb77a815ee68f42a8cb70c9b6c Mon Sep 17 00:00:00 2001 From: Brian Picciano Date: Tue, 13 Feb 2024 17:27:00 +0100 Subject: [PATCH] Decompress objects from git server, refactor BoxByteStream in the process --- Cargo.lock | 1 + Cargo.toml | 1 + src/origin/git.rs | 6 ++--- src/origin/git_proxy.rs | 52 +++++++++++++++++++++-------------------- src/service/gemini.rs | 2 +- src/service/http.rs | 2 +- src/util.rs | 39 ++++++++++++++++++++++++++----- 7 files changed, 67 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55da066..9b63a0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -523,6 +523,7 @@ name = "domani" version = "0.1.0" dependencies = [ "acme2", + "async-compression", "bytes", "clap", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index a5f2868..4baddbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ hyper-trust-dns = "0.5.0" gix-hash = "0.14.1" reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] } gix-object = "0.41.0" +async-compression = { version = "0.4.6", features = ["tokio", "deflate", "zlib"] } [patch.crates-io] diff --git a/src/origin/git.rs b/src/origin/git.rs index 22984ea..746c66e 100644 --- a/src/origin/git.rs +++ b/src/origin/git.rs @@ -322,9 +322,9 @@ impl super::Store for FSStore { // that is cloned. let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice()); - Ok(util::into_box_byte_stream(stream::once( - async move { Ok(data) }, - ))) + Ok(util::BoxByteStream::from_stream(stream::once(async move { + Ok(data) + }))) } Kind::Commit | Kind::Tag => Err(unexpected::Error::from( format!("found object of kind {} in tree", file_object.kind).as_str(), diff --git a/src/origin/git_proxy.rs b/src/origin/git_proxy.rs index 3a9fc85..b2c0953 100644 --- a/src/origin/git_proxy.rs +++ b/src/origin/git_proxy.rs @@ -41,12 +41,11 @@ impl Proxy { .or_unexpected_while("parsing url as reqwest url")? }; - let base_path = match url.to_file_path() { - Ok(path) => path, - Err(()) => return Err(unexpected::Error::from("extracting path from url")), - }; - - let new_path = base_path.join(sub_path); + let new_path = url + .path() + .parse::() + .or_unexpected_while("parsing url path")? + .join(sub_path); url.set_path( new_path @@ -64,7 +63,7 @@ impl Proxy { let (url, branch_name) = Self::deconstruct_descr(descr); let refs_url = - Self::construct_url(url, "/info/refs").or_unexpected_while("constructing refs url")?; + Self::construct_url(url, "info/refs").or_unexpected_while("constructing refs url")?; // when fetching refs we assume that any issue indicates that the origin itself // (and therefore the URL) has some kind of issue. @@ -102,15 +101,13 @@ impl Proxy { &self, descr: &origin::Descr, oid: &gix_hash::ObjectId, - ) -> unexpected::Result> { + ) -> unexpected::Result> { let hex = oid.to_string(); let (url, _) = Self::deconstruct_descr(descr); - let object_url = Self::construct_url( - url, - format!("/objects/{}/{}", &hex[..2], &hex[2..]).as_str(), - ) - .or_unexpected_while("constructing refs url")?; + let object_url = + Self::construct_url(url, format!("objects/{}/{}", &hex[..2], &hex[2..]).as_str()) + .or_unexpected_while("constructing refs url")?; Ok(self .client @@ -119,7 +116,19 @@ impl Proxy { .await .or_unexpected_while("performing request")? .error_for_status() - .ok()) + .ok() + .map(|res| { + use async_compression::tokio::bufread::ZlibDecoder; + use futures::stream::TryStreamExt; + use std::io; + + let r = tokio_util::io::StreamReader::new( + res.bytes_stream() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + ); + + util::BoxByteStream::from_async_read(ZlibDecoder::new(r)) + })) } async fn get_commit_tree( @@ -131,7 +140,7 @@ impl Proxy { .get_object(descr, commit_hash) .await? .ok_or(origin::SyncError::Unavailable)? - .bytes() + .read_to_end() .await .or(Err(origin::SyncError::Unavailable))?; @@ -153,7 +162,7 @@ impl Proxy { .get_object(descr, tree_hash) .await? .ok_or(origin::GetFileError::Unavailable)? - .bytes() + .read_to_end() .await .or(Err(origin::GetFileError::Unavailable))?; @@ -288,17 +297,10 @@ impl origin::Store for Proxy { .into()); } - let res = self + Ok(self .get_object(&descr, &entry.oid) .await? - .map_unexpected_while(|| format!("object for entry {:?} not found", entry))? - .bytes_stream(); - - use futures::StreamExt; - Ok(util::into_box_byte_stream(res.map(|r| { - use std::io::{Error, ErrorKind}; - r.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e)) - }))) + .map_unexpected_while(|| format!("object for entry {:?} not found", entry))?) // TODO this is still not correct, as it will include the git object header }) diff --git a/src/service/gemini.rs b/src/service/gemini.rs index c1326fd..10add1c 100644 --- a/src/service/gemini.rs +++ b/src/service/gemini.rs @@ -62,7 +62,7 @@ impl Service { w.write_all("\r\n".as_bytes()).await.or_unexpected()?; if let Some(body) = body { - let mut body = tokio_util::io::StreamReader::new(body); + let mut body = body.into_async_read(); copy(&mut body, &mut w).await.or_unexpected()?; } diff --git a/src/service/http.rs b/src/service/http.rs index f0622c4..88f1701 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -219,7 +219,7 @@ impl Service { use domain::manager::GetFileError; match self.domain_manager.get_file(&settings, &path).await { - Ok(f) => self.serve(200, &path, Body::wrap_stream(f)), + Ok(f) => self.serve(200, &path, Body::wrap_stream(f.into_stream())), Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"), Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"), Err(GetFileError::DescrNotSynced) => self.internal_error( diff --git a/src/util.rs b/src/util.rs index ca2f799..b99d1a6 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,3 +1,4 @@ +use futures::stream::BoxStream; use std::{fs, io, path, pin}; pub fn open_file(path: &path::Path) -> io::Result> { @@ -31,13 +32,39 @@ pub fn parse_file( } } -pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result>; +pub struct BoxByteStream(BoxStream<'static, io::Result>); -pub fn into_box_byte_stream(v: T) -> BoxByteStream -where - T: futures::stream::Stream> + Send + 'static, -{ - Box::into_pin(Box::new(v)) +impl BoxByteStream { + pub fn from_stream(s: S) -> Self + where + S: futures::stream::Stream> + Send + 'static, + { + Self(Box::into_pin(Box::new(s))) + } + + pub fn from_async_read(r: R) -> Self + where + R: tokio::io::AsyncRead + Send + 'static, + { + Self::from_stream(tokio_util::io::ReaderStream::new(r)) + } + + pub fn into_stream( + self, + ) -> impl futures::stream::Stream> + Send + 'static { + self.0 + } + + pub fn into_async_read(self) -> impl tokio::io::AsyncRead + Send + 'static { + tokio_util::io::StreamReader::new(self.into_stream()) + } + + pub async fn read_to_end(self) -> io::Result> { + use tokio::io::AsyncReadExt; + let mut buf = Vec::::new(); + self.into_async_read().read_to_end(&mut buf).await?; + Ok(buf) + } } pub type BoxFuture<'a, O> = pin::Pin + Send + 'a>>;