Compare commits

..

8 Commits

Author SHA1 Message Date
2302e9ff64 Implement most of new git implementation's get_file method
This includes a refactoring of get_file to be completely async, as well
as to add a new error case.
2024-01-30 18:47:09 +01:00
142fc14916 Loosen up argument type on unexpected::Error::from 2024-01-25 02:01:17 +01:00
5f1f8ce1b7 Fixes from clippy 2024-01-22 16:58:03 +01:00
b60c849a73 Remove all_descrs method from origin store 2024-01-22 16:58:03 +01:00
98ddefad4f Implemented sync method of new git_proxy module
This module is a WIP, intended to replace the existing git store with
one that doesn't require any locally managed state.
2024-01-22 04:04:19 +01:00
57ee5ff30e Pass origin::Descr::Git url around in parsed form, perform stricter validation on it 2024-01-21 23:24:09 +01:00
ac66ebf6cc Fixed how unexpected Option errors were being worded 2024-01-21 23:19:00 +01:00
78b67a02ad Make origin::Store::sync async
In order for this method to make sense as an async, the returned box
must have its lifetime tied to the store. This did not jive well with
the mock implementation of origin::Store, so I ditched that, along with
the mux tests which depended on it. Oh well.
2024-01-21 16:18:31 +01:00
15 changed files with 963 additions and 439 deletions

527
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -42,11 +42,13 @@ log = "0.4.19"
env_logger = "0.10.0"
serde_yaml = "0.9.22"
rand = "0.8.5"
reqwest = "0.11.18"
hyper-reverse-proxy = "0.5.2-dev"
gemini = "0.0.5"
bytes = "1.4.0"
hyper-trust-dns = "0.5.0"
gix-hash = "0.14.1"
reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] }
gix-object = "0.41.0"
[patch.crates-io]

View File

