Compare commits

..

No commits in common. "2302e9ff647410a09c4ad11372eeb2009689b1ae" and "6dd53f96d82d5c472be48bff1a7cce1daddeb092" have entirely different histories.

15 changed files with 438 additions and 962 deletions

525
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -42,13 +42,11 @@ log = "0.4.19"
env_logger = "0.10.0" env_logger = "0.10.0"
serde_yaml = "0.9.22" serde_yaml = "0.9.22"
rand = "0.8.5" rand = "0.8.5"
reqwest = "0.11.18"
hyper-reverse-proxy = "0.5.2-dev" hyper-reverse-proxy = "0.5.2-dev"
gemini = "0.0.5" gemini = "0.0.5"
bytes = "1.4.0" bytes = "1.4.0"
hyper-trust-dns = "0.5.0" hyper-trust-dns = "0.5.0"
gix-hash = "0.14.1"
reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] }
gix-object = "0.41.0"
[patch.crates-io] [patch.crates-io]

View File

@ -40,9 +40,6 @@ pub enum SyncWithSettingsError {
#[error("invalid url")] #[error("invalid url")]
InvalidURL, InvalidURL,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("invalid branch name")] #[error("invalid branch name")]
InvalidBranchName, InvalidBranchName,
@ -63,7 +60,6 @@ impl From<origin::SyncError> for SyncWithSettingsError {
fn from(e: origin::SyncError) -> SyncWithSettingsError { fn from(e: origin::SyncError) -> SyncWithSettingsError {
match e { match e {
origin::SyncError::InvalidURL => SyncWithSettingsError::InvalidURL, origin::SyncError::InvalidURL => SyncWithSettingsError::InvalidURL,
origin::SyncError::Unavailable => SyncWithSettingsError::Unavailable,
origin::SyncError::InvalidBranchName => SyncWithSettingsError::InvalidBranchName, origin::SyncError::InvalidBranchName => SyncWithSettingsError::InvalidBranchName,
origin::SyncError::AlreadyInProgress => SyncWithSettingsError::AlreadyInProgress, origin::SyncError::AlreadyInProgress => SyncWithSettingsError::AlreadyInProgress,
origin::SyncError::Unexpected(e) => SyncWithSettingsError::Unexpected(e), origin::SyncError::Unexpected(e) => SyncWithSettingsError::Unexpected(e),
@ -100,7 +96,7 @@ pub trait Manager: Sync + Send {
&self, &self,
settings: &domain::Settings, settings: &domain::Settings,
path: &str, path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>; ) -> Result<util::BoxByteStream, GetFileError>;
fn sync_with_settings( fn sync_with_settings(
&self, &self,
@ -210,13 +206,13 @@ impl ManagerImpl {
manager manager
} }
async fn sync_domain_origin( fn sync_domain_origin(
&self, &self,
domain: &domain::Name, domain: &domain::Name,
origin_descr: &origin::Descr, origin_descr: &origin::Descr,
) -> Result<(), origin::SyncError> { ) -> Result<(), origin::SyncError> {
log::info!("Syncing origin {:?} for domain {domain}", origin_descr,); log::info!("Syncing origin {:?} for domain {domain}", origin_descr,);
self.origin_store.sync(origin_descr).await self.origin_store.sync(origin_descr)
} }
async fn sync_origins(&self) -> unexpected::Result<()> { async fn sync_origins(&self) -> unexpected::Result<()> {
@ -236,7 +232,6 @@ impl ManagerImpl {
}; };
self.sync_domain_origin(&domain, &settings.origin_descr) self.sync_domain_origin(&domain, &settings.origin_descr)
.await
.map_unexpected_while(|| { .map_unexpected_while(|| {
format!( format!(
"syncing origin {:?} for domain {domain}", "syncing origin {:?} for domain {domain}",
@ -401,7 +396,7 @@ impl Manager for ManagerImpl {
&self, &self,
settings: &domain::Settings, settings: &domain::Settings,
path: &str, path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>> { ) -> Result<util::BoxByteStream, GetFileError> {
let path = settings.process_path(path); let path = settings.process_path(path);
self.origin_store self.origin_store
.get_file(&settings.origin_descr, path.as_ref()) .get_file(&settings.origin_descr, path.as_ref())
@ -428,8 +423,7 @@ impl Manager for ManagerImpl {
self.domain_checker.check_domain(&domain, &hash).await?; self.domain_checker.check_domain(&domain, &hash).await?;
self.sync_domain_origin(&domain, &settings.origin_descr) self.sync_domain_origin(&domain, &settings.origin_descr)?;
.await?;
if self.can_sync_gemini_cert() { if self.can_sync_gemini_cert() {
self.sync_domain_gemini_cert(&domain) self.sync_domain_gemini_cert(&domain)

View File

@ -110,7 +110,7 @@ mod tests {
let settings = domain::Settings { let settings = domain::Settings {
origin_descr: Descr::Git { origin_descr: Descr::Git {
url: "http://bar".parse().unwrap(), url: "bar".to_string(),
branch_name: "baz".to_string(), branch_name: "baz".to_string(),
}, },
add_path_prefix: None, add_path_prefix: None,
@ -126,7 +126,7 @@ mod tests {
let new_settings = domain::Settings { let new_settings = domain::Settings {
origin_descr: Descr::Git { origin_descr: Descr::Git {
url: "https://BAR".parse().unwrap(), url: "BAR".to_string(),
branch_name: "BAZ".to_string(), branch_name: "BAZ".to_string(),
}, },
add_path_prefix: None, add_path_prefix: None,

View File

@ -42,11 +42,8 @@ impl Error {
return Error::from_displays(prefix, &e, e.source()); return Error::from_displays(prefix, &e, e.source());
} }
pub fn from<S>(s: S) -> Error pub fn from(s: &str) -> Error {
where Error::from_displays(None::<String>, s, None::<String>)
S: AsRef<str>,
{
Error::from_displays(None::<String>, s.as_ref(), None::<String>)
} }
} }
@ -108,19 +105,16 @@ impl<T, E: error::Error> Mappable<T> for result::Result<T, E> {
} }
} }
#[derive(thiserror::Error, Debug)] static OPTION_NONE_ERROR: &str = "expected Some but got None";
enum OptionError {
#[error("required but not given")]
None,
}
impl<T> Mappable<T> for Option<T> { impl<T> Mappable<T> for Option<T> {
fn or_unexpected(self) -> Result<T> { fn or_unexpected(self) -> Result<T> {
self.ok_or(OptionError::None).or_unexpected() self.ok_or(Error::from(OPTION_NONE_ERROR)).or_unexpected()
} }
fn or_unexpected_while<D: fmt::Display>(self, prefix: D) -> Result<T> { fn or_unexpected_while<D: fmt::Display>(self, prefix: D) -> Result<T> {
self.ok_or(OptionError::None).or_unexpected_while(prefix) self.ok_or(Error::from(OPTION_NONE_ERROR))
.or_unexpected_while(prefix)
} }
fn map_unexpected_while<F, D>(self, f: F) -> Result<T> fn map_unexpected_while<F, D>(self, f: F) -> Result<T>
@ -128,7 +122,8 @@ impl<T> Mappable<T> for Option<T> {
F: FnOnce() -> D, F: FnOnce() -> D,
D: fmt::Display, D: fmt::Display,
{ {
self.ok_or(OptionError::None).map_unexpected_while(f) self.ok_or(Error::from(OPTION_NONE_ERROR))
.map_unexpected_while(f)
} }
} }

View File

@ -90,8 +90,6 @@ async fn main() {
let origin_store = domani::origin::git::FSStore::new(&config.origin) let origin_store = domani::origin::git::FSStore::new(&config.origin)
.expect("git origin store initialization failed"); .expect("git origin store initialization failed");
//let origin_store = domani::origin::git_proxy::Proxy::new();
let domain_checker = domani::domain::checker::DNSChecker::new( let domain_checker = domani::domain::checker::DNSChecker::new(
domani::token::MemStore::new(), domani::token::MemStore::new(),
&config.domain.dns, &config.domain.dns,

View File

@ -1,7 +1,6 @@
mod config; mod config;
pub mod descr; mod descr;
pub mod git; pub mod git;
pub mod git_proxy;
pub mod mux; pub mod mux;
pub use config::*; pub use config::*;
@ -9,15 +8,13 @@ pub use descr::Descr;
use crate::error::unexpected; use crate::error::unexpected;
use crate::util; use crate::util;
use std::sync;
#[derive(thiserror::Error, Clone, Debug, PartialEq)] #[derive(thiserror::Error, Clone, Debug, PartialEq)]
pub enum SyncError { pub enum SyncError {
#[error("invalid url")] #[error("invalid url")]
InvalidURL, InvalidURL,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("invalid branch name")] #[error("invalid branch name")]
InvalidBranchName, InvalidBranchName,
@ -28,6 +25,12 @@ pub enum SyncError {
Unexpected(#[from] unexpected::Error), Unexpected(#[from] unexpected::Error),
} }
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
pub enum AllDescrsError {
#[error(transparent)]
Unexpected(#[from] unexpected::Error),
}
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum GetFileError { pub enum GetFileError {
#[error("descr not synced")] #[error("descr not synced")]
@ -36,9 +39,6 @@ pub enum GetFileError {
#[error("file not found")] #[error("file not found")]
FileNotFound, FileNotFound,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("path is directory")] #[error("path is directory")]
PathIsDirectory, PathIsDirectory,
@ -46,16 +46,32 @@ pub enum GetFileError {
Unexpected(#[from] unexpected::Error), Unexpected(#[from] unexpected::Error),
} }
#[mockall::automock]
/// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr. /// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr.
pub trait Store { pub trait Store {
/// If the origin is of a kind which can be updated, sync will pull down the latest version of /// If the origin is of a kind which can be updated, sync will pull down the latest version of
/// the origin into the storage. /// the origin into the storage.
fn sync(&self, descr: &Descr) -> util::BoxFuture<'_, Result<(), SyncError>>; fn sync(&self, descr: &Descr) -> Result<(), SyncError>;
/// Returns the body of the descr's given path, where path must be absolute. fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError>;
fn get_file(
&self, fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError>;
descr: &Descr, }
path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>; pub fn new_mock() -> sync::Arc<sync::Mutex<MockStore>> {
sync::Arc::new(sync::Mutex::new(MockStore::new()))
}
impl Store for sync::Arc<sync::Mutex<MockStore>> {
fn sync(&self, descr: &Descr) -> Result<(), SyncError> {
self.lock().unwrap().sync(descr)
}
fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError> {
self.lock().unwrap().all_descrs()
}
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError> {
self.lock().unwrap().get_file(descr, path)
}
} }

View File

@ -1,53 +1,19 @@
use hex::ToHex; use hex::ToHex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use crate::error::unexpected::{self, Mappable}; #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DescrHttpProxyHeader {
#[derive(Clone, Debug, PartialEq, Eq, Hash)] pub name: String,
pub struct GitUrl { pub value: String,
pub parsed: http::uri::Uri,
} }
impl std::str::FromStr for GitUrl {
type Err = unexpected::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let parsed: http::Uri = s
.parse()
.map_unexpected_while(|| format!("parsing as url"))?;
let scheme = parsed
.scheme()
.map_unexpected_while(|| format!("extracting scheme"))?;
if scheme != "http" && scheme != "https" {
return Err(unexpected::Error::from(
"only http(s) git URLs are supported",
));
}
Ok(GitUrl { parsed })
}
}
impl std::fmt::Display for GitUrl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.parsed.fmt(f)
}
}
#[serde_as]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind")] #[serde(tag = "kind")]
/// A unique description of an origin, from where a domain might be served. /// A unique description of an origin, from where a domain might be served.
pub enum Descr { pub enum Descr {
#[serde(rename = "git")] #[serde(rename = "git")]
Git { Git { url: String, branch_name: String },
#[serde_as(as = "DisplayFromStr")]
url: GitUrl,
branch_name: String,
},
} }
impl Descr { impl Descr {
@ -63,7 +29,7 @@ impl Descr {
match self { match self {
Descr::Git { url, branch_name } => { Descr::Git { url, branch_name } => {
h_update("git"); h_update("git");
h_update(url.parsed.to_string().as_str()); h_update(url);
h_update(branch_name); h_update(branch_name);
} }
} }
@ -79,20 +45,11 @@ mod descr_tests {
fn id() { fn id() {
assert_eq!( assert_eq!(
super::Descr::Git { super::Descr::Git {
url: "https://foo".parse().unwrap(), url: String::from("foo"),
branch_name: String::from("bar"), branch_name: String::from("bar"),
} }
.id(), .id(),
"0b12637c1994e61db59ad9bea5fcaf5d2a1e5ecad00c58320271e721e4295ceb", "424130b9e6c1675c983b4b97579877e16d8a6377e4fe10970e6d210811c3b7ac",
); )
assert_eq!(
super::Descr::Git {
url: "http://foo".parse().unwrap(),
branch_name: String::from("bar"),
}
.id(),
"d14f4c75c938e46c2d9da05ea14b6e41ba86ac9945bb10c0b2f91faee1ec0bce",
);
} }
} }

View File

@ -2,7 +2,7 @@ use crate::error::unexpected::{self, Intoable, Mappable};
use crate::{origin, util}; use crate::{origin, util};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::{collections, fs, future, io, sync}; use std::{collections, fs, io, sync};
use futures::stream; use futures::stream;
@ -56,12 +56,12 @@ impl FSStore {
format!("origin/{branch_name}") format!("origin/{branch_name}")
} }
fn deconstruct_descr(descr: &origin::Descr) -> (String, &str) { fn deconstruct_descr(descr: &origin::Descr) -> (&str, &str) {
let origin::Descr::Git { let origin::Descr::Git {
ref url, ref url,
ref branch_name, ref branch_name,
} = descr; } = descr;
(url.parsed.to_string(), branch_name) (url, branch_name)
} }
fn create_repo_snapshot( fn create_repo_snapshot(
@ -216,126 +216,137 @@ impl FSStore {
} }
impl super::Store for FSStore { impl super::Store for FSStore {
fn sync( fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
&self, // attempt to lock this descr for syncing, doing so within a new scope so the mutex
descr: &origin::Descr, // isn't actually being held for the whole method duration.
) -> util::BoxFuture<'static, Result<(), origin::SyncError>> { let is_already_syncing = {
// TODO this implementation is kind of cheating, as it's doing everything synchronously but self.sync_guard
// then returning the result in an async box. But the git store is going to be .lock()
// re-implemented soon anyway, so it doesn't matter. .unwrap()
let res = (|| { .insert(descr.clone(), ())
// attempt to lock this descr for syncing, doing so within a new scope so the mutex .is_some()
// isn't actually being held for the whole method duration. };
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
if is_already_syncing { if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress); return Err(origin::SyncError::AlreadyInProgress);
} }
let res = self.sync_inner(&descr); let res = self.sync_inner(descr);
self.sync_guard.lock().unwrap().remove(&descr); self.sync_guard.lock().unwrap().remove(descr);
let repo = match res { let repo = match res {
Ok(repo) => repo, Ok(repo) => repo,
Err(e) => return Err(e), Err(e) => return Err(e),
}; };
// repo is synced at this point (though the sync lock is still held), just gotta create // repo is synced at this point (though the sync lock is still held), just gotta create
// the RepoSnapshot and store it. // the RepoSnapshot and store it.
// //
// TODO this is a bit of a memory leak, but by the time we get // TODO this is a bit of a memory leak, but by the time we get
// to that point this should all be backed by something which isn't local storage // to that point this should all be backed by something which isn't local storage
// anyway. // anyway.
// calling this while the sync lock is held isn't ideal, but it's convenient and // calling this while the sync lock is held isn't ideal, but it's convenient and
// shouldn't be too terrible generally // shouldn't be too terrible generally
let repo_snapshot = self let repo_snapshot = self
.create_repo_snapshot(repo, &descr) .create_repo_snapshot(repo, descr)
.map_err(|e| match e { .map_err(|e| match e {
CreateRepoSnapshotError::InvalidBranchName => { CreateRepoSnapshotError::InvalidBranchName => origin::SyncError::InvalidBranchName,
origin::SyncError::InvalidBranchName CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
} })?;
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
})?;
let mut repo_snapshots = self.repo_snapshots.write().unwrap(); let mut repo_snapshots = self.repo_snapshots.write().unwrap();
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot)); (*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
Ok(()) Ok(())
})(); }
Box::pin(future::ready(res)) fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
fs::read_dir(&self.dir_path).or_unexpected()?.map(
|dir_entry_res: io::Result<fs::DirEntry>| -> Result<origin::Descr, origin::AllDescrsError> {
let descr_id: String = dir_entry_res
.or_unexpected()?
.file_name()
.to_str()
.ok_or_else(|| {
unexpected::Error::from("couldn't convert os string to &str")
})?
.into();
let descr_file_path = self.descr_file_path(descr_id.as_ref());
// TODO it's possible that opening the file will fail if syncing is
// still ongoing, as writing the descr file is the last step after
// initial sync has succeeded.
let descr_file = fs::File::open(descr_file_path.as_path())
.map_unexpected_while(|| {
format!("opening descr file {}", descr_file_path.display())
})?;
let descr = serde_json::from_reader(descr_file).map_unexpected_while(|| {
format!("reading descr file {}", descr_file_path.display())
})?;
Ok(descr)
},
).try_collect()
} }
fn get_file( fn get_file(
&self, &self,
descr: &origin::Descr, descr: &origin::Descr,
path: &str, path: &str,
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> { ) -> Result<util::BoxByteStream, origin::GetFileError> {
let descr = descr.clone(); let repo_snapshot = match self.get_repo_snapshot(descr) {
let path = path.to_string(); Ok(Some(repo_snapshot)) => repo_snapshot,
Box::pin(async move { Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
let repo_snapshot = match self.get_repo_snapshot(&descr) { Err(e) => return Err(e.into()),
Ok(Some(repo_snapshot)) => repo_snapshot, };
Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
Err(e) => return Err(e.into()),
};
let mut clean_path = Path::new(path.as_str()); let mut clean_path = Path::new(path);
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path); clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
let repo = repo_snapshot.repo.to_thread_local(); let repo = repo_snapshot.repo.to_thread_local();
let file_object = repo let file_object = repo
.find_object(repo_snapshot.tree_object_id) .find_object(repo_snapshot.tree_object_id)
.map_unexpected_while(|| { .map_unexpected_while(|| {
format!("finding tree object {}", repo_snapshot.tree_object_id) format!("finding tree object {}", repo_snapshot.tree_object_id)
})? })?
.peel_to_tree() .peel_to_tree()
.map_unexpected_while(|| { .map_unexpected_while(|| {
format!("peeling tree object {}", repo_snapshot.tree_object_id) format!("peeling tree object {}", repo_snapshot.tree_object_id)
})? })?
.lookup_entry_by_path(clean_path) .lookup_entry_by_path(clean_path)
.map_unexpected_while(|| { .map_unexpected_while(|| {
format!( format!(
"looking up {} in tree object {}", "looking up {} in tree object {}",
clean_path.display(), clean_path.display(),
repo_snapshot.tree_object_id repo_snapshot.tree_object_id
)
})?
.ok_or(origin::GetFileError::FileNotFound)?
.object()
.or_unexpected()?;
use gix::object::Kind;
match file_object.kind {
Kind::Tree => Err(origin::GetFileError::PathIsDirectory),
Kind::Blob => {
// TODO this is very not ideal, the whole file is first read totally into memory, and then
// that is cloned.
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(util::into_box_byte_stream(stream::once(
async move { Ok(data) },
)))
}
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
format!("found object of kind {} in tree", file_object.kind).as_str(),
) )
.into()), })?
.ok_or(origin::GetFileError::FileNotFound)?
.object()
.or_unexpected()?;
use gix::object::Kind;
match file_object.kind {
Kind::Tree => Err(origin::GetFileError::PathIsDirectory),
Kind::Blob => {
// TODO this is very not ideal, the whole file is first read totally into memory, and then
// that is cloned.
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(Box::pin(stream::once(async move { Ok(data) })))
} }
}) Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
format!("found object of kind {} in tree", file_object.kind).as_str(),
)
.into()),
}
} }
} }
/*
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::origin::{self, Config, Store}; use crate::origin::{self, Config, Store};
@ -363,11 +374,8 @@ mod tests {
let store = super::FSStore::new(&config).expect("store created"); let store = super::FSStore::new(&config).expect("store created");
store.sync(&descr).await.expect("sync should succeed"); store.sync(&descr).expect("sync should succeed");
store store.sync(&descr).expect("second sync should succeed");
.sync(&descr)
.await
.expect("second sync should succeed");
// RepoSnapshot doesn't exist // RepoSnapshot doesn't exist
match store.get_file(&other_descr, "DNE") { match store.get_file(&other_descr, "DNE") {
@ -405,4 +413,3 @@ mod tests {
assert_eq!(descr, descrs[0]); assert_eq!(descr, descrs[0]);
} }
} }
*/

View File

@ -1,306 +0,0 @@
use crate::error::unexpected::{self, Mappable};
use crate::{origin, util};
use std::{collections, sync};
#[derive(Clone)]
struct DescrState {
current_tree: gix_hash::ObjectId,
}
#[derive(Default)]
pub struct Proxy {
client: reqwest::Client,
state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>,
// to prevent syncing the same origin more than once at a time, but still allow hitting that
// origin in during a sync.
sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
}
impl Proxy {
pub fn new() -> Proxy {
Proxy::default()
}
fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) {
let origin::Descr::Git {
ref url,
ref branch_name,
} = descr;
(url, branch_name)
}
fn construct_url(
url: &origin::descr::GitUrl,
sub_path: &str,
) -> unexpected::Result<reqwest::Url> {
let mut url: reqwest::Url = {
url.parsed
.to_string()
.parse()
.or_unexpected_while("parsing url as reqwest url")?
};
let base_path = match url.to_file_path() {
Ok(path) => path,
Err(()) => return Err(unexpected::Error::from("extracting path from url")),
};
let new_path = base_path.join(sub_path);
url.set_path(
new_path
.to_str()
.or_unexpected_while("converting new path to string")?,
);
Ok(url)
}
async fn get_tip_commit_hash(
&self,
descr: &origin::Descr,
) -> Result<gix_hash::ObjectId, origin::SyncError> {
let (url, branch_name) = Self::deconstruct_descr(descr);
let refs_url =
Self::construct_url(url, "/info/refs").or_unexpected_while("constructing refs url")?;
// when fetching refs we assume that any issue indicates that the origin itself
// (and therefore the URL) has some kind of issue.
let refs = self
.client
.get(refs_url)
.send()
.await
.or(Err(origin::SyncError::InvalidURL))?
.error_for_status()
.or(Err(origin::SyncError::InvalidURL))?
.text()
.await
.or(Err(origin::SyncError::InvalidURL))?;
let full_ref = format!("refs/heads/{}", branch_name);
for line in refs.lines() {
if !line.ends_with(full_ref.as_str()) {
continue;
}
return gix_hash::ObjectId::from_hex(
line.split_ascii_whitespace()
.next()
.ok_or(origin::SyncError::InvalidURL)?
.as_bytes(),
)
.or(Err(origin::SyncError::InvalidURL));
}
Err(origin::SyncError::InvalidBranchName)
}
async fn get_object(
&self,
descr: &origin::Descr,
oid: &gix_hash::ObjectId,
) -> unexpected::Result<Option<reqwest::Response>> {
let hex = oid.to_string();
let (url, _) = Self::deconstruct_descr(descr);
let object_url = Self::construct_url(
url,
format!("/objects/{}/{}", &hex[..2], &hex[2..]).as_str(),
)
.or_unexpected_while("constructing refs url")?;
Ok(self
.client
.get(object_url)
.send()
.await
.or_unexpected_while("performing request")?
.error_for_status()
.ok())
}
async fn get_commit_tree(
&self,
descr: &origin::Descr,
commit_hash: &gix_hash::ObjectId,
) -> Result<gix_hash::ObjectId, origin::SyncError> {
let commit_object_bytes = self
.get_object(descr, commit_hash)
.await?
.ok_or(origin::SyncError::Unavailable)?
.bytes()
.await
.or(Err(origin::SyncError::Unavailable))?;
let commit_object = gix_object::ObjectRef::from_loose(commit_object_bytes.as_ref())
.or(Err(origin::SyncError::Unavailable))?
.into_commit()
.ok_or(origin::SyncError::Unavailable)?;
Ok(commit_object.tree())
}
async fn get_tree_entry(
&self,
descr: &origin::Descr,
tree_hash: &gix_hash::ObjectId,
entry_name: &str,
) -> Result<gix_object::tree::Entry, origin::GetFileError> {
let tree_object_bytes = self
.get_object(descr, tree_hash)
.await?
.ok_or(origin::GetFileError::Unavailable)?
.bytes()
.await
.or(Err(origin::GetFileError::Unavailable))?;
let tree_object = gix_object::ObjectRef::from_loose(tree_object_bytes.as_ref())
.or(Err(origin::GetFileError::Unavailable))?
.into_tree()
.ok_or(origin::GetFileError::Unavailable)?;
for entry in tree_object.entries {
if entry.filename == entry_name {
return Ok(entry.into());
}
}
Err(origin::GetFileError::FileNotFound)
}
}
impl origin::Store for Proxy {
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
let descr = descr.clone();
Box::pin(async move {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress);
}
// perform the rest of the work within this closure, so we can be sure that the guard
// lock is released no matter what.
let res = async {
let commit_hash = self.get_tip_commit_hash(&descr).await?;
let current_tree = self.get_commit_tree(&descr, &commit_hash).await?;
self.state
.write()
.unwrap()
.insert(descr.clone(), DescrState { current_tree });
Ok(())
}
.await;
self.sync_guard.lock().unwrap().remove(&descr);
res
})
}
fn get_file(
&self,
descr: &origin::Descr,
path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, origin::GetFileError>> {
let descr = descr.clone();
let path = path.to_string();
Box::pin(async move {
let current_state = self
.state
.read()
.unwrap()
.get(&descr)
.ok_or(origin::GetFileError::DescrNotSynced)?
.clone();
let path = path
.as_str()
.parse::<std::path::PathBuf>()
.or_unexpected_while("parsing path")?
.canonicalize()
.or_unexpected_while("canonicalizing path")?;
let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>();
let path_parts_len = path_parts.len();
if path_parts_len < 2 {
return Err(unexpected::Error::from("path has fewer than 2 parts").into());
} else if path_parts[0] != std::path::MAIN_SEPARATOR_STR {
return Err(unexpected::Error::from(format!(
"expected first path part to be separator, found {:?}",
path_parts[0]
))
.into());
}
let mut tree_hash = current_state.current_tree;
// The first part is "/" (main separator), and the last is the file name itself.
// Everything in between (if any) should be directories, so navigate those.
for dir_name in path_parts[1..path_parts_len - 1].iter() {
let entry = self
.get_tree_entry(
&descr,
&tree_hash,
dir_name
.to_str()
.map_unexpected_while(|| format!("decoding dir name {dir_name:?}"))?,
)
.await?;
if !entry.mode.is_tree() {
return Err(origin::GetFileError::FileNotFound);
}
tree_hash = entry.oid;
}
let file_name = {
let file_name = path_parts[path_parts_len - 1];
file_name
.to_str()
.map_unexpected_while(|| format!("decoding file name {file_name:?}"))?
};
let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?;
// TODO handle symlinks
if entry.mode.is_tree() {
return Err(origin::GetFileError::PathIsDirectory);
} else if !entry.mode.is_blob() {
return Err(unexpected::Error::from(format!(
"can't handle entry {} of mode {}",
entry.filename,
entry.mode.as_str()
))
.into());
}
let res = self
.get_object(&descr, &entry.oid)
.await?
.map_unexpected_while(|| format!("object for entry {:?} not found", entry))?
.bytes_stream();
use futures::StreamExt;
Ok(util::into_box_byte_stream(res.map(|r| {
use std::io::{Error, ErrorKind};
r.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
})))
// TODO this is still not correct, as it will include the git object header
})
}
}

View File

@ -3,48 +3,171 @@ use crate::{origin, util};
pub struct Store<F, S> pub struct Store<F, S>
where where
S: origin::Store + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send, F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{ {
mapping_fn: F, mapping_fn: F,
stores: Vec<S>,
} }
impl<F, S> Store<F, S> impl<F, S> Store<F, S>
where where
S: origin::Store + Sync + Send + 'static, S: origin::Store + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send, F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{ {
pub fn new(mapping_fn: F) -> Store<F, S> { pub fn new(mapping_fn: F, stores: Vec<S>) -> Store<F, S> {
Store { mapping_fn } Store { mapping_fn, stores }
} }
} }
impl<F, S> origin::Store for Store<F, S> impl<F, S> origin::Store for Store<F, S>
where where
S: origin::Store + Sync + Send + 'static, S: origin::Store + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send, F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{ {
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> { fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
let descr = descr.clone(); (self.mapping_fn)(descr)
Box::pin(async move { .or_unexpected_while(format!("mapping {:?} to store", &descr))?
(self.mapping_fn)(&descr) .sync(descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))? }
.sync(&descr)
.await fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
}) let mut res = Vec::<origin::Descr>::new();
for store in self.stores.iter() {
store.all_descrs()?.into_iter().collect_into(&mut res);
}
Ok(res)
} }
fn get_file( fn get_file(
&self, &self,
descr: &origin::Descr, descr: &origin::Descr,
path: &str, path: &str,
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> { ) -> Result<util::BoxByteStream, origin::GetFileError> {
let descr = descr.clone(); (self.mapping_fn)(descr)
let path = path.to_string(); .or_unexpected_while(format!("mapping {:?} to store", &descr))?
Box::pin(async move { .get_file(descr, path)
(self.mapping_fn)(&descr) }
.or_unexpected_while(format!("mapping {:?} to store", &descr))? }
.get_file(&descr, &path)
.await #[cfg(test)]
}) mod tests {
use crate::origin;
use std::sync;
struct Harness {
descr_a: origin::Descr,
descr_b: origin::Descr,
descr_unknown: origin::Descr,
store_a: sync::Arc<sync::Mutex<origin::MockStore>>,
store_b: sync::Arc<sync::Mutex<origin::MockStore>>,
store: Box<dyn origin::Store>,
}
impl Harness {
fn new() -> Harness {
let descr_a = origin::Descr::Git {
url: "A".to_string(),
branch_name: "A".to_string(),
};
let descr_b = origin::Descr::Git {
url: "B".to_string(),
branch_name: "B".to_string(),
};
let store_a = origin::new_mock();
let store_b = origin::new_mock();
Harness {
descr_a: descr_a.clone(),
descr_b: descr_b.clone(),
descr_unknown: origin::Descr::Git {
url: "X".to_string(),
branch_name: "X".to_string(),
},
store_a: store_a.clone(),
store_b: store_b.clone(),
store: Box::from(super::Store::new(
{
let store_a = store_a.clone();
let store_b = store_b.clone();
move |descr| match descr {
&origin::Descr::Git { ref url, .. } if url == "A" => {
Some(store_a.clone())
}
&origin::Descr::Git { ref url, .. } if url == "B" => {
Some(store_b.clone())
}
_ => None,
}
},
vec![store_a.clone(), store_b.clone()],
)),
}
}
}
#[test]
fn sync() {
let h = Harness::new();
{
let descr_a = h.descr_a.clone();
h.store_a
.lock()
.unwrap()
.expect_sync()
.withf(move |descr: &origin::Descr| descr == &descr_a)
.times(1)
.return_const(Ok::<(), origin::SyncError>(()));
}
assert_eq!(Ok(()), h.store.sync(&h.descr_a));
{
let descr_b = h.descr_b.clone();
h.store_b
.lock()
.unwrap()
.expect_sync()
.withf(move |descr: &origin::Descr| descr == &descr_b)
.times(1)
.return_const(Ok::<(), origin::SyncError>(()));
}
assert_eq!(Ok(()), h.store.sync(&h.descr_b));
assert!(h.store.sync(&h.descr_unknown).is_err());
}
#[test]
fn all_descrs() {
let h = Harness::new();
h.store_a
.lock()
.unwrap()
.expect_all_descrs()
.times(1)
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
.descr_a
.clone()]));
h.store_b
.lock()
.unwrap()
.expect_all_descrs()
.times(1)
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
.descr_b
.clone()]));
assert_eq!(
Ok(vec![h.descr_a.clone(), h.descr_b.clone()]),
h.store.all_descrs(),
)
} }
} }

