Compare commits
8 Commits
6dd53f96d8
...
2302e9ff64
Author | SHA1 | Date | |
---|---|---|---|
2302e9ff64 | |||
142fc14916 | |||
5f1f8ce1b7 | |||
b60c849a73 | |||
98ddefad4f | |||
57ee5ff30e | |||
ac66ebf6cc | |||
78b67a02ad |
527
Cargo.lock
generated
527
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -42,11 +42,13 @@ log = "0.4.19"
|
|||||||
env_logger = "0.10.0"
|
env_logger = "0.10.0"
|
||||||
serde_yaml = "0.9.22"
|
serde_yaml = "0.9.22"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
reqwest = "0.11.18"
|
|
||||||
hyper-reverse-proxy = "0.5.2-dev"
|
hyper-reverse-proxy = "0.5.2-dev"
|
||||||
gemini = "0.0.5"
|
gemini = "0.0.5"
|
||||||
bytes = "1.4.0"
|
bytes = "1.4.0"
|
||||||
hyper-trust-dns = "0.5.0"
|
hyper-trust-dns = "0.5.0"
|
||||||
|
gix-hash = "0.14.1"
|
||||||
|
reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] }
|
||||||
|
gix-object = "0.41.0"
|
||||||
|
|
||||||
[patch.crates-io]
|
[patch.crates-io]
|
||||||
|
|
||||||
|
@ -40,6 +40,9 @@ pub enum SyncWithSettingsError {
|
|||||||
#[error("invalid url")]
|
#[error("invalid url")]
|
||||||
InvalidURL,
|
InvalidURL,
|
||||||
|
|
||||||
|
#[error("unavailable due to server-side issue")]
|
||||||
|
Unavailable,
|
||||||
|
|
||||||
#[error("invalid branch name")]
|
#[error("invalid branch name")]
|
||||||
InvalidBranchName,
|
InvalidBranchName,
|
||||||
|
|
||||||
@ -60,6 +63,7 @@ impl From<origin::SyncError> for SyncWithSettingsError {
|
|||||||
fn from(e: origin::SyncError) -> SyncWithSettingsError {
|
fn from(e: origin::SyncError) -> SyncWithSettingsError {
|
||||||
match e {
|
match e {
|
||||||
origin::SyncError::InvalidURL => SyncWithSettingsError::InvalidURL,
|
origin::SyncError::InvalidURL => SyncWithSettingsError::InvalidURL,
|
||||||
|
origin::SyncError::Unavailable => SyncWithSettingsError::Unavailable,
|
||||||
origin::SyncError::InvalidBranchName => SyncWithSettingsError::InvalidBranchName,
|
origin::SyncError::InvalidBranchName => SyncWithSettingsError::InvalidBranchName,
|
||||||
origin::SyncError::AlreadyInProgress => SyncWithSettingsError::AlreadyInProgress,
|
origin::SyncError::AlreadyInProgress => SyncWithSettingsError::AlreadyInProgress,
|
||||||
origin::SyncError::Unexpected(e) => SyncWithSettingsError::Unexpected(e),
|
origin::SyncError::Unexpected(e) => SyncWithSettingsError::Unexpected(e),
|
||||||
@ -96,7 +100,7 @@ pub trait Manager: Sync + Send {
|
|||||||
&self,
|
&self,
|
||||||
settings: &domain::Settings,
|
settings: &domain::Settings,
|
||||||
path: &str,
|
path: &str,
|
||||||
) -> Result<util::BoxByteStream, GetFileError>;
|
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>;
|
||||||
|
|
||||||
fn sync_with_settings(
|
fn sync_with_settings(
|
||||||
&self,
|
&self,
|
||||||
@ -206,13 +210,13 @@ impl ManagerImpl {
|
|||||||
manager
|
manager
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sync_domain_origin(
|
async fn sync_domain_origin(
|
||||||
&self,
|
&self,
|
||||||
domain: &domain::Name,
|
domain: &domain::Name,
|
||||||
origin_descr: &origin::Descr,
|
origin_descr: &origin::Descr,
|
||||||
) -> Result<(), origin::SyncError> {
|
) -> Result<(), origin::SyncError> {
|
||||||
log::info!("Syncing origin {:?} for domain {domain}", origin_descr,);
|
log::info!("Syncing origin {:?} for domain {domain}", origin_descr,);
|
||||||
self.origin_store.sync(origin_descr)
|
self.origin_store.sync(origin_descr).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn sync_origins(&self) -> unexpected::Result<()> {
|
async fn sync_origins(&self) -> unexpected::Result<()> {
|
||||||
@ -232,6 +236,7 @@ impl ManagerImpl {
|
|||||||
};
|
};
|
||||||
|
|
||||||
self.sync_domain_origin(&domain, &settings.origin_descr)
|
self.sync_domain_origin(&domain, &settings.origin_descr)
|
||||||
|
.await
|
||||||
.map_unexpected_while(|| {
|
.map_unexpected_while(|| {
|
||||||
format!(
|
format!(
|
||||||
"syncing origin {:?} for domain {domain}",
|
"syncing origin {:?} for domain {domain}",
|
||||||
@ -396,7 +401,7 @@ impl Manager for ManagerImpl {
|
|||||||
&self,
|
&self,
|
||||||
settings: &domain::Settings,
|
settings: &domain::Settings,
|
||||||
path: &str,
|
path: &str,
|
||||||
) -> Result<util::BoxByteStream, GetFileError> {
|
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>> {
|
||||||
let path = settings.process_path(path);
|
let path = settings.process_path(path);
|
||||||
self.origin_store
|
self.origin_store
|
||||||
.get_file(&settings.origin_descr, path.as_ref())
|
.get_file(&settings.origin_descr, path.as_ref())
|
||||||
@ -423,7 +428,8 @@ impl Manager for ManagerImpl {
|
|||||||
|
|
||||||
self.domain_checker.check_domain(&domain, &hash).await?;
|
self.domain_checker.check_domain(&domain, &hash).await?;
|
||||||
|
|
||||||
self.sync_domain_origin(&domain, &settings.origin_descr)?;
|
self.sync_domain_origin(&domain, &settings.origin_descr)
|
||||||
|
.await?;
|
||||||
|
|
||||||
if self.can_sync_gemini_cert() {
|
if self.can_sync_gemini_cert() {
|
||||||
self.sync_domain_gemini_cert(&domain)
|
self.sync_domain_gemini_cert(&domain)
|
||||||
|
@ -110,7 +110,7 @@ mod tests {
|
|||||||
|
|
||||||
let settings = domain::Settings {
|
let settings = domain::Settings {
|
||||||
origin_descr: Descr::Git {
|
origin_descr: Descr::Git {
|
||||||
url: "bar".to_string(),
|
url: "http://bar".parse().unwrap(),
|
||||||
branch_name: "baz".to_string(),
|
branch_name: "baz".to_string(),
|
||||||
},
|
},
|
||||||
add_path_prefix: None,
|
add_path_prefix: None,
|
||||||
@ -126,7 +126,7 @@ mod tests {
|
|||||||
|
|
||||||
let new_settings = domain::Settings {
|
let new_settings = domain::Settings {
|
||||||
origin_descr: Descr::Git {
|
origin_descr: Descr::Git {
|
||||||
url: "BAR".to_string(),
|
url: "https://BAR".parse().unwrap(),
|
||||||
branch_name: "BAZ".to_string(),
|
branch_name: "BAZ".to_string(),
|
||||||
},
|
},
|
||||||
add_path_prefix: None,
|
add_path_prefix: None,
|
||||||
|
@ -42,8 +42,11 @@ impl Error {
|
|||||||
return Error::from_displays(prefix, &e, e.source());
|
return Error::from_displays(prefix, &e, e.source());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from(s: &str) -> Error {
|
pub fn from<S>(s: S) -> Error
|
||||||
Error::from_displays(None::<String>, s, None::<String>)
|
where
|
||||||
|
S: AsRef<str>,
|
||||||
|
{
|
||||||
|
Error::from_displays(None::<String>, s.as_ref(), None::<String>)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,16 +108,19 @@ impl<T, E: error::Error> Mappable<T> for result::Result<T, E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static OPTION_NONE_ERROR: &str = "expected Some but got None";
|
#[derive(thiserror::Error, Debug)]
|
||||||
|
enum OptionError {
|
||||||
|
#[error("required but not given")]
|
||||||
|
None,
|
||||||
|
}
|
||||||
|
|
||||||
impl<T> Mappable<T> for Option<T> {
|
impl<T> Mappable<T> for Option<T> {
|
||||||
fn or_unexpected(self) -> Result<T> {
|
fn or_unexpected(self) -> Result<T> {
|
||||||
self.ok_or(Error::from(OPTION_NONE_ERROR)).or_unexpected()
|
self.ok_or(OptionError::None).or_unexpected()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn or_unexpected_while<D: fmt::Display>(self, prefix: D) -> Result<T> {
|
fn or_unexpected_while<D: fmt::Display>(self, prefix: D) -> Result<T> {
|
||||||
self.ok_or(Error::from(OPTION_NONE_ERROR))
|
self.ok_or(OptionError::None).or_unexpected_while(prefix)
|
||||||
.or_unexpected_while(prefix)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn map_unexpected_while<F, D>(self, f: F) -> Result<T>
|
fn map_unexpected_while<F, D>(self, f: F) -> Result<T>
|
||||||
@ -122,8 +128,7 @@ impl<T> Mappable<T> for Option<T> {
|
|||||||
F: FnOnce() -> D,
|
F: FnOnce() -> D,
|
||||||
D: fmt::Display,
|
D: fmt::Display,
|
||||||
{
|
{
|
||||||
self.ok_or(Error::from(OPTION_NONE_ERROR))
|
self.ok_or(OptionError::None).map_unexpected_while(f)
|
||||||
.map_unexpected_while(f)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,6 +90,8 @@ async fn main() {
|
|||||||
let origin_store = domani::origin::git::FSStore::new(&config.origin)
|
let origin_store = domani::origin::git::FSStore::new(&config.origin)
|
||||||
.expect("git origin store initialization failed");
|
.expect("git origin store initialization failed");
|
||||||
|
|
||||||
|
//let origin_store = domani::origin::git_proxy::Proxy::new();
|
||||||
|
|
||||||
let domain_checker = domani::domain::checker::DNSChecker::new(
|
let domain_checker = domani::domain::checker::DNSChecker::new(
|
||||||
domani::token::MemStore::new(),
|
domani::token::MemStore::new(),
|
||||||
&config.domain.dns,
|
&config.domain.dns,
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
mod config;
|
mod config;
|
||||||
mod descr;
|
pub mod descr;
|
||||||
pub mod git;
|
pub mod git;
|
||||||
|
pub mod git_proxy;
|
||||||
pub mod mux;
|
pub mod mux;
|
||||||
|
|
||||||
pub use config::*;
|
pub use config::*;
|
||||||
@ -8,13 +9,15 @@ pub use descr::Descr;
|
|||||||
|
|
||||||
use crate::error::unexpected;
|
use crate::error::unexpected;
|
||||||
use crate::util;
|
use crate::util;
|
||||||
use std::sync;
|
|
||||||
|
|
||||||
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
|
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
|
||||||
pub enum SyncError {
|
pub enum SyncError {
|
||||||
#[error("invalid url")]
|
#[error("invalid url")]
|
||||||
InvalidURL,
|
InvalidURL,
|
||||||
|
|
||||||
|
#[error("unavailable due to server-side issue")]
|
||||||
|
Unavailable,
|
||||||
|
|
||||||
#[error("invalid branch name")]
|
#[error("invalid branch name")]
|
||||||
InvalidBranchName,
|
InvalidBranchName,
|
||||||
|
|
||||||
@ -25,12 +28,6 @@ pub enum SyncError {
|
|||||||
Unexpected(#[from] unexpected::Error),
|
Unexpected(#[from] unexpected::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(thiserror::Error, Clone, Debug, PartialEq)]
|
|
||||||
pub enum AllDescrsError {
|
|
||||||
#[error(transparent)]
|
|
||||||
Unexpected(#[from] unexpected::Error),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
pub enum GetFileError {
|
pub enum GetFileError {
|
||||||
#[error("descr not synced")]
|
#[error("descr not synced")]
|
||||||
@ -39,6 +36,9 @@ pub enum GetFileError {
|
|||||||
#[error("file not found")]
|
#[error("file not found")]
|
||||||
FileNotFound,
|
FileNotFound,
|
||||||
|
|
||||||
|
#[error("unavailable due to server-side issue")]
|
||||||
|
Unavailable,
|
||||||
|
|
||||||
#[error("path is directory")]
|
#[error("path is directory")]
|
||||||
PathIsDirectory,
|
PathIsDirectory,
|
||||||
|
|
||||||
@ -46,32 +46,16 @@ pub enum GetFileError {
|
|||||||
Unexpected(#[from] unexpected::Error),
|
Unexpected(#[from] unexpected::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[mockall::automock]
|
|
||||||
/// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr.
|
/// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr.
|
||||||
pub trait Store {
|
pub trait Store {
|
||||||
/// If the origin is of a kind which can be updated, sync will pull down the latest version of
|
/// If the origin is of a kind which can be updated, sync will pull down the latest version of
|
||||||
/// the origin into the storage.
|
/// the origin into the storage.
|
||||||
fn sync(&self, descr: &Descr) -> Result<(), SyncError>;
|
fn sync(&self, descr: &Descr) -> util::BoxFuture<'_, Result<(), SyncError>>;
|
||||||
|
|
||||||
fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError>;
|
/// Returns the body of the descr's given path, where path must be absolute.
|
||||||
|
fn get_file(
|
||||||
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError>;
|
&self,
|
||||||
}
|
descr: &Descr,
|
||||||
|
path: &str,
|
||||||
pub fn new_mock() -> sync::Arc<sync::Mutex<MockStore>> {
|
) -> util::BoxFuture<'_, Result<util::BoxByteStream, GetFileError>>;
|
||||||
sync::Arc::new(sync::Mutex::new(MockStore::new()))
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Store for sync::Arc<sync::Mutex<MockStore>> {
|
|
||||||
fn sync(&self, descr: &Descr) -> Result<(), SyncError> {
|
|
||||||
self.lock().unwrap().sync(descr)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn all_descrs(&self) -> Result<Vec<Descr>, AllDescrsError> {
|
|
||||||
self.lock().unwrap().all_descrs()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_file(&self, descr: &Descr, path: &str) -> Result<util::BoxByteStream, GetFileError> {
|
|
||||||
self.lock().unwrap().get_file(descr, path)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,19 +1,53 @@
|
|||||||
use hex::ToHex;
|
use hex::ToHex;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_with::{serde_as, DisplayFromStr};
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
use crate::error::unexpected::{self, Mappable};
|
||||||
pub struct DescrHttpProxyHeader {
|
|
||||||
pub name: String,
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||||
pub value: String,
|
pub struct GitUrl {
|
||||||
|
pub parsed: http::uri::Uri,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::str::FromStr for GitUrl {
|
||||||
|
type Err = unexpected::Error;
|
||||||
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||||
|
let parsed: http::Uri = s
|
||||||
|
.parse()
|
||||||
|
.map_unexpected_while(|| format!("parsing as url"))?;
|
||||||
|
|
||||||
|
let scheme = parsed
|
||||||
|
.scheme()
|
||||||
|
.map_unexpected_while(|| format!("extracting scheme"))?;
|
||||||
|
|
||||||
|
if scheme != "http" && scheme != "https" {
|
||||||
|
return Err(unexpected::Error::from(
|
||||||
|
"only http(s) git URLs are supported",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(GitUrl { parsed })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for GitUrl {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
self.parsed.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[serde_as]
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||||
#[serde(tag = "kind")]
|
#[serde(tag = "kind")]
|
||||||
/// A unique description of an origin, from where a domain might be served.
|
/// A unique description of an origin, from where a domain might be served.
|
||||||
pub enum Descr {
|
pub enum Descr {
|
||||||
#[serde(rename = "git")]
|
#[serde(rename = "git")]
|
||||||
Git { url: String, branch_name: String },
|
Git {
|
||||||
|
#[serde_as(as = "DisplayFromStr")]
|
||||||
|
url: GitUrl,
|
||||||
|
branch_name: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Descr {
|
impl Descr {
|
||||||
@ -29,7 +63,7 @@ impl Descr {
|
|||||||
match self {
|
match self {
|
||||||
Descr::Git { url, branch_name } => {
|
Descr::Git { url, branch_name } => {
|
||||||
h_update("git");
|
h_update("git");
|
||||||
h_update(url);
|
h_update(url.parsed.to_string().as_str());
|
||||||
h_update(branch_name);
|
h_update(branch_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -45,11 +79,20 @@ mod descr_tests {
|
|||||||
fn id() {
|
fn id() {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
super::Descr::Git {
|
super::Descr::Git {
|
||||||
url: String::from("foo"),
|
url: "https://foo".parse().unwrap(),
|
||||||
branch_name: String::from("bar"),
|
branch_name: String::from("bar"),
|
||||||
}
|
}
|
||||||
.id(),
|
.id(),
|
||||||
"424130b9e6c1675c983b4b97579877e16d8a6377e4fe10970e6d210811c3b7ac",
|
"0b12637c1994e61db59ad9bea5fcaf5d2a1e5ecad00c58320271e721e4295ceb",
|
||||||
)
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
super::Descr::Git {
|
||||||
|
url: "http://foo".parse().unwrap(),
|
||||||
|
branch_name: String::from("bar"),
|
||||||
|
}
|
||||||
|
.id(),
|
||||||
|
"d14f4c75c938e46c2d9da05ea14b6e41ba86ac9945bb10c0b2f91faee1ec0bce",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@ use crate::error::unexpected::{self, Intoable, Mappable};
|
|||||||
use crate::{origin, util};
|
use crate::{origin, util};
|
||||||
|
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::{collections, fs, io, sync};
|
use std::{collections, fs, future, io, sync};
|
||||||
|
|
||||||
use futures::stream;
|
use futures::stream;
|
||||||
|
|
||||||
@ -56,12 +56,12 @@ impl FSStore {
|
|||||||
format!("origin/{branch_name}")
|
format!("origin/{branch_name}")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn deconstruct_descr(descr: &origin::Descr) -> (&str, &str) {
|
fn deconstruct_descr(descr: &origin::Descr) -> (String, &str) {
|
||||||
let origin::Descr::Git {
|
let origin::Descr::Git {
|
||||||
ref url,
|
ref url,
|
||||||
ref branch_name,
|
ref branch_name,
|
||||||
} = descr;
|
} = descr;
|
||||||
(url, branch_name)
|
(url.parsed.to_string(), branch_name)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_repo_snapshot(
|
fn create_repo_snapshot(
|
||||||
@ -216,7 +216,14 @@ impl FSStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl super::Store for FSStore {
|
impl super::Store for FSStore {
|
||||||
fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
|
fn sync(
|
||||||
|
&self,
|
||||||
|
descr: &origin::Descr,
|
||||||
|
) -> util::BoxFuture<'static, Result<(), origin::SyncError>> {
|
||||||
|
// TODO this implementation is kind of cheating, as it's doing everything synchronously but
|
||||||
|
// then returning the result in an async box. But the git store is going to be
|
||||||
|
// re-implemented soon anyway, so it doesn't matter.
|
||||||
|
let res = (|| {
|
||||||
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
|
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
|
||||||
// isn't actually being held for the whole method duration.
|
// isn't actually being held for the whole method duration.
|
||||||
let is_already_syncing = {
|
let is_already_syncing = {
|
||||||
@ -231,9 +238,9 @@ impl super::Store for FSStore {
|
|||||||
return Err(origin::SyncError::AlreadyInProgress);
|
return Err(origin::SyncError::AlreadyInProgress);
|
||||||
}
|
}
|
||||||
|
|
||||||
let res = self.sync_inner(descr);
|
let res = self.sync_inner(&descr);
|
||||||
|
|
||||||
self.sync_guard.lock().unwrap().remove(descr);
|
self.sync_guard.lock().unwrap().remove(&descr);
|
||||||
|
|
||||||
let repo = match res {
|
let repo = match res {
|
||||||
Ok(repo) => repo,
|
Ok(repo) => repo,
|
||||||
@ -250,9 +257,11 @@ impl super::Store for FSStore {
|
|||||||
// calling this while the sync lock is held isn't ideal, but it's convenient and
|
// calling this while the sync lock is held isn't ideal, but it's convenient and
|
||||||
// shouldn't be too terrible generally
|
// shouldn't be too terrible generally
|
||||||
let repo_snapshot = self
|
let repo_snapshot = self
|
||||||
.create_repo_snapshot(repo, descr)
|
.create_repo_snapshot(repo, &descr)
|
||||||
.map_err(|e| match e {
|
.map_err(|e| match e {
|
||||||
CreateRepoSnapshotError::InvalidBranchName => origin::SyncError::InvalidBranchName,
|
CreateRepoSnapshotError::InvalidBranchName => {
|
||||||
|
origin::SyncError::InvalidBranchName
|
||||||
|
}
|
||||||
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
|
CreateRepoSnapshotError::Unexpected(e) => origin::SyncError::Unexpected(e),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
@ -260,51 +269,26 @@ impl super::Store for FSStore {
|
|||||||
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
|
(*repo_snapshots).insert(descr.clone(), sync::Arc::new(repo_snapshot));
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
})();
|
||||||
|
|
||||||
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
|
Box::pin(future::ready(res))
|
||||||
fs::read_dir(&self.dir_path).or_unexpected()?.map(
|
|
||||||
|dir_entry_res: io::Result<fs::DirEntry>| -> Result<origin::Descr, origin::AllDescrsError> {
|
|
||||||
let descr_id: String = dir_entry_res
|
|
||||||
.or_unexpected()?
|
|
||||||
.file_name()
|
|
||||||
.to_str()
|
|
||||||
.ok_or_else(|| {
|
|
||||||
unexpected::Error::from("couldn't convert os string to &str")
|
|
||||||
})?
|
|
||||||
.into();
|
|
||||||
|
|
||||||
let descr_file_path = self.descr_file_path(descr_id.as_ref());
|
|
||||||
|
|
||||||
// TODO it's possible that opening the file will fail if syncing is
|
|
||||||
// still ongoing, as writing the descr file is the last step after
|
|
||||||
// initial sync has succeeded.
|
|
||||||
let descr_file = fs::File::open(descr_file_path.as_path())
|
|
||||||
.map_unexpected_while(|| {
|
|
||||||
format!("opening descr file {}", descr_file_path.display())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let descr = serde_json::from_reader(descr_file).map_unexpected_while(|| {
|
|
||||||
format!("reading descr file {}", descr_file_path.display())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(descr)
|
|
||||||
},
|
|
||||||
).try_collect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_file(
|
fn get_file(
|
||||||
&self,
|
&self,
|
||||||
descr: &origin::Descr,
|
descr: &origin::Descr,
|
||||||
path: &str,
|
path: &str,
|
||||||
) -> Result<util::BoxByteStream, origin::GetFileError> {
|
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> {
|
||||||
let repo_snapshot = match self.get_repo_snapshot(descr) {
|
let descr = descr.clone();
|
||||||
|
let path = path.to_string();
|
||||||
|
Box::pin(async move {
|
||||||
|
let repo_snapshot = match self.get_repo_snapshot(&descr) {
|
||||||
Ok(Some(repo_snapshot)) => repo_snapshot,
|
Ok(Some(repo_snapshot)) => repo_snapshot,
|
||||||
Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
|
Ok(None) => return Err(origin::GetFileError::DescrNotSynced),
|
||||||
Err(e) => return Err(e.into()),
|
Err(e) => return Err(e.into()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut clean_path = Path::new(path);
|
let mut clean_path = Path::new(path.as_str());
|
||||||
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
|
clean_path = clean_path.strip_prefix("/").unwrap_or(clean_path);
|
||||||
|
|
||||||
let repo = repo_snapshot.repo.to_thread_local();
|
let repo = repo_snapshot.repo.to_thread_local();
|
||||||
@ -337,16 +321,21 @@ impl super::Store for FSStore {
|
|||||||
// TODO this is very not ideal, the whole file is first read totally into memory, and then
|
// TODO this is very not ideal, the whole file is first read totally into memory, and then
|
||||||
// that is cloned.
|
// that is cloned.
|
||||||
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
|
let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
|
||||||
Ok(Box::pin(stream::once(async move { Ok(data) })))
|
|
||||||
|
Ok(util::into_box_byte_stream(stream::once(
|
||||||
|
async move { Ok(data) },
|
||||||
|
)))
|
||||||
}
|
}
|
||||||
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
|
Kind::Commit | Kind::Tag => Err(unexpected::Error::from(
|
||||||
format!("found object of kind {} in tree", file_object.kind).as_str(),
|
format!("found object of kind {} in tree", file_object.kind).as_str(),
|
||||||
)
|
)
|
||||||
.into()),
|
.into()),
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::origin::{self, Config, Store};
|
use crate::origin::{self, Config, Store};
|
||||||
@ -374,8 +363,11 @@ mod tests {
|
|||||||
|
|
||||||
let store = super::FSStore::new(&config).expect("store created");
|
let store = super::FSStore::new(&config).expect("store created");
|
||||||
|
|
||||||
store.sync(&descr).expect("sync should succeed");
|
store.sync(&descr).await.expect("sync should succeed");
|
||||||
store.sync(&descr).expect("second sync should succeed");
|
store
|
||||||
|
.sync(&descr)
|
||||||
|
.await
|
||||||
|
.expect("second sync should succeed");
|
||||||
|
|
||||||
// RepoSnapshot doesn't exist
|
// RepoSnapshot doesn't exist
|
||||||
match store.get_file(&other_descr, "DNE") {
|
match store.get_file(&other_descr, "DNE") {
|
||||||
@ -413,3 +405,4 @@ mod tests {
|
|||||||
assert_eq!(descr, descrs[0]);
|
assert_eq!(descr, descrs[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
306
src/origin/git_proxy.rs
Normal file
306
src/origin/git_proxy.rs
Normal file
@ -0,0 +1,306 @@
|
|||||||
|
use crate::error::unexpected::{self, Mappable};
|
||||||
|
use crate::{origin, util};
|
||||||
|
use std::{collections, sync};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct DescrState {
|
||||||
|
current_tree: gix_hash::ObjectId,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct Proxy {
|
||||||
|
client: reqwest::Client,
|
||||||
|
state: sync::RwLock<collections::HashMap<origin::Descr, DescrState>>,
|
||||||
|
|
||||||
|
// to prevent syncing the same origin more than once at a time, but still allow hitting that
|
||||||
|
// origin in during a sync.
|
||||||
|
sync_guard: sync::Mutex<collections::HashMap<origin::Descr, ()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Proxy {
|
||||||
|
pub fn new() -> Proxy {
|
||||||
|
Proxy::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deconstruct_descr(descr: &origin::Descr) -> (&origin::descr::GitUrl, &str) {
|
||||||
|
let origin::Descr::Git {
|
||||||
|
ref url,
|
||||||
|
ref branch_name,
|
||||||
|
} = descr;
|
||||||
|
(url, branch_name)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn construct_url(
|
||||||
|
url: &origin::descr::GitUrl,
|
||||||
|
sub_path: &str,
|
||||||
|
) -> unexpected::Result<reqwest::Url> {
|
||||||
|
let mut url: reqwest::Url = {
|
||||||
|
url.parsed
|
||||||
|
.to_string()
|
||||||
|
.parse()
|
||||||
|
.or_unexpected_while("parsing url as reqwest url")?
|
||||||
|
};
|
||||||
|
|
||||||
|
let base_path = match url.to_file_path() {
|
||||||
|
Ok(path) => path,
|
||||||
|
Err(()) => return Err(unexpected::Error::from("extracting path from url")),
|
||||||
|
};
|
||||||
|
|
||||||
|
let new_path = base_path.join(sub_path);
|
||||||
|
|
||||||
|
url.set_path(
|
||||||
|
new_path
|
||||||
|
.to_str()
|
||||||
|
.or_unexpected_while("converting new path to string")?,
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(url)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_tip_commit_hash(
|
||||||
|
&self,
|
||||||
|
descr: &origin::Descr,
|
||||||
|
) -> Result<gix_hash::ObjectId, origin::SyncError> {
|
||||||
|
let (url, branch_name) = Self::deconstruct_descr(descr);
|
||||||
|
|
||||||
|
let refs_url =
|
||||||
|
Self::construct_url(url, "/info/refs").or_unexpected_while("constructing refs url")?;
|
||||||
|
|
||||||
|
// when fetching refs we assume that any issue indicates that the origin itself
|
||||||
|
// (and therefore the URL) has some kind of issue.
|
||||||
|
let refs = self
|
||||||
|
.client
|
||||||
|
.get(refs_url)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?
|
||||||
|
.error_for_status()
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?
|
||||||
|
.text()
|
||||||
|
.await
|
||||||
|
.or(Err(origin::SyncError::InvalidURL))?;
|
||||||
|
|
||||||
|
let full_ref = format!("refs/heads/{}", branch_name);
|
||||||
|
for line in refs.lines() {
|
||||||
|
if !line.ends_with(full_ref.as_str()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return gix_hash::ObjectId::from_hex(
|
||||||
|
line.split_ascii_whitespace()
|
||||||
|
.next()
|
||||||
|
.ok_or(origin::SyncError::InvalidURL)?
|
||||||
|
.as_bytes(),
|
||||||
|
)
|
||||||
|
.or(Err(origin::SyncError::InvalidURL));
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(origin::SyncError::InvalidBranchName)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_object(
|
||||||
|
&self,
|
||||||
|
descr: &origin::Descr,
|
||||||
|
oid: &gix_hash::ObjectId,
|
||||||
|
) -> unexpected::Result<Option<reqwest::Response>> {
|
||||||
|
let hex = oid.to_string();
|
||||||
|
let (url, _) = Self::deconstruct_descr(descr);
|
||||||
|
|
||||||
|
let object_url = Self::construct_url(
|
||||||
|
url,
|
||||||
|
format!("/objects/{}/{}", &hex[..2], &hex[2..]).as_str(),
|
||||||
|
)
|
||||||
|
.or_unexpected_while("constructing refs url")?;
|
||||||
|
|
||||||
|
Ok(self
|
||||||
|
.client
|
||||||
|
.get(object_url)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.or_unexpected_while("performing request")?
|
||||||
|
.error_for_status()
|
||||||
|
.ok())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_commit_tree(
|
||||||
|
&self,
|
||||||
|
descr: &origin::Descr,
|
||||||
|
commit_hash: &gix_hash::ObjectId,
|
||||||
|
) -> Result<gix_hash::ObjectId, origin::SyncError> {
|
||||||
|
let commit_object_bytes = self
|
||||||
|
.get_object(descr, commit_hash)
|
||||||
|
.await?
|
||||||
|
.ok_or(origin::SyncError::Unavailable)?
|
||||||
|
.bytes()
|
||||||
|
.await
|
||||||
|
.or(Err(origin::SyncError::Unavailable))?;
|
||||||
|
|
||||||
|
let commit_object = gix_object::ObjectRef::from_loose(commit_object_bytes.as_ref())
|
||||||
|
.or(Err(origin::SyncError::Unavailable))?
|
||||||
|
.into_commit()
|
||||||
|
.ok_or(origin::SyncError::Unavailable)?;
|
||||||
|
|
||||||
|
Ok(commit_object.tree())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_tree_entry(
|
||||||
|
&self,
|
||||||
|
descr: &origin::Descr,
|
||||||
|
tree_hash: &gix_hash::ObjectId,
|
||||||
|
entry_name: &str,
|
||||||
|
) -> Result<gix_object::tree::Entry, origin::GetFileError> {
|
||||||
|
let tree_object_bytes = self
|
||||||
|
.get_object(descr, tree_hash)
|
||||||
|
.await?
|
||||||
|
.ok_or(origin::GetFileError::Unavailable)?
|
||||||
|
.bytes()
|
||||||
|
.await
|
||||||
|
.or(Err(origin::GetFileError::Unavailable))?;
|
||||||
|
|
||||||
|
let tree_object = gix_object::ObjectRef::from_loose(tree_object_bytes.as_ref())
|
||||||
|
.or(Err(origin::GetFileError::Unavailable))?
|
||||||
|
.into_tree()
|
||||||
|
.ok_or(origin::GetFileError::Unavailable)?;
|
||||||
|
|
||||||
|
for entry in tree_object.entries {
|
||||||
|
if entry.filename == entry_name {
|
||||||
|
return Ok(entry.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(origin::GetFileError::FileNotFound)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl origin::Store for Proxy {
|
||||||
|
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
|
||||||
|
let descr = descr.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
// attempt to lock this descr for syncing, doing so within a new scope so the mutex
|
||||||
|
// isn't actually being held for the whole method duration.
|
||||||
|
let is_already_syncing = {
|
||||||
|
self.sync_guard
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.insert(descr.clone(), ())
|
||||||
|
.is_some()
|
||||||
|
};
|
||||||
|
|
||||||
|
if is_already_syncing {
|
||||||
|
return Err(origin::SyncError::AlreadyInProgress);
|
||||||
|
}
|
||||||
|
|
||||||
|
// perform the rest of the work within this closure, so we can be sure that the guard
|
||||||
|
// lock is released no matter what.
|
||||||
|
let res = async {
|
||||||
|
let commit_hash = self.get_tip_commit_hash(&descr).await?;
|
||||||
|
let current_tree = self.get_commit_tree(&descr, &commit_hash).await?;
|
||||||
|
self.state
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.insert(descr.clone(), DescrState { current_tree });
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
.await;
|
||||||
|
|
||||||
|
self.sync_guard.lock().unwrap().remove(&descr);
|
||||||
|
res
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_file(
|
||||||
|
&self,
|
||||||
|
descr: &origin::Descr,
|
||||||
|
path: &str,
|
||||||
|
) -> util::BoxFuture<'_, Result<util::BoxByteStream, origin::GetFileError>> {
|
||||||
|
let descr = descr.clone();
|
||||||
|
let path = path.to_string();
|
||||||
|
Box::pin(async move {
|
||||||
|
let current_state = self
|
||||||
|
.state
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.get(&descr)
|
||||||
|
.ok_or(origin::GetFileError::DescrNotSynced)?
|
||||||
|
.clone();
|
||||||
|
|
||||||
|
let path = path
|
||||||
|
.as_str()
|
||||||
|
.parse::<std::path::PathBuf>()
|
||||||
|
.or_unexpected_while("parsing path")?
|
||||||
|
.canonicalize()
|
||||||
|
.or_unexpected_while("canonicalizing path")?;
|
||||||
|
|
||||||
|
let path_parts = path.iter().collect::<Vec<&std::ffi::OsStr>>();
|
||||||
|
|
||||||
|
let path_parts_len = path_parts.len();
|
||||||
|
|
||||||
|
if path_parts_len < 2 {
|
||||||
|
return Err(unexpected::Error::from("path has fewer than 2 parts").into());
|
||||||
|
} else if path_parts[0] != std::path::MAIN_SEPARATOR_STR {
|
||||||
|
return Err(unexpected::Error::from(format!(
|
||||||
|
"expected first path part to be separator, found {:?}",
|
||||||
|
path_parts[0]
|
||||||
|
))
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut tree_hash = current_state.current_tree;
|
||||||
|
|
||||||
|
// The first part is "/" (main separator), and the last is the file name itself.
|
||||||
|
// Everything in between (if any) should be directories, so navigate those.
|
||||||
|
for dir_name in path_parts[1..path_parts_len - 1].iter() {
|
||||||
|
let entry = self
|
||||||
|
.get_tree_entry(
|
||||||
|
&descr,
|
||||||
|
&tree_hash,
|
||||||
|
dir_name
|
||||||
|
.to_str()
|
||||||
|
.map_unexpected_while(|| format!("decoding dir name {dir_name:?}"))?,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if !entry.mode.is_tree() {
|
||||||
|
return Err(origin::GetFileError::FileNotFound);
|
||||||
|
}
|
||||||
|
|
||||||
|
tree_hash = entry.oid;
|
||||||
|
}
|
||||||
|
|
||||||
|
let file_name = {
|
||||||
|
let file_name = path_parts[path_parts_len - 1];
|
||||||
|
file_name
|
||||||
|
.to_str()
|
||||||
|
.map_unexpected_while(|| format!("decoding file name {file_name:?}"))?
|
||||||
|
};
|
||||||
|
|
||||||
|
let entry = self.get_tree_entry(&descr, &tree_hash, file_name).await?;
|
||||||
|
|
||||||
|
// TODO handle symlinks
|
||||||
|
if entry.mode.is_tree() {
|
||||||
|
return Err(origin::GetFileError::PathIsDirectory);
|
||||||
|
} else if !entry.mode.is_blob() {
|
||||||
|
return Err(unexpected::Error::from(format!(
|
||||||
|
"can't handle entry {} of mode {}",
|
||||||
|
entry.filename,
|
||||||
|
entry.mode.as_str()
|
||||||
|
))
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let res = self
|
||||||
|
.get_object(&descr, &entry.oid)
|
||||||
|
.await?
|
||||||
|
.map_unexpected_while(|| format!("object for entry {:?} not found", entry))?
|
||||||
|
.bytes_stream();
|
||||||
|
|
||||||
|
use futures::StreamExt;
|
||||||
|
Ok(util::into_box_byte_stream(res.map(|r| {
|
||||||
|
use std::io::{Error, ErrorKind};
|
||||||
|
r.map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))
|
||||||
|
})))
|
||||||
|
|
||||||
|
// TODO this is still not correct, as it will include the git object header
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -3,171 +3,48 @@ use crate::{origin, util};
|
|||||||
|
|
||||||
pub struct Store<F, S>
|
pub struct Store<F, S>
|
||||||
where
|
where
|
||||||
S: origin::Store + 'static,
|
|
||||||
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
|
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
|
||||||
{
|
{
|
||||||
mapping_fn: F,
|
mapping_fn: F,
|
||||||
stores: Vec<S>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, S> Store<F, S>
|
impl<F, S> Store<F, S>
|
||||||
where
|
where
|
||||||
S: origin::Store + 'static,
|
S: origin::Store + Sync + Send + 'static,
|
||||||
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
|
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
|
||||||
{
|
{
|
||||||
pub fn new(mapping_fn: F, stores: Vec<S>) -> Store<F, S> {
|
pub fn new(mapping_fn: F) -> Store<F, S> {
|
||||||
Store { mapping_fn, stores }
|
Store { mapping_fn }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, S> origin::Store for Store<F, S>
|
impl<F, S> origin::Store for Store<F, S>
|
||||||
where
|
where
|
||||||
S: origin::Store + 'static,
|
S: origin::Store + Sync + Send + 'static,
|
||||||
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
|
F: Fn(&origin::Descr) -> Option<S> + Sync + Send,
|
||||||
{
|
{
|
||||||
fn sync(&self, descr: &origin::Descr) -> Result<(), origin::SyncError> {
|
fn sync(&self, descr: &origin::Descr) -> util::BoxFuture<'_, Result<(), origin::SyncError>> {
|
||||||
(self.mapping_fn)(descr)
|
let descr = descr.clone();
|
||||||
|
Box::pin(async move {
|
||||||
|
(self.mapping_fn)(&descr)
|
||||||
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
|
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
|
||||||
.sync(descr)
|
.sync(&descr)
|
||||||
}
|
.await
|
||||||
|
})
|
||||||
fn all_descrs(&self) -> Result<Vec<origin::Descr>, origin::AllDescrsError> {
|
|
||||||
let mut res = Vec::<origin::Descr>::new();
|
|
||||||
|
|
||||||
for store in self.stores.iter() {
|
|
||||||
store.all_descrs()?.into_iter().collect_into(&mut res);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(res)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_file(
|
fn get_file(
|
||||||
&self,
|
&self,
|
||||||
descr: &origin::Descr,
|
descr: &origin::Descr,
|
||||||
path: &str,
|
path: &str,
|
||||||
) -> Result<util::BoxByteStream, origin::GetFileError> {
|
) -> util::BoxFuture<Result<util::BoxByteStream, origin::GetFileError>> {
|
||||||
(self.mapping_fn)(descr)
|
let descr = descr.clone();
|
||||||
|
let path = path.to_string();
|
||||||
|
Box::pin(async move {
|
||||||
|
(self.mapping_fn)(&descr)
|
||||||
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
|
.or_unexpected_while(format!("mapping {:?} to store", &descr))?
|
||||||
.get_file(descr, path)
|
.get_file(&descr, &path)
|
||||||
}
|
.await
|
||||||
}
|
})
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use crate::origin;
|
|
||||||
use std::sync;
|
|
||||||
|
|
||||||
struct Harness {
|
|
||||||
descr_a: origin::Descr,
|
|
||||||
descr_b: origin::Descr,
|
|
||||||
descr_unknown: origin::Descr,
|
|
||||||
store_a: sync::Arc<sync::Mutex<origin::MockStore>>,
|
|
||||||
store_b: sync::Arc<sync::Mutex<origin::MockStore>>,
|
|
||||||
store: Box<dyn origin::Store>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Harness {
|
|
||||||
fn new() -> Harness {
|
|
||||||
let descr_a = origin::Descr::Git {
|
|
||||||
url: "A".to_string(),
|
|
||||||
branch_name: "A".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let descr_b = origin::Descr::Git {
|
|
||||||
url: "B".to_string(),
|
|
||||||
branch_name: "B".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let store_a = origin::new_mock();
|
|
||||||
let store_b = origin::new_mock();
|
|
||||||
|
|
||||||
Harness {
|
|
||||||
descr_a: descr_a.clone(),
|
|
||||||
descr_b: descr_b.clone(),
|
|
||||||
descr_unknown: origin::Descr::Git {
|
|
||||||
url: "X".to_string(),
|
|
||||||
branch_name: "X".to_string(),
|
|
||||||
},
|
|
||||||
store_a: store_a.clone(),
|
|
||||||
store_b: store_b.clone(),
|
|
||||||
store: Box::from(super::Store::new(
|
|
||||||
{
|
|
||||||
let store_a = store_a.clone();
|
|
||||||
let store_b = store_b.clone();
|
|
||||||
move |descr| match descr {
|
|
||||||
&origin::Descr::Git { ref url, .. } if url == "A" => {
|
|
||||||
Some(store_a.clone())
|
|
||||||
}
|
|
||||||
&origin::Descr::Git { ref url, .. } if url == "B" => {
|
|
||||||
Some(store_b.clone())
|
|
||||||
}
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
vec![store_a.clone(), store_b.clone()],
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn sync() {
|
|
||||||
let h = Harness::new();
|
|
||||||
|
|
||||||
{
|
|
||||||
let descr_a = h.descr_a.clone();
|
|
||||||
h.store_a
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.expect_sync()
|
|
||||||
.withf(move |descr: &origin::Descr| descr == &descr_a)
|
|
||||||
.times(1)
|
|
||||||
.return_const(Ok::<(), origin::SyncError>(()));
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(Ok(()), h.store.sync(&h.descr_a));
|
|
||||||
|
|
||||||
{
|
|
||||||
let descr_b = h.descr_b.clone();
|
|
||||||
h.store_b
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.expect_sync()
|
|
||||||
.withf(move |descr: &origin::Descr| descr == &descr_b)
|
|
||||||
.times(1)
|
|
||||||
.return_const(Ok::<(), origin::SyncError>(()));
|
|
||||||
}
|
|
||||||
|
|
||||||
assert_eq!(Ok(()), h.store.sync(&h.descr_b));
|
|
||||||
|
|
||||||
assert!(h.store.sync(&h.descr_unknown).is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn all_descrs() {
|
|
||||||
let h = Harness::new();
|
|
||||||
|
|
||||||
h.store_a
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.expect_all_descrs()
|
|
||||||
.times(1)
|
|
||||||
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
|
|
||||||
.descr_a
|
|
||||||
.clone()]));
|
|
||||||
|
|
||||||
h.store_b
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.expect_all_descrs()
|
|
||||||
.times(1)
|
|
||||||
.return_const(Ok::<Vec<origin::Descr>, origin::AllDescrsError>(vec![h
|
|
||||||
.descr_b
|
|
||||||
.clone()]));
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
Ok(vec![h.descr_a.clone(), h.descr_b.clone()]),
|
|
||||||
h.store.all_descrs(),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -98,11 +98,16 @@ impl Service {
|
|||||||
let path = service::append_index_to_path(req.path(), "index.gmi");
|
let path = service::append_index_to_path(req.path(), "index.gmi");
|
||||||
|
|
||||||
use domain::manager::GetFileError;
|
use domain::manager::GetFileError;
|
||||||
let f = match self.domain_manager.get_file(settings, &path) {
|
let f = match self.domain_manager.get_file(settings, &path).await {
|
||||||
Ok(f) => f,
|
Ok(f) => f,
|
||||||
Err(GetFileError::FileNotFound) => {
|
Err(GetFileError::FileNotFound) => {
|
||||||
return Ok(self.respond_conn(w, "51", "File not found", None).await?)
|
return Ok(self.respond_conn(w, "51", "File not found", None).await?)
|
||||||
}
|
}
|
||||||
|
Err(GetFileError::Unavailable) => {
|
||||||
|
return Ok(self
|
||||||
|
.respond_conn(w, "43", "Content unavailable", None)
|
||||||
|
.await?)
|
||||||
|
}
|
||||||
Err(GetFileError::DescrNotSynced) => {
|
Err(GetFileError::DescrNotSynced) => {
|
||||||
return Err(unexpected::Error::from(
|
return Err(unexpected::Error::from(
|
||||||
format!(
|
format!(
|
||||||
@ -117,7 +122,7 @@ impl Service {
|
|||||||
// redirect so that the path has '/' appended to it, which will cause the server to
|
// redirect so that the path has '/' appended to it, which will cause the server to
|
||||||
// check index.gmi within the path on the new page load.
|
// check index.gmi within the path on the new page load.
|
||||||
let mut path = path.into_owned();
|
let mut path = path.into_owned();
|
||||||
path.push_str("/");
|
path.push('/');
|
||||||
return Ok(self.respond_conn(w, "30", path.as_str(), None).await?);
|
return Ok(self.respond_conn(w, "30", path.as_str(), None).await?);
|
||||||
}
|
}
|
||||||
Err(GetFileError::Unexpected(e)) => return Err(e.into()),
|
Err(GetFileError::Unexpected(e)) => return Err(e.into()),
|
||||||
|
@ -218,9 +218,10 @@ impl Service {
|
|||||||
let path = service::append_index_to_path(req.uri().path(), "index.html");
|
let path = service::append_index_to_path(req.uri().path(), "index.html");
|
||||||
|
|
||||||
use domain::manager::GetFileError;
|
use domain::manager::GetFileError;
|
||||||
match self.domain_manager.get_file(&settings, &path) {
|
match self.domain_manager.get_file(&settings, &path).await {
|
||||||
Ok(f) => self.serve(200, &path, Body::wrap_stream(f)),
|
Ok(f) => self.serve(200, &path, Body::wrap_stream(f)),
|
||||||
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
|
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
|
||||||
|
Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"),
|
||||||
Err(GetFileError::DescrNotSynced) => self.internal_error(
|
Err(GetFileError::DescrNotSynced) => self.internal_error(
|
||||||
format!(
|
format!(
|
||||||
"Backend for {:?} has not yet been synced",
|
"Backend for {:?} has not yet been synced",
|
||||||
@ -232,7 +233,7 @@ impl Service {
|
|||||||
// redirect so that the path has '/' appended to it, which will cause the server to
|
// redirect so that the path has '/' appended to it, which will cause the server to
|
||||||
// check index.html within the path on the new page load.
|
// check index.html within the path on the new page load.
|
||||||
let mut path = path.into_owned();
|
let mut path = path.into_owned();
|
||||||
path.push_str("/");
|
path.push('/');
|
||||||
self.render_redirect(
|
self.render_redirect(
|
||||||
http::status::StatusCode::TEMPORARY_REDIRECT.into(),
|
http::status::StatusCode::TEMPORARY_REDIRECT.into(),
|
||||||
path.as_str(),
|
path.as_str(),
|
||||||
@ -408,6 +409,11 @@ impl Service {
|
|||||||
.to_string(),
|
.to_string(),
|
||||||
), false),
|
), false),
|
||||||
|
|
||||||
|
Err(domain::manager::SyncWithSettingsError::Unavailable) => (Some(
|
||||||
|
"Fetching the git repository failed; the server is not available or is not corectly serving the repository."
|
||||||
|
.to_string(),
|
||||||
|
), false),
|
||||||
|
|
||||||
Err(domain::manager::SyncWithSettingsError::InvalidBranchName) => (Some(
|
Err(domain::manager::SyncWithSettingsError::InvalidBranchName) => (Some(
|
||||||
"The git repository does not have a branch of the given name; please double check
|
"The git repository does not have a branch of the given name; please double check
|
||||||
that you input the correct name."
|
that you input the correct name."
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_with::{serde_as, NoneAsEmptyString};
|
use serde_with::{serde_as, DisplayFromStr, NoneAsEmptyString};
|
||||||
|
|
||||||
use crate::{domain, error::unexpected, origin};
|
use crate::{domain, error::unexpected, origin};
|
||||||
|
|
||||||
@ -10,7 +10,8 @@ use crate::{domain, error::unexpected, origin};
|
|||||||
pub struct UrlEncodedDomainSettings {
|
pub struct UrlEncodedDomainSettings {
|
||||||
domain_setting_origin_descr_kind: String,
|
domain_setting_origin_descr_kind: String,
|
||||||
|
|
||||||
domain_setting_origin_descr_git_url: Option<String>,
|
#[serde_as(as = "Option<DisplayFromStr>")]
|
||||||
|
domain_setting_origin_descr_git_url: Option<origin::descr::GitUrl>,
|
||||||
domain_setting_origin_descr_git_branch_name: Option<String>,
|
domain_setting_origin_descr_git_branch_name: Option<String>,
|
||||||
|
|
||||||
domain_setting_origin_descr_proxy_url: Option<String>,
|
domain_setting_origin_descr_proxy_url: Option<String>,
|
||||||
|
@ -33,4 +33,11 @@ pub fn parse_file<T: std::str::FromStr>(
|
|||||||
|
|
||||||
pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>;
|
pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>;
|
||||||
|
|
||||||
|
pub fn into_box_byte_stream<T>(v: T) -> BoxByteStream
|
||||||
|
where
|
||||||
|
T: futures::stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static,
|
||||||
|
{
|
||||||
|
Box::into_pin(Box::new(v))
|
||||||
|
}
|
||||||
|
|
||||||
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;
|
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;
|
||||||
|
Loading…
Reference in New Issue
Block a user