Compare commits

...

8 Commits

Author SHA1 Message Date
Brian Picciano 2302e9ff64 Implement most of new git implementation's get_file method 3 months ago
Brian Picciano 142fc14916 Loosen up argument type on unexpected::Error::from 3 months ago
Brian Picciano 5f1f8ce1b7 Fixes from clippy 3 months ago
Brian Picciano b60c849a73 Remove all_descrs method from origin store 3 months ago
Brian Picciano 98ddefad4f Implemented sync method of new git_proxy module 3 months ago
Brian Picciano 57ee5ff30e Pass origin::Descr::Git url around in parsed form, perform stricter validation on it 3 months ago
Brian Picciano ac66ebf6cc Fixed how unexpected Option errors were being worded 3 months ago
Brian Picciano 78b67a02ad Make origin::Store::sync async 3 months ago
  1. 525
      Cargo.lock
  2. 4
      Cargo.toml
  3. 16
      src/domain/manager.rs
  4. 4
      src/domain/store.rs
  5. 21
      src/error/unexpected.rs
  6. 2
      src/main.rs
  7. 48
      src/origin.rs
  8. 61
      src/origin/descr.rs
  9. 227
      src/origin/git.rs
  10. 306
      src/origin/git_proxy.rs
  11. 165
      src/origin/mux.rs
  12. 9
      src/service/gemini.rs
  13. 10
      src/service/http.rs
  14. 5
      src/service/http/util.rs
  15. 7
      src/util.rs

525
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -42,11 +42,13 @@ log = "0.4.19"
env_logger = "0.10.0"
serde_yaml = "0.9.22"
rand = "0.8.5"
reqwest = "0.11.18"
hyper-reverse-proxy = "0.5.2-dev"
gemini = "0.0.5"
bytes = "1.4.0"
hyper-trust-dns = "0.5.0"
gix-hash = "0.14.1"
reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] }
gix-object = "0.41.0"
[patch.crates-io]