View File

@ -98,16 +98,11 @@ impl Service {
let path = service::append_index_to_path(req.path(), "index.gmi"); let path = service::append_index_to_path(req.path(), "index.gmi");
use domain::manager::GetFileError; use domain::manager::GetFileError;
let f = match self.domain_manager.get_file(settings, &path).await { let f = match self.domain_manager.get_file(settings, &path) {
Ok(f) => f, Ok(f) => f,
Err(GetFileError::FileNotFound) => { Err(GetFileError::FileNotFound) => {
return Ok(self.respond_conn(w, "51", "File not found", None).await?) return Ok(self.respond_conn(w, "51", "File not found", None).await?)
} }
Err(GetFileError::Unavailable) => {
return Ok(self
.respond_conn(w, "43", "Content unavailable", None)
.await?)
}
Err(GetFileError::DescrNotSynced) => { Err(GetFileError::DescrNotSynced) => {
return Err(unexpected::Error::from( return Err(unexpected::Error::from(
format!( format!(
@ -122,7 +117,7 @@ impl Service {
// redirect so that the path has '/' appended to it, which will cause the server to // redirect so that the path has '/' appended to it, which will cause the server to
// check index.gmi within the path on the new page load. // check index.gmi within the path on the new page load.
let mut path = path.into_owned(); let mut path = path.into_owned();
path.push('/'); path.push_str("/");
return Ok(self.respond_conn(w, "30", path.as_str(), None).await?); return Ok(self.respond_conn(w, "30", path.as_str(), None).await?);
} }
Err(GetFileError::Unexpected(e)) => return Err(e.into()), Err(GetFileError::Unexpected(e)) => return Err(e.into()),

View File

@ -218,10 +218,9 @@ impl Service {
let path = service::append_index_to_path(req.uri().path(), "index.html"); let path = service::append_index_to_path(req.uri().path(), "index.html");
use domain::manager::GetFileError; use domain::manager::GetFileError;
match self.domain_manager.get_file(&settings, &path).await { match self.domain_manager.get_file(&settings, &path) {
Ok(f) => self.serve(200, &path, Body::wrap_stream(f)), Ok(f) => self.serve(200, &path, Body::wrap_stream(f)),
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"), Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"),
Err(GetFileError::DescrNotSynced) => self.internal_error( Err(GetFileError::DescrNotSynced) => self.internal_error(
format!( format!(
"Backend for {:?} has not yet been synced", "Backend for {:?} has not yet been synced",
@ -233,7 +232,7 @@ impl Service {
// redirect so that the path has '/' appended to it, which will cause the server to // redirect so that the path has '/' appended to it, which will cause the server to
// check index.html within the path on the new page load. // check index.html within the path on the new page load.
let mut path = path.into_owned(); let mut path = path.into_owned();
path.push('/'); path.push_str("/");
self.render_redirect( self.render_redirect(
http::status::StatusCode::TEMPORARY_REDIRECT.into(), http::status::StatusCode::TEMPORARY_REDIRECT.into(),
path.as_str(), path.as_str(),
@ -409,11 +408,6 @@ impl Service {
.to_string(), .to_string(),
), false), ), false),
Err(domain::manager::SyncWithSettingsError::Unavailable) => (Some(
"Fetching the git repository failed; the server is not available or is not corectly serving the repository."
.to_string(),
), false),
Err(domain::manager::SyncWithSettingsError::InvalidBranchName) => (Some( Err(domain::manager::SyncWithSettingsError::InvalidBranchName) => (Some(
"The git repository does not have a branch of the given name; please double check "The git repository does not have a branch of the given name; please double check
that you input the correct name." that you input the correct name."

View File

@ -1,7 +1,7 @@
use std::convert::TryFrom; use std::convert::TryFrom;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr, NoneAsEmptyString}; use serde_with::{serde_as, NoneAsEmptyString};
use crate::{domain, error::unexpected, origin}; use crate::{domain, error::unexpected, origin};
@ -10,8 +10,7 @@ use crate::{domain, error::unexpected, origin};
pub struct UrlEncodedDomainSettings { pub struct UrlEncodedDomainSettings {
domain_setting_origin_descr_kind: String, domain_setting_origin_descr_kind: String,
#[serde_as(as = "Option<DisplayFromStr>")] domain_setting_origin_descr_git_url: Option<String>,
domain_setting_origin_descr_git_url: Option<origin::descr::GitUrl>,
domain_setting_origin_descr_git_branch_name: Option<String>, domain_setting_origin_descr_git_branch_name: Option<String>,
domain_setting_origin_descr_proxy_url: Option<String>, domain_setting_origin_descr_proxy_url: Option<String>,

View File

@ -33,11 +33,4 @@ pub fn parse_file<T: std::str::FromStr>(
pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>; pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>;
pub fn into_box_byte_stream<T>(v: T) -> BoxByteStream
where
T: futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
{
Box::into_pin(Box::new(v))
}
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>; pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;