@ -40,6 +40,9 @@ pub enum SyncWithSettingsError {
#[error("invalid url")]
InvalidURL,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("invalid branch name")]
InvalidBranchName,
@ -60,6 +63,7 @@ impl From<origin::SyncError> for SyncWithSettingsError {
fn from(e: origin::SyncError) -> SyncWithSettingsError {
match e {
origin::SyncError::InvalidURL => SyncWithSettingsError::InvalidURL,
origin::SyncError::Unavailable => SyncWithSettingsError::Unavailable,
origin::SyncError::InvalidBranchName => SyncWithSettingsError::InvalidBranchName,
origin::SyncError::AlreadyInProgress => SyncWithSettingsError::AlreadyInProgress,
origin::SyncError::Unexpected(e) => SyncWithSettingsError::Unexpected(e),
@ -96,7 +100,7 @@ pub trait Manager: Sync + Send {
&self,
settings: &domain::Settings,
path: &str,
) -> Result<util::BoxByteStream, GetFileError>;
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>;
fn sync_with_settings(
&self,
@ -206,13 +210,13 @@ impl ManagerImpl {
manager
}
fn sync_domain_origin(
async fn sync_domain_origin(
&self,
domain: &domain::Name,
origin_descr: &origin::Descr,
) -> Result<(), origin::SyncError> {
log::info!("Syncing origin {:?} for domain {domain}", origin_descr,);
self.origin_store.sync(origin_descr)
self.origin_store.sync(origin_descr).await
}
async fn sync_origins(&self) -> unexpected::Result<()> {
@ -232,6 +236,7 @@ impl ManagerImpl {
};
self.sync_domain_origin(&domain, &settings.origin_descr)
.await
.map_unexpected_while(|| {
format!(
"syncing origin {:?} for domain {domain}",
@ -396,7 +401,7 @@ impl Manager for ManagerImpl {
&self,
settings: &domain::Settings,
path: &str,
) -> Result<util::BoxByteStream, GetFileError> {
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>> {
let path = settings.process_path(path);
self.origin_store
.get_file(&settings.origin_descr, path.as_ref())
@ -423,7 +428,8 @@ impl Manager for ManagerImpl {
self.domain_checker.check_domain(&domain, &hash).await?;
self.sync_domain_origin(&domain, &settings.origin_descr)?;
self.sync_domain_origin(&domain, &settings.origin_descr)
.await?;
if self.can_sync_gemini_cert() {
self.sync_domain_gemini_cert(&domain)

View File

@ -110,7 +110,7 @@ mod tests {
let settings = domain::Settings {
origin_descr: Descr::Git {
url: "bar".to_string(),
url: "http://bar".parse().unwrap(),
branch_name: "baz".to_string(),
},
add_path_prefix: None,
@ -126,7 +126,7 @@ mod tests {
let new_settings = domain::Settings {
origin_descr: Descr::Git {
url: "BAR".to_string(),
url: "https://BAR".parse().unwrap(),
branch_name: "BAZ".to_string(),
},
add_path_prefix: None,

View File

@ -42,8 +42,11 @@ impl Error {
return Error::from_displays(prefix, &e, e.source());
}
pub fn from(s: &str) -> Error {
Error::from_displays(None::<String>, s, None::<String>)
pub fn from<S>(s: S) -> Error
where
S: AsRef<str>,
{
Error::from_displays(None::<String>, s.as_ref(), None::<String>)
}
}
@ -105,16 +108,19 @@ impl<T, E: error::Error> Mappable<T> for result::Result<T, E> {
}
}
static OPTION_NONE_ERROR: &str = "expected Some but got None";
#[derive(thiserror::Error, Debug)]
enum OptionError {
#[error("required but not given")]
None,
}
impl<T> Mappable<T> for Option<T> {
fn or_unexpected(self) -> Result<T> {
self.ok_or(Error::from(OPTION_NONE_ERROR)).or_unexpected()
self.ok_or(OptionError::None).or_unexpected()
}
fn or_unexpected_while<D: fmt::Display>(self, prefix: D) -> Result<T> {
self.ok_or(Error::from(OPTION_NONE_ERROR))
.or_unexpected_while(prefix)
self.ok_or(OptionError::None).or_unexpected_while(prefix)
}
fn map_unexpected_while<F, D>(self, f: F) -> Result<T>
@ -122,8 +128,7 @@ impl<T> Mappable<T> for Option<T> {
F: FnOnce() -> D,
D: fmt::Display,
{
self.ok_or(Error::from(OPTION_NONE_ERROR))
.map_unexpected_while(f)
self.ok_or(OptionError::None).map_unexpected_while(f)
}
}

View File

@ -90,6 +90,8 @@ async fn main() {
let origin_store = domani::origin::git::FSStore::new(&config.origin)
.expect("git origin store initialization failed");
//let origin_store = domani::origin::git_proxy::Proxy::new();
let domain_checker = domani::domain::checker::DNSChecker::new(
domani::token::MemStore::new(),
&config.domain.dns,

View File

@ -1,6 +1,7 @@
mod config;
mod descr;
pub mod descr;
pub mod git;
pub mod git_proxy;
pub mod mux;
pub use config::*;
@ -8,13 +9,15 @@ pub use descr::Descr;
use crate::error::unexpected;
use crate::util;
use std::sync;
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
pub enum SyncError {
#[error("invalid url")]
InvalidURL,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("invalid branch name")]
InvalidBranchName,
@ -25,12 +28,6 @@ pub enum SyncError {
Unexpected(#[from] unexpected::Error),
}
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
pub enum AllDescrsError {
#[error(transparent)]
Unexpected(#[from] unexpected::Error),
}
#[derive(thiserror::Error, Debug)]
pub enum GetFileError {
#[error("descr not synced")]
@ -39,6 +36,9 @@ pub enum GetFileError {
#[error("file not found")]
FileNotFound,
#[error("unavailable due to server-side issue")]
Unavailable,
#[error("path is directory")]
PathIsDirectory,
@ -46,32 +46,16 @@ pub enum GetFileError {
Unexpected(#[from] unexpected::Error),
}
#[mockall::automock]
/// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr.
pub trait Store {
/// If the origin is of a kind which can be updated, sync will pull down the latest version of
/// the origin into the storage.
fn sync(&self, descr: &Descr) -> Result<(), SyncError>;
fn sync(&self, descr: &Descr) -> util::BoxFuture<'_, Result<(), SyncError>>;
fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError>;
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError>;
}
pub fn new_mock() -> sync::Arc<sync::Mutex<MockStore>> {
sync::Arc::new(sync::Mutex::new(MockStore::new()))
}
impl Store for sync::Arc<sync::Mutex<MockStore>> {
fn sync(&self, descr: &Descr) -> Result<(), SyncError> {
self.lock().unwrap().sync(descr)
}
fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError> {
self.lock().unwrap().all_descrs()
}
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError> {
self.lock().unwrap().get_file(descr, path)
}
/// Returns the body of the descr's given path, where path must be absolute.
fn get_file(
&self,
descr: &Descr,
path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>;
}

View File

@ -1,19 +1,53 @@
use hex::ToHex;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use sha2::{Digest, Sha256};
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct DescrHttpProxyHeader {
pub name: String,
pub value: String,
use crate::error::unexpected::{self, Mappable};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct GitUrl {
pub parsed: http::uri::Uri,
}
impl std::str::FromStr for GitUrl {
type Err = unexpected::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let parsed: http::Uri = s
.parse()
.map_unexpected_while(|| format!("parsing as url"))?;
let scheme = parsed
.scheme()
.map_unexpected_while(|| format!("extracting scheme"))?;
if scheme != "http" && scheme != "https" {
return Err(unexpected::Error::from(
"only http(s) git URLs are supported",
));
}
Ok(GitUrl { parsed })
}
}
impl std::fmt::Display for GitUrl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.parsed.fmt(f)
}
}
#[serde_as]
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind")]
/// A unique description of an origin, from where a domain might be served.
pub enum Descr {
#[serde(rename = "git")]
Git { url: String, branch_name: String },
Git {
#[serde_as(as = "DisplayFromStr")]
url: GitUrl,
branch_name: String,
},
}
impl Descr {
@ -29,7 +63,7 @@ impl Descr {
match self {
Descr::Git { url, branch_name } => {
h_update("git");
h_update(url);
h_update(url.parsed.to_string().as_str());
h_update(branch_name);
}
}
@ -45,11 +79,20 @@ mod descr_tests {
fn id() {
assert_eq!(
super::Descr::Git {
url: String::from("foo"),
url: "https://foo".parse().unwrap(),
branch_name: String::from("bar"),
}
.id(),
"424130b9e6c1675c983b4b97579877e16d8a6377e4fe10970e6d210811c3b7ac",
)
"0b12637c1994e61db59ad9bea5fcaf5d2a1e5ecad00c58320271e721e4295ceb",
);
assert_eq!(
super::Descr::Git {
url: "http://foo".parse().unwrap(),
branch_name: String::from("bar"),
}
.id(),
"d14f4c75c938e46c2d9da05ea14b6e41ba86ac9945bb10c0b2f91faee1ec0bce",
);
}
}

View File

@ -2,7 +2,7 @@ use crate::error::unexpected::{self, Intoable, Mappable};
use crate::{origin, util};
use std::path::{Path, PathBuf};
use std::{collections, fs, io, sync};
use std::{collections, fs, future, io, sync};
use futures::stream;
@ -56,12 +56,12 @@ impl FSStore {
format!("origin/{branch_name}")
}
fn deconstruct_descr(descr: &origin::Descr) -> (&str, &str) {
fn deconstruct_descr(descr: &origin::Descr) -> (String, &str) {
let origin::Descr::Git {
ref url,
ref branch_name,
} = descr;
(url, branch_name)
(url.parsed.to_string(), branch_name)
}
fn create_repo_snapshot(
@ -216,137 +216,126 @@ impl FSStore {
}
impl super::Store for FSStore {
fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
fn sync(
&self,
descr: &origin::Descr,
) -> util::BoxFuture<'static, Result<(), origin::SyncError>> {
// TODO this implementation is kind of cheating, as it's doing everything synchronously but
// then returning the result in an async box. But the git store is going to be
// re-implemented soon anyway, so it doesn't matter.
let res = (|| {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress);
}
if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress);
}
let res = self.sync_inner(descr);
let res = self.sync_inner(&descr);
self.sync_guard.lock().unwrap().remove(descr);
self.sync_guard.lock().unwrap().remove(&descr);
let repo = match res {
Ok(repo) => repo,
Err(e) => return Err(e),
};
let repo = match res {
Ok(repo) => repo,
Err(e) => return Err(e),
};
// repo is synced at this point (though the sync lock is still held), just gotta create
// the RepoSnapshot and store it.
//
// TODO this is a bit of a memory leak, but by the time we get
// to that point this should all be backed by something which isn't local storage
// anyway.
// repo is synced at this point (though the sync lock is still held), just gotta create
// the RepoSnapshot and store it.
//
// TODO this is a bit of a memory leak, but by the time we get
// to that point this should all be backed by something which isn't local storage
// anyway.
// calling this while the sync lock is held isn't ideal, but it's convenient and
// shouldn't be too terrible generally
let repo_snapshot = self
.create_repo_snapshot(repo, descr)
.map_err(|e| match e {
CreateRepoSnapshotError::InvalidBranchName => origin::SyncError::InvalidBranchName,
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
})?;
// calling this while the sync lock is held isn't ideal, but it's convenient and
// shouldn't be too terrible generally
let repo_snapshot = self
.create_repo_snapshot(repo, &descr)
.map_err(|e| match e {
CreateRepoSnapshotError::InvalidBranchName => {
origin::SyncError::InvalidBranchName
}
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
})?;
let mut repo_snapshots = self.repo_snapshots.write().unwrap();
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
let mut repo_snapshots = self.repo_snapshots.write().unwrap();
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
Ok(())
}
Ok(())
})();
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
fs::read_dir(&self.dir_path).or_unexpected()?.map(
|dir_entry_res: io::Result<fs::DirEntry>| -> Result<origin::Descr, origin::AllDescrsError> {
let descr_id: String = dir_entry_res
.or_unexpected()?
.file_name()
.to_str()
.ok_or_else(|| {
unexpected::Error::from("couldn't convert os string to &str")
})?
.into();
let descr_file_path = self.descr_file_path(descr_id.as_ref());
// TODO it's possible that opening the file will fail if syncing is
// still ongoing, as writing the descr file is the last step after
// initial sync has succeeded.
let descr_file = fs::File::open(descr_file_path.as_path())
.map_unexpected_while(|| {
format!("opening descr file {}", descr_file_path.display())
})?;
let descr = serde_json::from_reader(descr_file).map_unexpected_while(|| {
format!("reading descr file {}", descr_file_path.display())
})?;
Ok(descr)
},
).try_collect()
Box::pin(future::ready(res))
}
fn get_file(
&self,
descr: &origin::Descr,
path: &str,
) -> Result<util::BoxByteStream, origin::GetFileError> {
let repo_snapshot = match self.get_repo_snapshot(descr) {
Ok(Some(repo_snapshot)) => repo_snapshot,
Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
Err(e) => return Err(e.into()),
};
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> {
let descr = descr.clone();
let path = path.to_string();
Box::pin(async move {
let repo_snapshot = match self.get_repo_snapshot(&descr) {
Ok(Some(repo_snapshot)) => repo_snapshot,
Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
Err(e) => return Err(e.into()),
};
let mut clean_path = Path::new(path);
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
let mut clean_path = Path::new(path.as_str());
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
let repo = repo_snapshot.repo.to_thread_local();
let repo = repo_snapshot.repo.to_thread_local();
let file_object = repo
.find_object(repo_snapshot.tree_object_id)
.map_unexpected_while(|| {
format!("finding tree object {}", repo_snapshot.tree_object_id)
})?
.peel_to_tree()
.map_unexpected_while(|| {
format!("peeling tree object {}", repo_snapshot.tree_object_id)
})?
.lookup_entry_by_path(clean_path)
.map_unexpected_while(|| {
format!(
"looking up {} in tree object {}",
clean_path.display(),
repo_snapshot.tree_object_id
let file_object = repo
.find_object(repo_snapshot.tree_object_id)
.map_unexpected_while(|| {
format!("finding tree object {}", repo_snapshot.tree_object_id)
})?
.peel_to_tree()
.map_unexpected_while(|| {
format!("peeling tree object {}", repo_snapshot.tree_object_id)
})?
.lookup_entry_by_path(clean_path)
.map_unexpected_while(|| {
format!(
"looking up {} in tree object {}",
clean_path.display(),
repo_snapshot.tree_object_id
)
})?
.ok_or(origin::GetFileError::FileNotFound)?
.object()
.or_unexpected()?;
use gix::object::Kind;
match file_object.kind {
Kind::Tree => Err(origin::GetFileError::PathIsDirectory),
Kind::Blob => {
// TODO this is very not ideal, the whole file is first read totally into memory, and then
// that is cloned.
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(util::into_box_byte_stream(stream::once(
async move { Ok(data) },
)))
}
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
format!("found object of kind {} in tree", file_object.kind).as_str(),
)
})?
.ok_or(origin::GetFileError::FileNotFound)?
.object()
.or_unexpected()?;
use gix::object::Kind;
match file_object.kind {
Kind::Tree => Err(origin::GetFileError::PathIsDirectory),
Kind::Blob => {
// TODO this is very not ideal, the whole file is first read totally into memory, and then
// that is cloned.
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(Box::pin(stream::once(async move { Ok(data) })))
.into()),
}
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
format!("found object of kind {} in tree", file_object.kind).as_str(),
)
.into()),
}
})
}
}
/*
#[cfg(test)]
mod tests {
use crate::origin::{self, Config, Store};
@ -374,8 +363,11 @@ mod tests {
let store = super::FSStore::new(&config).expect("store created");
store.sync(&descr).expect("sync should succeed");
store.sync(&descr).expect("second sync should succeed");
store.sync(&descr).await.expect("sync should succeed");
store
.sync(&descr)
.await
.expect("second sync should succeed");
// RepoSnapshot doesn't exist
match store.get_file(&other_descr, "DNE") {
@ -413,3 +405,4 @@ mod tests {
assert_eq!(descr, descrs[0]);
}
}
*/

306
src/origin/git_proxy.rs Normal file
View File

@ -0,0 +1,306 @@
use crate::error::unexpected::{self, Mappable};
use crate::{origin, util};
use std::{collections, sync};
#[derive(Clone)]
struct DescrState {
current_tree: gix_hash::ObjectId,
}
#[derive(Default)]
pub struct Proxy {
client: reqwest::Client,
state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>,
// to prevent syncing the same origin more than once at a time, but still allow hitting that
// origin in during a sync.
sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
}
impl Proxy {
pub fn new() -> Proxy {
Proxy::default()
}
fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) {
let origin::Descr::Git {
ref url,
ref branch_name,
} = descr;
(url, branch_name)
}
fn construct_url(
url: &origin::descr::GitUrl,
sub_path: &str,
) -> unexpected::Result<reqwest::Url> {
let mut url: reqwest::Url = {
url.parsed
.to_string()
.parse()
.or_unexpected_while("parsing url as reqwest url")?
};
let base_path = match url.to_file_path() {
Ok(path) => path,
Err(()) => return Err(unexpected::Error::from("extracting path from url")),
};
let new_path = base_path.join(sub_path);
url.set_path(
new_path
.to_str()
.or_unexpected_while("converting new path to string")?,
);
Ok(url)
}
async fn get_tip_commit_hash(
&self,
descr: &origin::Descr,
) -> Result<gix_hash::ObjectId, origin::SyncError> {
let (url, branch_name) = Self::deconstruct_descr(descr);
let refs_url =
Self::construct_url(url, "/info/refs").or_unexpected_while("constructing refs url")?;
// when fetching refs we assume that any issue indicates that the origin itself
// (and therefore the URL) has some kind of issue.
let refs = self
.client
.get(refs_url)
.send()
.await
.or(Err(origin::SyncError::InvalidURL))?
.error_for_status()
.or(Err(origin::SyncError::InvalidURL))?
.text()
.await
.or(Err(origin::SyncError::InvalidURL))?;
let full_ref = format!("refs/heads/{}", branch_name);
for line in refs.lines() {
if !line.ends_with(full_ref.as_str()) {
continue;
}
return gix_hash::ObjectId::from_hex(
line.split_ascii_whitespace()
.next()
.ok_or(origin::SyncError::InvalidURL)?
.as_bytes(),
)
.or(Err(origin::SyncError::InvalidURL));
}
Err(origin::SyncError::InvalidBranchName)
}
async fn get_object(
&self,
descr: &origin::Descr,
oid: &gix_hash::ObjectId,
) -> unexpected::Result<Option<reqwest::Response>> {
let hex = oid.to_string();
let (url, _) = Self::deconstruct_descr(descr);
let object_url = Self::construct_url(
url,
format!("/objects/{}/{}", &hex[..2], &hex[2..]).as_str(),
)
.or_unexpected_while("constructing refs url")?;
Ok(self
.client
.get(object_url)
.send()
.await
.or_unexpected_while("performing request")?
.error_for_status()
.ok())
}
async fn get_commit_tree(
&self,
descr: &origin::Descr,
commit_hash: &gix_hash::ObjectId,
) -> Result<gix_hash::ObjectId, origin::SyncError> {
let commit_object_bytes = self
.get_object(descr, commit_hash)
.await?
.ok_or(origin::SyncError::Unavailable)?
.bytes()
.await
.or(Err(origin::SyncError::Unavailable))?;
let commit_object = gix_object::ObjectRef::from_loose(commit_object_bytes.as_ref())
.or(Err(origin::SyncError::Unavailable))?
.into_commit()
.ok_or(origin::SyncError::Unavailable)?;
Ok(commit_object.tree())
}
async fn get_tree_entry(
&self,
descr: &origin::Descr,
tree_hash: &gix_hash::ObjectId,
entry_name: &str,
) -> Result<gix_object::tree::Entry, origin::GetFileError> {
let tree_object_bytes = self
.get_object(descr, tree_hash)
.await?
.ok_or(origin::GetFileError::Unavailable)?
.bytes()
.await
.or(Err(origin::GetFileError::Unavailable))?;
let tree_object = gix_object::ObjectRef::from_loose(tree_object_bytes.as_ref())
.or(Err(origin::GetFileError::Unavailable))?
.into_tree()
.ok_or(origin::GetFileError::Unavailable)?;
for entry in tree_object.entries {
if entry.filename == entry_name {
return Ok(entry.into());
}
}
Err(origin::GetFileError::FileNotFound)
}
}
impl origin::Store for Proxy {
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
let descr = descr.clone();
Box::pin(async move {
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
// isn't actually being held for the whole method duration.
let is_already_syncing = {
self.sync_guard
.lock()
.unwrap()
.insert(descr.clone(), ())
.is_some()
};
if is_already_syncing {
return Err(origin::SyncError::AlreadyInProgress);
}
// perform the rest of the work within this closure, so we can be sure that the guard
// lock is released no matter what.
let res = async {
let commit_hash = self.get_tip_commit_hash(&descr).await?;
let current_tree = self.get_commit_tree(&descr, &commit_hash).await?;
self.state
.write()
.unwrap()
.insert(descr.clone(), DescrState { current_tree });
Ok(())
}
.await;
self.sync_guard.lock().unwrap().remove(&descr);
res
})
}
fn get_file(
&self,
descr: &origin::Descr,
path: &str,
) -> util::BoxFuture<'_, Result<util::BoxByteStream, origin::GetFileError>> {
let descr = descr.clone();
let path = path.to_string();
Box::pin(async move {
let current_state = self
.state
.read()
.unwrap()
.get(&descr)
.ok_or(origin::GetFileError::DescrNotSynced)?
.clone();
let path = path
.as_str()
.parse::<std::path::PathBuf>()
.or_unexpected_while("parsing path")?
.canonicalize()
.or_unexpected_while("canonicalizing path")?;
let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>();
let path_parts_len = path_parts.len();
if path_parts_len < 2 {
return Err(unexpected::Error::from("path has fewer than 2 parts").into());
} else if path_parts[0] != std::path::MAIN_SEPARATOR_STR {
return Err(unexpected::Error::from(format!(
"expected first path part to be separator, found {:?}",
path_parts[0]
))
.into());
}
let mut tree_hash = current_state.current_tree;
// The first part is "/" (main separator), and the last is the file name itself.
// Everything in between (if any) should be directories, so navigate those.
for dir_name in path_parts[1..path_parts_len - 1].iter() {
let entry = self
.get_tree_entry(
&descr,
&tree_hash,
dir_name
.to_str()
.map_unexpected_while(|| format!("decoding dir name {dir_name:?}"))?,
)
.await?;
if !entry.mode.is_tree() {
return Err(origin::GetFileError::FileNotFound);
}
tree_hash = entry.oid;
}
let file_name = {
let file_name = path_parts[path_parts_len - 1];
file_name
.to_str()
.map_unexpected_while(|| format!("decoding file name {file_name:?}"))?
};
let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?;
// TODO handle symlinks
if entry.mode.is_tree() {
return Err(origin::GetFileError::PathIsDirectory);
} else if !entry.mode.is_blob() {
return Err(unexpected::Error::from(format!(
"can't handle entry {} of mode {}",
entry.filename,
entry.mode.as_str()
))
.into());
}
let res = self
.get_object(&descr, &entry.oid)
.await?
.map_unexpected_while(|| format!("object for entry {:?} not found", entry))?
.bytes_stream();
use futures::StreamExt;
Ok(util::into_box_byte_stream(res.map(|r| {
use std::io::{Error, ErrorKind};
r.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
})))
// TODO this is still not correct, as it will include the git object header
})
}
}

View File

@ -3,171 +3,48 @@ use crate::{origin, util};
pub struct Store<F, S>
where
S: origin::Store + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{
mapping_fn: F,
stores: Vec<S>,
}
impl<F, S> Store<F, S>
where
S: origin::Store + 'static,
S: origin::Store + Sync + Send + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{
pub fn new(mapping_fn: F, stores: Vec<S>) -> Store<F, S> {
Store { mapping_fn, stores }
pub fn new(mapping_fn: F) -> Store<F, S> {
Store { mapping_fn }
}
}
impl<F, S> origin::Store for Store<F, S>
where
S: origin::Store + 'static,
S: origin::Store + Sync + Send + 'static,
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
{
fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
(self.mapping_fn)(descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.sync(descr)
}
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
let mut res = Vec::<origin::Descr>::new();
for store in self.stores.iter() {
store.all_descrs()?.into_iter().collect_into(&mut res);
}
Ok(res)
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
let descr = descr.clone();
Box::pin(async move {
(self.mapping_fn)(&descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.sync(&descr)
.await
})
}
fn get_file(
&self,
descr: &origin::Descr,
path: &str,
) -> Result<util::BoxByteStream, origin::GetFileError> {
(self.mapping_fn)(descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.get_file(descr, path)
}
}
#[cfg(test)]
mod tests {
use crate::origin;
use std::sync;
struct Harness {
descr_a: origin::Descr,
descr_b: origin::Descr,
descr_unknown: origin::Descr,
store_a: sync::Arc<sync::Mutex<origin::MockStore>>,
store_b: sync::Arc<sync::Mutex<origin::MockStore>>,
store: Box<dyn origin::Store>,
}
impl Harness {
fn new() -> Harness {
let descr_a = origin::Descr::Git {
url: "A".to_string(),
branch_name: "A".to_string(),
};
let descr_b = origin::Descr::Git {
url: "B".to_string(),
branch_name: "B".to_string(),
};
let store_a = origin::new_mock();
let store_b = origin::new_mock();
Harness {
descr_a: descr_a.clone(),
descr_b: descr_b.clone(),
descr_unknown: origin::Descr::Git {
url: "X".to_string(),
branch_name: "X".to_string(),
},
store_a: store_a.clone(),
store_b: store_b.clone(),
store: Box::from(super::Store::new(
{
let store_a = store_a.clone();
let store_b = store_b.clone();
move |descr| match descr {
&origin::Descr::Git { ref url, .. } if url == "A" => {
Some(store_a.clone())
}
&origin::Descr::Git { ref url, .. } if url == "B" => {
Some(store_b.clone())
}
_ => None,
}
},
vec![store_a.clone(), store_b.clone()],
)),
}
}
}
#[test]
fn sync() {
let h = Harness::new();
{
let descr_a = h.descr_a.clone();
h.store_a
.lock()
.unwrap()
.expect_sync()
.withf(move |descr: &origin::Descr| descr == &descr_a)
.times(1)
.return_const(Ok::<(), origin::SyncError>(()));
}
assert_eq!(Ok(()), h.store.sync(&h.descr_a));
{
let descr_b = h.descr_b.clone();
h.store_b
.lock()
.unwrap()
.expect_sync()
.withf(move |descr: &origin::Descr| descr == &descr_b)
.times(1)
.return_const(Ok::<(), origin::SyncError>(()));
}
assert_eq!(Ok(()), h.store.sync(&h.descr_b));
assert!(h.store.sync(&h.descr_unknown).is_err());
}
#[test]
fn all_descrs() {
let h = Harness::new();
h.store_a
.lock()
.unwrap()
.expect_all_descrs()
.times(1)
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
.descr_a
.clone()]));
h.store_b
.lock()
.unwrap()
.expect_all_descrs()
.times(1)
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
.descr_b
.clone()]));
assert_eq!(
Ok(vec![h.descr_a.clone(), h.descr_b.clone()]),
h.store.all_descrs(),
)
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> {
let descr = descr.clone();
let path = path.to_string();
Box::pin(async move {
(self.mapping_fn)(&descr)
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
.get_file(&descr, &path)
.await
})
}
}

View File

@ -98,11 +98,16 @@ impl Service {
let path = service::append_index_to_path(req.path(), "index.gmi");
use domain::manager::GetFileError;
let f = match self.domain_manager.get_file(settings, &path) {
let f = match self.domain_manager.get_file(settings, &path).await {
Ok(f) => f,
Err(GetFileError::FileNotFound) => {
return Ok(self.respond_conn(w, "51", "File not found", None).await?)
}
Err(GetFileError::Unavailable) => {
return Ok(self
.respond_conn(w, "43", "Content unavailable", None)
.await?)
}
Err(GetFileError::DescrNotSynced) => {
return Err(unexpected::Error::from(
format!(
@ -117,7 +122,7 @@ impl Service {
// redirect so that the path has '/' appended to it, which will cause the server to
// check index.gmi within the path on the new page load.
let mut path = path.into_owned();
path.push_str("/");
path.push('/');
return Ok(self.respond_conn(w, "30", path.as_str(), None).await?);
}
Err(GetFileError::Unexpected(e)) => return Err(e.into()),

View File

@ -218,9 +218,10 @@ impl Service {
let path = service::append_index_to_path(req.uri().path(), "index.html");
use domain::manager::GetFileError;
match self.domain_manager.get_file(&settings, &path) {
match self.domain_manager.get_file(&settings, &path).await {
Ok(f) => self.serve(200, &path, Body::wrap_stream(f)),
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"),
Err(GetFileError::DescrNotSynced) => self.internal_error(
format!(
"Backend for {:?} has not yet been synced",
@ -232,7 +233,7 @@ impl Service {
// redirect so that the path has '/' appended to it, which will cause the server to
// check index.html within the path on the new page load.
let mut path = path.into_owned();
path.push_str("/");
path.push('/');
self.render_redirect(
http::status::StatusCode::TEMPORARY_REDIRECT.into(),
path.as_str(),
@ -408,6 +409,11 @@ impl Service {
.to_string(),
), false),
Err(domain::manager::SyncWithSettingsError::Unavailable) => (Some(
"Fetching the git repository failed; the server is not available or is not corectly serving the repository."
.to_string(),
), false),
Err(domain::manager::SyncWithSettingsError::InvalidBranchName) => (Some(
"The git repository does not have a branch of the given name; please double check
that you input the correct name."

View File

@ -1,7 +1,7 @@
use std::convert::TryFrom;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};
use serde_with::{serde_as, DisplayFromStr, NoneAsEmptyString};
use crate::{domain, error::unexpected, origin};
@ -10,7 +10,8 @@ use crate::{domain, error::unexpected, origin};
pub struct UrlEncodedDomainSettings {
domain_setting_origin_descr_kind: String,
domain_setting_origin_descr_git_url: Option<String>,
#[serde_as(as = "Option<DisplayFromStr>")]
domain_setting_origin_descr_git_url: Option<origin::descr::GitUrl>,
domain_setting_origin_descr_git_branch_name: Option<String>,
domain_setting_origin_descr_proxy_url: Option<String>,

View File

@ -33,4 +33,11 @@ pub fn parse_file<T: std::str::FromStr>(
pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>;
pub fn into_box_byte_stream<T>(v: T) -> BoxByteStream
where
T: futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
{
Box::into_pin(Box::new(v))
}
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;