@ -40,6 +40,9 @@ pub enum SyncWithSettingsError {
#[error("invalid url")]
InvalidURL,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("invalid branch name")]
InvalidBranchName,
@ -60,6 +63,7 @@ impl From<origin::SyncError> for SyncWithSettingsError {
fn from(e: origin::SyncError) -> SyncWithSettingsError {
match e {
origin::SyncError::InvalidURL => SyncWithSettingsError::InvalidURL,
origin::SyncError::Unavailable => SyncWithSettingsError::Unavailable,
origin::SyncError::InvalidBranchName => SyncWithSettingsError::InvalidBranchName,
origin::SyncError::AlreadyInProgress => SyncWithSettingsError::AlreadyInProgress,
origin::SyncError::Unexpected(e) => SyncWithSettingsError::Unexpected(e),
@ -96,7 +100,7 @@ pub trait Manager: Sync + Send {
&self,
settings: &domain::Settings,
path: &str,
) -> Result<util::BoxByteStream, GetFileError>;
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>;
fn sync_with_settings(
&self,
@ -206,13 +210,13 @@ impl ManagerImpl {
manager
}
fn sync_domain_origin(
async fn sync_domain_origin(
&self,
domain: &domain::Name,
origin_descr: &origin::Descr,
) -> Result<(), origin::SyncError> {
log::info!("Syncing origin {:?} for domain {domain}", origin_descr,);
self.origin_store.sync(origin_descr)
self.origin_store.sync(origin_descr).await
}
async fn sync_origins(&self) -> unexpected::Result<()> {
@ -232,6 +236,7 @@ impl ManagerImpl {
};
self.sync_domain_origin(&domain, &settings.origin_descr)
.await
.map_unexpected_while(|| {
format!(
"syncing origin {:?} for domain {domain}",
@ -396,7 +401,7 @@ impl Manager for ManagerImpl {
&self,
settings: &domain::Settings,
path: &str,
) -> Result<util::BoxByteStream, GetFileError> {
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>> {
let path = settings.process_path(path);
self.origin_store
.get_file(&settings.origin_descr, path.as_ref())
@ -423,7 +428,8 @@ impl Manager for ManagerImpl {
self.domain_checker.check_domain(&domain, &hash).await?;
self.sync_domain_origin(&domain, &settings.origin_descr)?;
self.sync_domain_origin(&domain, &settings.origin_descr)
.await?;
if self.can_sync_gemini_cert() {
self.sync_domain_gemini_cert(&domain)

@ -110,7 +110,7 @@ mod tests {
let settings = domain::Settings {
origin_descr: Descr::Git {
url: "bar".to_string(),
url: "http://bar".parse().unwrap(),
branch_name: "baz".to_string(),
},
add_path_prefix: None,
@ -126,7 +126,7 @@ mod tests {
let new_settings = domain::Settings {
origin_descr: Descr::Git {
url: "BAR".to_string(),
url: "https://BAR".parse().unwrap(),
branch_name: "BAZ".to_string(),
},
add_path_prefix: None,

@ -42,8 +42,11 @@ impl Error {
return Error::from_displays(prefix, &e, e.source());
}
pub fn from(s: &str) -> Error {
Error::from_displays(None::<String>, s, None::<String>)
pub fn from<S>(s: S) -> Error
where
S: AsRef<str>,
{
Error::from_displays(None::<String>, s.as_ref(), None::<String>)
}
}
@ -105,16 +108,19 @@ impl<T, E: error::Error> Mappable<T> for result::Result<T, E> {
}
}
static OPTION_NONE_ERROR: &str = "expected Some but got None";
// Internal error used to represent a None where a Some was required; it is
// converted into an unexpected::Error by the Mappable impl for Option below.
#[derive(thiserror::Error, Debug)]
enum OptionError {
    #[error("required but not given")]
    None,
}
impl<T> Mappable<T> for Option<T> {
fn or_unexpected(self) -> Result<T> {
self.ok_or(Error::from(OPTION_NONE_ERROR)).or_unexpected()
self.ok_or(OptionError::None).or_unexpected()
}
fn or_unexpected_while<D: fmt::Display>(self, prefix: D) -> Result<T> {
self.ok_or(Error::from(OPTION_NONE_ERROR))
.or_unexpected_while(prefix)
self.ok_or(OptionError::None).or_unexpected_while(prefix)
}
fn map_unexpected_while<F, D>(self, f: F) -> Result<T>
@ -122,8 +128,7 @@ impl<T> Mappable<T> for Option<T> {
F: FnOnce() -> D,
D: fmt::Display,
{
self.ok_or(Error::from(OPTION_NONE_ERROR))
.map_unexpected_while(f)
self.ok_or(OptionError::None).map_unexpected_while(f)
}
}

@ -90,6 +90,8 @@ async fn main() {
let origin_store = domani::origin::git::FSStore::new(&config.origin)
.expect("git origin store initialization failed");
//let origin_store = domani::origin::git_proxy::Proxy::new();
let domain_checker = domani::domain::checker::DNSChecker::new(
domani::token::MemStore::new(),
&config.domain.dns,

@ -1,6 +1,7 @@
mod config;
mod descr;
pub mod descr;
pub mod git;
pub mod git_proxy;
pub mod mux;
pub use config::*;
@ -8,13 +9,15 @@ pub use descr::Descr;
use crate::error::unexpected;
use crate::util;
use std::sync;
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
pub enum SyncError {
#[error("invalid url")]
InvalidURL,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("invalid branch name")]
InvalidBranchName,
@ -25,12 +28,6 @@ pub enum SyncError {
Unexpected(#[from] unexpected::Error),
}
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
pub enum AllDescrsError {
#[error(transparent)]
Unexpected(#[from] unexpected::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum GetFileError {
#[error("descr not synced")]
@ -39,6 +36,9 @@ pub enum GetFileError {
#[error("file not found")]
FileNotFound,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("path is directory")]
PathIsDirectory,
@ -46,32 +46,16 @@ pub enum GetFileError {
Unexpected(#[from] unexpected::Error),
}
#[mockall::automock]
/// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr.
pub trait Store {
/// If the origin is of a kind which can be updated, sync will pull down the latest version of
/// the origin into the storage.
fn sync(&self, descr: &Descr) -> Result<(), SyncError>;
fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError>;
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError>;
}
pub fn new_mock() -> sync::Arc<sync::Mutex<MockStore>> {
sync::Arc::new(sync::Mutex::new(MockStore::new()))
}
impl Store for sync::Arc<sync::Mutex<MockStore>> {
fn sync(&self, descr: &Descr) -> Result<(), SyncError> {
self.lock().unwrap().sync(descr)
}
fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError> {
self.lock().unwrap().all_descrs()
}
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError> {
self.lock().unwrap().get_file(descr, path)
}
fn sync(&self, descr: &Descr) -> util::BoxFuture<'_, Result<(), SyncError>>;
/// Returns the body of the descr's given path, where path must be absolute.
fn get_file(
&self,
descr: &Descr,
path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>;
}

@ -1,19 +1,53 @@
use hex::ToHex;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use sha2::{Digest, Sha256};
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DescrHttpProxyHeader {
pub name: String,
pub value: String,
use crate::error::unexpected::{self, Mappable};
/// A git remote URL which has been validated at parse time; only http and
/// https schemes are accepted (see the FromStr impl).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct GitUrl {
    // the underlying parsed URI; kept public so callers can inspect/render it
    pub parsed: http::uri::Uri,
}
impl std::str::FromStr for GitUrl {
type Err = unexpected::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let parsed: http::Uri = s
.parse()
.map_unexpected_while(|| format!("parsing as url"))?;
let scheme = parsed
.scheme()
.map_unexpected_while(|| format!("extracting scheme"))?;
if scheme != "http" && scheme != "https" {
return Err(unexpected::Error::from(
"only http(s) git URLs are supported",
));
}
Ok(GitUrl { parsed })
}
}
impl std::fmt::Display for GitUrl {
    /// Renders the URL exactly as its parsed representation displays itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.parsed)
    }
}
#[serde_as]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind")]
/// A unique description of an origin, from where a domain might be served.
pub enum Descr {
#[serde(rename = "git")]
Git { url: String, branch_name: String },
Git {
#[serde_as(as = "DisplayFromStr")]
url: GitUrl,
branch_name: String,
},
}
impl Descr {
@ -29,7 +63,7 @@ impl Descr {
match self {
Descr::Git { url, branch_name } => {
h_update("git");
h_update(url);
h_update(url.parsed.to_string().as_str());
h_update(branch_name);
}
}
@ -45,11 +79,20 @@ mod descr_tests {
fn id() {
assert_eq!(
super::Descr::Git {
url: String::from("foo"),
url: "https://foo".parse().unwrap(),
branch_name: String::from("bar"),
}
.id(),
"0b12637c1994e61db59ad9bea5fcaf5d2a1e5ecad00c58320271e721e4295ceb",
);
assert_eq!(
super::Descr::Git {
url: "http://foo".parse().unwrap(),
branch_name: String::from("bar"),
}
.id(),
"424130b9e6c1675c983b4b97579877e16d8a6377e4fe10970e6d210811c3b7ac",
)
"d14f4c75c938e46c2d9da05ea14b6e41ba86ac9945bb10c0b2f91faee1ec0bce",
);
}
}

@ -2,7 +2,7 @@ use crate::error::unexpected::{self, Intoable, Mappable};
use crate::{origin, util};
use std::path::{Path, PathBuf};
use std::{collections, fs, io, sync};
use std::{collections, fs, future, io, sync};
use futures::stream;
@ -56,12 +56,12 @@ impl FSStore {
format!("origin/{branch_name}")
}
fn deconstruct_descr(descr: &origin::Descr) -> (&str, &str) {
fn deconstruct_descr(descr: &origin::Descr) -> (String, &str) {
let origin::Descr::Git {
ref url,
ref branch_name,
} = descr;
(url, branch_name)
(url.parsed.to_string(), branch_name)
}
fn create_repo_snapshot(
@ -216,137 +216,126 @@ impl FSStore {
}
impl super::Store for FSStore {
fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress);
}
fn sync(
&self,
descr: &origin::Descr,
) -> util::BoxFuture<'static, Result<(), origin::SyncError>> {
// TODO this implementation is kind of cheating, as it's doing everything synchronously but
// then returning the result in an async box. But the git store is going to be
// re-implemented soon anyway, so it doesn't matter.
let res = (|| {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress);
}
let res = self.sync_inner(descr);
let res = self.sync_inner(&descr);
self.sync_guard.lock().unwrap().remove(descr);
self.sync_guard.lock().unwrap().remove(&descr);
let repo = match res {
Ok(repo) => repo,
Err(e) => return Err(e),
};
let repo = match res {
Ok(repo) => repo,
Err(e) => return Err(e),
};
// repo is synced at this point (though the sync lock is still held), just gotta create
// the RepoSnapshot and store it.
//
// TODO this is a bit of a memory leak, but by the time we get
// to that point this should all be backed by something which isn't local storage
// anyway.
// repo is synced at this point (though the sync lock is still held), just gotta create
// the RepoSnapshot and store it.
//
// TODO this is a bit of a memory leak, but by the time we get
// to that point this should all be backed by something which isn't local storage
// anyway.
// calling this while the sync lock is held isn't ideal, but it's convenient and
// shouldn't be too terrible generally
let repo_snapshot = self
.create_repo_snapshot(repo, descr)
.map_err(|e| match e {
CreateRepoSnapshotError::InvalidBranchName => origin::SyncError::InvalidBranchName,
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
})?;
// calling this while the sync lock is held isn't ideal, but it's convenient and
// shouldn't be too terrible generally
let repo_snapshot = self
.create_repo_snapshot(repo, &descr)
.map_err(|e| match e {
CreateRepoSnapshotError::InvalidBranchName => {
origin::SyncError::InvalidBranchName
}
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
})?;
let mut repo_snapshots = self.repo_snapshots.write().unwrap();
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
let mut repo_snapshots = self.repo_snapshots.write().unwrap();
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
Ok(())
}
Ok(())
})();
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
fs::read_dir(&self.dir_path).or_unexpected()?.map(
|dir_entry_res: io::Result<fs::DirEntry>| -> Result<origin::Descr, origin::AllDescrsError> {
let descr_id: String = dir_entry_res
.or_unexpected()?
.file_name()
.to_str()
.ok_or_else(|| {
unexpected::Error::from("couldn't convert os string to &str")
})?
.into();
let descr_file_path = self.descr_file_path(descr_id.as_ref());
// TODO it's possible that opening the file will fail if syncing is
// still ongoing, as writing the descr file is the last step after
// initial sync has succeeded.
let descr_file = fs::File::open(descr_file_path.as_path())
.map_unexpected_while(|| {
format!("opening descr file {}", descr_file_path.display())
})?;
let descr = serde_json::from_reader(descr_file).map_unexpected_while(|| {
format!("reading descr file {}", descr_file_path.display())
})?;
Ok(descr)
},
).try_collect()
Box::pin(future::ready(res))
}
fn get_file(
&self,
descr: &origin::Descr,
path: &str,
) -> Result<util::BoxByteStream, origin::GetFileError> {
let repo_snapshot = match self.get_repo_snapshot(descr) {
Ok(Some(repo_snapshot)) => repo_snapshot,
Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
Err(e) => return Err(e.into()),
};
let mut clean_path = Path::new(path);
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
let repo = repo_snapshot.repo.to_thread_local();
let file_object = repo
.find_object(repo_snapshot.tree_object_id)
.map_unexpected_while(|| {
format!("finding tree object {}", repo_snapshot.tree_object_id)
})?
.peel_to_tree()
.map_unexpected_while(|| {
format!("peeling tree object {}", repo_snapshot.tree_object_id)
})?
.lookup_entry_by_path(clean_path)
.map_unexpected_while(|| {
format!(
"looking up {} in tree object {}",
clean_path.display(),
repo_snapshot.tree_object_id
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> {
let descr = descr.clone();
let path = path.to_string();
Box::pin(async move {
let repo_snapshot = match self.get_repo_snapshot(&descr) {
Ok(Some(repo_snapshot)) => repo_snapshot,
Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
Err(e) => return Err(e.into()),
};
let mut clean_path = Path::new(path.as_str());
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
let repo = repo_snapshot.repo.to_thread_local();
let file_object = repo
.find_object(repo_snapshot.tree_object_id)
.map_unexpected_while(|| {
format!("finding tree object {}", repo_snapshot.tree_object_id)
})?
.peel_to_tree()
.map_unexpected_while(|| {
format!("peeling tree object {}", repo_snapshot.tree_object_id)
})?
.lookup_entry_by_path(clean_path)
.map_unexpected_while(|| {
format!(
"looking up {} in tree object {}",
clean_path.display(),
repo_snapshot.tree_object_id
)
})?
.ok_or(origin::GetFileError::FileNotFound)?
.object()
.or_unexpected()?;
use gix::object::Kind;
match file_object.kind {
Kind::Tree => Err(origin::GetFileError::PathIsDirectory),
Kind::Blob => {
// TODO this is very not ideal, the whole file is first read totally into memory, and then
// that is cloned.
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(util::into_box_byte_stream(stream::once(
async move { Ok(data) },
)))
}
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
format!("found object of kind {} in tree", file_object.kind).as_str(),
)
})?
.ok_or(origin::GetFileError::FileNotFound)?
.object()
.or_unexpected()?;
use gix::object::Kind;
match file_object.kind {
Kind::Tree => Err(origin::GetFileError::PathIsDirectory),
Kind::Blob => {
// TODO this is very not ideal, the whole file is first read totally into memory, and then
// that is cloned.
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(Box::pin(stream::once(async move { Ok(data) })))
.into()),
}
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
format!("found object of kind {} in tree", file_object.kind).as_str(),
)
.into()),
}
})
}
}
/*
#[cfg(test)]
mod tests {
use crate::origin::{self, Config, Store};
@ -374,8 +363,11 @@ mod tests {
let store = super::FSStore::new(&config).expect("store created");
store.sync(&descr).expect("sync should succeed");
store.sync(&descr).expect("second sync should succeed");
store.sync(&descr).await.expect("sync should succeed");
store
.sync(&descr)
.await
.expect("second sync should succeed");
// RepoSnapshot doesn't exist
match store.get_file(&other_descr, "DNE") {
@ -413,3 +405,4 @@ mod tests {
assert_eq!(descr, descrs[0]);
}
}
*/

@ -0,0 +1,306 @@
use crate::error::unexpected::{self, Mappable};
use crate::{origin, util};
use std::{collections, sync};
// The state recorded for a Descr once it has been synced.
#[derive(Clone)]
struct DescrState {
    // object id of the tree at the tip of the synced branch
    current_tree: gix_hash::ObjectId,
}
/// An origin::Store implementation which serves files by fetching git refs
/// and loose objects over HTTP from the origin server on demand, rather than
/// keeping a local clone.
#[derive(Default)]
pub struct Proxy {
    client: reqwest::Client,
    // last synced state (tip tree) per descr
    state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>,
    // to prevent syncing the same origin more than once at a time, but still allow hitting that
    // origin during a sync.
    sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
}
impl Proxy {
    pub fn new() -> Proxy {
        Proxy::default()
    }

    /// Splits a git Descr into its URL and branch name.
    fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) {
        let origin::Descr::Git {
            ref url,
            ref branch_name,
        } = descr;
        (url, branch_name)
    }

    /// Appends sub_path to the path portion of url, returning the result as a
    /// reqwest::Url.
    fn construct_url(
        url: &origin::descr::GitUrl,
        sub_path: &str,
    ) -> unexpected::Result<reqwest::Url> {
        let mut url: reqwest::Url = {
            url.parsed
                .to_string()
                .parse()
                .or_unexpected_while("parsing url as reqwest url")?
        };

        // Join the paths lexically. The previous implementation used
        // Url::to_file_path, which only succeeds for file:// URLs and so
        // always failed for the http(s) URLs which GitUrl permits; it also
        // joined via PathBuf::join, which discards the base path entirely
        // when the argument is absolute (e.g. "/info/refs").
        let new_path = format!(
            "{}/{}",
            url.path().trim_end_matches('/'),
            sub_path.trim_start_matches('/'),
        );
        url.set_path(new_path.as_str());

        Ok(url)
    }

    /// Fetches the commit hash at the tip of the descr's branch, via the
    /// origin's `info/refs` endpoint.
    ///
    /// NOTE(review): this parses the plain-text ("dumb" HTTP) form of
    /// info/refs; a smart-HTTP server returns pkt-line data instead — confirm
    /// which servers need to be supported.
    async fn get_tip_commit_hash(
        &self,
        descr: &origin::Descr,
    ) -> Result<gix_hash::ObjectId, origin::SyncError> {
        let (url, branch_name) = Self::deconstruct_descr(descr);

        let refs_url =
            Self::construct_url(url, "/info/refs").or_unexpected_while("constructing refs url")?;

        // when fetching refs we assume that any issue indicates that the origin itself
        // (and therefore the URL) has some kind of issue.
        let refs = self
            .client
            .get(refs_url)
            .send()
            .await
            .or(Err(origin::SyncError::InvalidURL))?
            .error_for_status()
            .or(Err(origin::SyncError::InvalidURL))?
            .text()
            .await
            .or(Err(origin::SyncError::InvalidURL))?;

        let full_ref = format!("refs/heads/{}", branch_name);
        for line in refs.lines() {
            // each line is "<hex hash><whitespace><ref name>"; the ref name is
            // compared exactly rather than via ends_with, so that e.g. a ref
            // named "refs/heads/xmaster" can't match a request for "master".
            let mut fields = line.split_ascii_whitespace();
            let hash = match fields.next() {
                Some(hash) => hash,
                None => continue,
            };
            if fields.next() != Some(full_ref.as_str()) {
                continue;
            }
            return gix_hash::ObjectId::from_hex(hash.as_bytes())
                .or(Err(origin::SyncError::InvalidURL));
        }

        Err(origin::SyncError::InvalidBranchName)
    }

    /// Fetches a loose object from the origin, returning None if the server
    /// responded with an error status (e.g. the object does not exist).
    async fn get_object(
        &self,
        descr: &origin::Descr,
        oid: &gix_hash::ObjectId,
    ) -> unexpected::Result<Option<reqwest::Response>> {
        let hex = oid.to_string();
        let (url, _) = Self::deconstruct_descr(descr);

        let object_url = Self::construct_url(
            url,
            format!("/objects/{}/{}", &hex[..2], &hex[2..]).as_str(),
        )
        .or_unexpected_while("constructing object url")?;

        Ok(self
            .client
            .get(object_url)
            .send()
            .await
            .or_unexpected_while("performing request")?
            .error_for_status()
            .ok())
    }

    /// Fetches the commit object with the given hash and returns the object
    /// id of the tree it references.
    async fn get_commit_tree(
        &self,
        descr: &origin::Descr,
        commit_hash: &gix_hash::ObjectId,
    ) -> Result<gix_hash::ObjectId, origin::SyncError> {
        let commit_object_bytes = self
            .get_object(descr, commit_hash)
            .await?
            .ok_or(origin::SyncError::Unavailable)?
            .bytes()
            .await
            .or(Err(origin::SyncError::Unavailable))?;

        // NOTE(review): loose objects are stored zlib-deflated on the server;
        // confirm the body is actually decompressed by the time it reaches
        // from_loose here.
        let commit_object = gix_object::ObjectRef::from_loose(commit_object_bytes.as_ref())
            .or(Err(origin::SyncError::Unavailable))?
            .into_commit()
            .ok_or(origin::SyncError::Unavailable)?;

        Ok(commit_object.tree())
    }

    /// Fetches the tree object with the given hash and returns its entry with
    /// the given name, or FileNotFound if no entry matches.
    async fn get_tree_entry(
        &self,
        descr: &origin::Descr,
        tree_hash: &gix_hash::ObjectId,
        entry_name: &str,
    ) -> Result<gix_object::tree::Entry, origin::GetFileError> {
        let tree_object_bytes = self
            .get_object(descr, tree_hash)
            .await?
            .ok_or(origin::GetFileError::Unavailable)?
            .bytes()
            .await
            .or(Err(origin::GetFileError::Unavailable))?;

        let tree_object = gix_object::ObjectRef::from_loose(tree_object_bytes.as_ref())
            .or(Err(origin::GetFileError::Unavailable))?
            .into_tree()
            .ok_or(origin::GetFileError::Unavailable)?;

        for entry in tree_object.entries {
            if entry.filename == entry_name {
                return Ok(entry.into());
            }
        }

        Err(origin::GetFileError::FileNotFound)
    }
}
impl origin::Store for Proxy {
    /// Syncs the descr by recording the object id of the tree at the tip of
    /// its branch; no objects are downloaded up front, get_file fetches them
    /// on demand.
    fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
        let descr = descr.clone();
        Box::pin(async move {
            // attempt to lock this descr for syncing, doing so within a new scope so the mutex
            // isn't actually being held for the whole method duration.
            let is_already_syncing = {
                self.sync_guard
                    .lock()
                    .unwrap()
                    .insert(descr.clone(), ())
                    .is_some()
            };

            if is_already_syncing {
                return Err(origin::SyncError::AlreadyInProgress);
            }

            // perform the rest of the work within this closure, so we can be sure that the guard
            // lock is released no matter what.
            let res = async {
                let commit_hash = self.get_tip_commit_hash(&descr).await?;
                let current_tree = self.get_commit_tree(&descr, &commit_hash).await?;

                self.state
                    .write()
                    .unwrap()
                    .insert(descr.clone(), DescrState { current_tree });

                Ok(())
            }
            .await;

            self.sync_guard.lock().unwrap().remove(&descr);
            res
        })
    }

    /// Streams the file at the given absolute path out of the synced descr,
    /// navigating the repo's tree objects one path component at a time.
    fn get_file(
        &self,
        descr: &origin::Descr,
        path: &str,
    ) -> util::BoxFuture<'_, Result<util::BoxByteStream, origin::GetFileError>> {
        let descr = descr.clone();
        let path = path.to_string();
        Box::pin(async move {
            let current_state = self
                .state
                .read()
                .unwrap()
                .get(&descr)
                .ok_or(origin::GetFileError::DescrNotSynced)?
                .clone();

            // Normalize the path lexically. The previous implementation used
            // PathBuf::canonicalize, but that consults the local filesystem
            // (and requires the path to actually exist on it), which can
            // never be the case for a path inside a remote git repo.
            let mut path_parts = Vec::new();
            for part in path.split('/') {
                match part {
                    "" | "." => (),
                    ".." => {
                        path_parts
                            .pop()
                            .map_unexpected_while(|| format!("path {path:?} escapes root"))?;
                    }
                    part => path_parts.push(part),
                }
            }

            // the final component is the file name; everything before it must
            // be a directory (tree) within the repo.
            let file_name = path_parts
                .pop()
                .map_unexpected_while(|| format!("path {path:?} has no file name"))?;

            let mut tree_hash = current_state.current_tree;

            for dir_name in path_parts {
                let entry = self.get_tree_entry(&descr, &tree_hash, dir_name).await?;
                if !entry.mode.is_tree() {
                    return Err(origin::GetFileError::FileNotFound);
                }
                tree_hash = entry.oid;
            }

            let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?;

            // TODO handle symlinks
            if entry.mode.is_tree() {
                return Err(origin::GetFileError::PathIsDirectory);
            } else if !entry.mode.is_blob() {
                return Err(unexpected::Error::from(format!(
                    "can't handle entry {} of mode {}",
                    entry.filename,
                    entry.mode.as_str()
                ))
                .into());
            }

            let res = self
                .get_object(&descr, &entry.oid)
                .await?
                .map_unexpected_while(|| format!("object for entry {:?} not found", entry))?
                .bytes_stream();

            use futures::StreamExt;
            Ok(util::into_box_byte_stream(res.map(|r| {
                use std::io::{Error, ErrorKind};
                r.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
            })))
            // TODO this is still not correct, as it will include the git object header
        })
    }
}

@ -3,171 +3,48 @@ use crate::{origin, util};
pub struct Store<F, S>
where
S: origin::Store + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{
mapping_fn: F,
stores: Vec<S>,
}
impl<F, S> Store<F, S>
where
S: origin::Store + 'static,
S: origin::Store + Sync + Send + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{
pub fn new(mapping_fn: F, stores: Vec<S>) -> Store<F, S> {
Store { mapping_fn, stores }
pub fn new(mapping_fn: F) -> Store<F, S> {
Store { mapping_fn }
}
}
impl<F, S> origin::Store for Store<F, S>
where
S: origin::Store + 'static,
S: origin::Store + Sync + Send + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{
fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
(self.mapping_fn)(descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.sync(descr)
}
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
let mut res = Vec::<origin::Descr>::new();
for store in self.stores.iter() {
store.all_descrs()?.into_iter().collect_into(&mut res);
}
Ok(res)
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
let descr = descr.clone();
Box::pin(async move {
(self.mapping_fn)(&descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.sync(&descr)
.await
})
}
fn get_file(
&self,
descr: &origin::Descr,
path: &str,
) -> Result<util::BoxByteStream, origin::GetFileError> {
(self.mapping_fn)(descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.get_file(descr, path)
}
}
#[cfg(test)]
mod tests {
use crate::origin;
use std::sync;
struct Harness {
descr_a: origin::Descr,
descr_b: origin::Descr,
descr_unknown: origin::Descr,
store_a: sync::Arc<sync::Mutex<origin::MockStore>>,
store_b: sync::Arc<sync::Mutex<origin::MockStore>>,
store: Box<dyn origin::Store>,
}
impl Harness {
fn new() -> Harness {
let descr_a = origin::Descr::Git {
url: "A".to_string(),
branch_name: "A".to_string(),
};
let descr_b = origin::Descr::Git {
url: "B".to_string(),
branch_name: "B".to_string(),
};
let store_a = origin::new_mock();
let store_b = origin::new_mock();
Harness {
descr_a: descr_a.clone(),
descr_b: descr_b.clone(),
descr_unknown: origin::Descr::Git {
url: "X".to_string(),
branch_name: "X".to_string(),
},
store_a: store_a.clone(),
store_b: store_b.clone(),
store: Box::from(super::Store::new(
{
let store_a = store_a.clone();
let store_b = store_b.clone();
move |descr| match descr {
&origin::Descr::Git { ref url, .. } if url == "A" => {
Some(store_a.clone())
}
&origin::Descr::Git { ref url, .. } if url == "B" => {
Some(store_b.clone())
}
_ => None,
}
},
vec![store_a.clone(), store_b.clone()],
)),
}
}
}
#[test]
fn sync() {
let h = Harness::new();
{
let descr_a = h.descr_a.clone();
h.store_a
.lock()
.unwrap()
.expect_sync()
.withf(move |descr: &origin::Descr| descr == &descr_a)
.times(1)
.return_const(Ok::<(), origin::SyncError>(()));
}
assert_eq!(Ok(()), h.store.sync(&h.descr_a));
{
let descr_b = h.descr_b.clone();
h.store_b
.lock()
.unwrap()
.expect_sync()
.withf(move |descr: &origin::Descr| descr == &descr_b)
.times(1)
.return_const(Ok::<(), origin::SyncError>(()));
}
assert_eq!(Ok(()), h.store.sync(&h.descr_b));
assert!(h.store.sync(&h.descr_unknown).is_err());
}
#[test]
fn all_descrs() {
let h = Harness::new();
h.store_a
.lock()
.unwrap()
.expect_all_descrs()
.times(1)
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
.descr_a
.clone()]));
h.store_b
.lock()
.unwrap()
.expect_all_descrs()
.times(1)
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
.descr_b
.clone()]));
assert_eq!(
Ok(vec![h.descr_a.clone(), h.descr_b.clone()]),
h.store.all_descrs(),
)
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> {
let descr = descr.clone();
let path = path.to_string();
Box::pin(async move {
(self.mapping_fn)(&descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.get_file(&descr, &path)
.await
})
}
}

@ -98,11 +98,16 @@ impl Service {
let path = service::append_index_to_path(req.path(), "index.gmi");
use domain::manager::GetFileError;
let f = match self.domain_manager.get_file(settings, &path) {
let f = match self.domain_manager.get_file(settings, &path).await {
Ok(f) => f,
Err(GetFileError::FileNotFound) => {
return Ok(self.respond_conn(w, "51", "File not found", None).await?)
}
Err(GetFileError::Unavailable) => {
return Ok(self
.respond_conn(w, "43", "Content unavailable", None)
.await?)
}
Err(GetFileError::DescrNotSynced) => {
return Err(unexpected::Error::from(
format!(
@ -117,7 +122,7 @@ impl Service {
// redirect so that the path has '/' appended to it, which will cause the server to
// check index.gmi within the path on the new page load.
let mut path = path.into_owned();
path.push_str("/");
path.push('/');
return Ok(self.respond_conn(w, "30", path.as_str(), None).await?);
}
Err(GetFileError::Unexpected(e)) => return Err(e.into()),

@ -218,9 +218,10 @@ impl Service {
let path = service::append_index_to_path(req.uri().path(), "index.html");
use domain::manager::GetFileError;
match self.domain_manager.get_file(&settings, &path) {
match self.domain_manager.get_file(&settings, &path).await {
Ok(f) => self.serve(200, &path, Body::wrap_stream(f)),
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"),
Err(GetFileError::DescrNotSynced) => self.internal_error(
format!(
"Backend for {:?} has not yet been synced",
@ -232,7 +233,7 @@ impl Service {
// redirect so that the path has '/' appended to it, which will cause the server to
// check index.html within the path on the new page load.
let mut path = path.into_owned();
path.push_str("/");
path.push('/');
self.render_redirect(
http::status::StatusCode::TEMPORARY_REDIRECT.into(),
path.as_str(),
@ -408,6 +409,11 @@ impl Service {
.to_string(),
), false),
Err(domain::manager::SyncWithSettingsError::Unavailable) => (Some(
"Fetching the git repository failed; the server is not available or is not correctly serving the repository."
.to_string(),
), false),
Err(domain::manager::SyncWithSettingsError::InvalidBranchName) => (Some(
"The git repository does not have a branch of the given name; please double check
that you input the correct name."

@ -1,7 +1,7 @@
use std::convert::TryFrom;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};
use serde_with::{serde_as, DisplayFromStr, NoneAsEmptyString};
use crate::{domain, error::unexpected, origin};
@ -10,7 +10,8 @@ use crate::{domain, error::unexpected, origin};
pub struct UrlEncodedDomainSettings {
domain_setting_origin_descr_kind: String,
domain_setting_origin_descr_git_url: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
domain_setting_origin_descr_git_url: Option<origin::descr::GitUrl>,
domain_setting_origin_descr_git_branch_name: Option<String>,
domain_setting_origin_descr_proxy_url: Option<String>,

@ -33,4 +33,11 @@ pub fn parse_file<T: std::str::FromStr>(
pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>;
/// Pins the given byte stream onto the heap, type-erasing it into a
/// BoxByteStream.
pub fn into_box_byte_stream<T>(v: T) -> BoxByteStream
where
    T: futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
{
    // Box::pin is the idiomatic shorthand for Box::into_pin(Box::new(v)).
    Box::pin(v)
}
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;

Loading…
Cancel
Save