Compare commits

...

2 Commits

Author SHA1 Message Date
Brian Picciano
16c1d0c4aa WIP 2023-07-27 18:00:23 +02:00
Brian Picciano
5a15f07872 Include builtins in origin syncing 2023-07-27 16:43:09 +02:00
7 changed files with 195 additions and 29 deletions

100
Cargo.lock generated
View File

@ -111,6 +111,12 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "arrayvec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]] [[package]]
name = "arrayvec" name = "arrayvec"
version = "0.7.2" version = "0.7.2"
@ -158,6 +164,18 @@ version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24a6904aef64d73cf10ab17ebace7befb918b82164785cb89907993be7f83813" checksum = "24a6904aef64d73cf10ab17ebace7befb918b82164785cb89907993be7f83813"
[[package]]
name = "bitvec"
version = "0.19.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55f93d0ef3363c364d5976646a38f04cf67cfe1d4c8d160cdea02cab2c116b33"
dependencies = [
"funty",
"radium",
"tap",
"wyz",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.10.4" version = "0.10.4"
@ -458,9 +476,11 @@ name = "domani"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"acme2", "acme2",
"bytes",
"clap", "clap",
"env_logger", "env_logger",
"futures", "futures",
"gemini",
"gix", "gix",
"handlebars", "handlebars",
"hex", "hex",
@ -660,6 +680,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.28" version = "0.3.28"
@ -749,6 +775,18 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "gemini"
version = "0.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12a820f5a9ac6f433b34944dc8d17b759d5009275c8fe12f73b873153dbcd4e0"
dependencies = [
"nom 6.1.2",
"paste",
"thiserror",
"url",
]
[[package]] [[package]]
name = "generic-array" name = "generic-array"
version = "0.14.7" version = "0.14.7"
@ -828,7 +866,7 @@ dependencies = [
"btoi", "btoi",
"gix-date", "gix-date",
"itoa", "itoa",
"nom", "nom 7.1.3",
"thiserror", "thiserror",
] ]
@ -891,7 +929,7 @@ dependencies = [
"gix-sec", "gix-sec",
"log", "log",
"memchr", "memchr",
"nom", "nom 7.1.3",
"once_cell", "once_cell",
"smallvec", "smallvec",
"thiserror", "thiserror",
@ -1100,7 +1138,7 @@ dependencies = [
"gix-validate", "gix-validate",
"hex", "hex",
"itoa", "itoa",
"nom", "nom 7.1.3",
"smallvec", "smallvec",
"thiserror", "thiserror",
] ]
@ -1195,7 +1233,7 @@ dependencies = [
"gix-hash", "gix-hash",
"gix-transport", "gix-transport",
"maybe-async", "maybe-async",
"nom", "nom 7.1.3",
"thiserror", "thiserror",
] ]
@ -1226,7 +1264,7 @@ dependencies = [
"gix-tempfile", "gix-tempfile",
"gix-validate", "gix-validate",
"memmap2", "memmap2",
"nom", "nom 7.1.3",
"thiserror", "thiserror",
] ]
@ -1767,6 +1805,19 @@ version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lexical-core"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe"
dependencies = [
"arrayvec 0.5.2",
"bitflags 1.3.2",
"cfg-if",
"ryu",
"static_assertions",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.142" version = "0.2.142"
@ -1954,6 +2005,19 @@ dependencies = [
"smallvec", "smallvec",
] ]
[[package]]
name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
dependencies = [
"bitvec",
"funty",
"lexical-core",
"memchr",
"version_check",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.3" version = "7.1.3"
@ -2081,6 +2145,12 @@ dependencies = [
"windows-sys 0.45.0", "windows-sys 0.45.0",
] ]
[[package]]
name = "paste"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c"
[[package]] [[package]]
name = "pem" name = "pem"
version = "2.0.1" version = "2.0.1"
@ -2249,6 +2319,12 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]] [[package]]
name = "radix_trie" name = "radix_trie"
version = "0.2.1" version = "0.2.1"
@ -2805,6 +2881,12 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "tap"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]] [[package]]
name = "tempdir" name = "tempdir"
version = "0.3.7" version = "0.3.7"
@ -3121,7 +3203,7 @@ version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "794a32261a1f5eb6a4462c81b59cec87b5c27d5deea7dd1ac8fc781c41d226db" checksum = "794a32261a1f5eb6a4462c81b59cec87b5c27d5deea7dd1ac8fc781c41d226db"
dependencies = [ dependencies = [
"arrayvec", "arrayvec 0.7.2",
] ]
[[package]] [[package]]
@ -3508,3 +3590,9 @@ checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"

View File

@ -31,7 +31,7 @@ mime_guess = "2.0.4"
hyper = { version = "0.14.26", features = [ "server", "stream" ]} hyper = { version = "0.14.26", features = [ "server", "stream" ]}
http = "0.2.9" http = "0.2.9"
serde_urlencoded = "0.7.1" serde_urlencoded = "0.7.1"
tokio-util = "0.7.8" tokio-util = { version = "0.7.8", features = [ "io" ]}
acme2 = "0.5.1" acme2 = "0.5.1"
openssl = "0.10.52" openssl = "0.10.52"
rustls = "0.21.1" rustls = "0.21.1"
@ -45,6 +45,8 @@ serde_yaml = "0.9.22"
rand = "0.8.5" rand = "0.8.5"
reqwest = "0.11.18" reqwest = "0.11.18"
hyper-reverse-proxy = "0.5.1" hyper-reverse-proxy = "0.5.1"
gemini = "0.0.5"
bytes = "1.4.0"
[patch.crates-io] [patch.crates-io]
tokio-rustls = { git = "https://code.betamike.com/micropelago/tokio-rustls.git", branch = "start-handshake-into-inner" } tokio-rustls = { git = "https://code.betamike.com/micropelago/tokio-rustls.git", branch = "start-handshake-into-inner" }

View File

@ -211,24 +211,39 @@ impl ManagerImpl {
manager manager
} }
async fn sync_origins_once(&self) {
match self.domain_store.all_domains() {
Ok(domains) => domains,
Err(err) => {
log::error!("Error fetching all domains: {err}");
return;
}
}
.into_iter()
.for_each(|domain| {
log::info!("Syncing domain {}", &domain);
let settings = match self.domain_store.get(&domain) {
Ok(settings) => settings,
Err(err) => {
log::error!("Error syncing {domain}: {err}");
return;
}
};
let descr = &settings.settings.origin_descr;
if let Err(err) = self.origin_store.sync(descr) {
log::error!("Failed to sync origin for {domain}, origin:{descr:?}: {err}")
}
});
}
async fn sync_origins(&self, canceller: CancellationToken) { async fn sync_origins(&self, canceller: CancellationToken) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(20 * 60)); let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(20 * 60));
loop { loop {
tokio::select! { tokio::select! {
_ = interval.tick() => { _ = interval.tick() => self.sync_origins_once().await,
match self.origin_store.all_descrs() {
Ok(iter) => iter.into_iter(),
Err(err) => {
log::error!("Error fetching origin descriptors: {err}");
return;
}
}
.for_each(|descr| {
if let Err(err) = self.origin_store.sync(&descr) {
log::error!("Failed to sync store for {:?}: {err}", descr)
}
});
},
_ = canceller.cancelled() => return, _ = canceller.cancelled() => return,
} }
} }

View File

@ -151,6 +151,7 @@ async fn main() {
let _ = domani::service::gemini::Service::new( let _ = domani::service::gemini::Service::new(
&mut task_stack, &mut task_stack,
domain_manager.clone(),
sync::Arc::new(domain_gemini_store), sync::Arc::new(domain_gemini_store),
config.service, config.service,
); );

View File

@ -332,7 +332,7 @@ impl super::Store for FSStore {
// TODO this is very not ideal, the whole file is first read totally into memory, and then // TODO this is very not ideal, the whole file is first read totally into memory, and then
// that is cloned. // that is cloned.
let data = file_object.data.clone(); let data = bytes::Bytes::copy_from_slice(file_object.data.as_slice());
Ok(Box::pin(stream::once(async move { Ok(data) }))) Ok(Box::pin(stream::once(async move { Ok(data) })))
} }
} }

View File

@ -4,12 +4,13 @@ mod proxy;
pub use config::*; pub use config::*;
use crate::error::unexpected::{self, Mappable}; use crate::error::unexpected::{self, Mappable};
use crate::{domain, service, task_stack}; use crate::{domain, service, task_stack, util};
use std::sync; use std::sync;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
pub struct Service { pub struct Service {
domain_manager: sync::Arc<dyn domain::manager::Manager>,
cert_resolver: sync::Arc<dyn rustls::server::ResolvesServerCert>, cert_resolver: sync::Arc<dyn rustls::server::ResolvesServerCert>,
config: service::Config, config: service::Config,
} }
@ -26,10 +27,12 @@ enum HandleConnError {
impl Service { impl Service {
pub fn new( pub fn new(
task_stack: &mut task_stack::TaskStack<unexpected::Error>, task_stack: &mut task_stack::TaskStack<unexpected::Error>,
domain_manager: sync::Arc<dyn domain::manager::Manager>,
cert_resolver: sync::Arc<dyn rustls::server::ResolvesServerCert>, cert_resolver: sync::Arc<dyn rustls::server::ResolvesServerCert>,
config: service::Config, config: service::Config,
) -> sync::Arc<Service> { ) -> sync::Arc<Service> {
let service = sync::Arc::new(Service { let service = sync::Arc::new(Service {
domain_manager,
cert_resolver, cert_resolver,
config, config,
}); });
@ -37,6 +40,64 @@ impl Service {
service service
} }
async fn respond_conn<W>(
&self,
w: W,
code: &str,
meta: &str,
body: Option<util::BoxByteStream>,
) -> unexpected::Result<()>
where
W: tokio::io::AsyncWrite + Unpin,
{
use tokio::io::{copy, AsyncWriteExt, BufWriter};
let mut w = BufWriter::new(w);
w.write_all(code.as_bytes()).await.or_unexpected()?;
w.write_all(" ".as_bytes()).await.or_unexpected()?;
w.write_all(meta.as_bytes()).await.or_unexpected()?;
w.write_all("\r\n".as_bytes()).await.or_unexpected()?;
if let Some(body) = body {
let mut body = tokio_util::io::StreamReader::new(body);
copy(&mut body, &mut w).await.or_unexpected()?;
}
w.flush().await.or_unexpected()?;
Ok(())
}
async fn serve_conn<IO>(&self, domain: &domain::Name, conn: IO) -> Result<(), HandleConnError>
where
IO: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
use tokio::io::*;
let (r, w) = split(conn);
let mut r = BufReader::new(r);
let mut req = String::with_capacity(64);
r.read_line(&mut req)
.await
.map_err(|e| HandleConnError::ClientError(format!("failed to read request: {e}")))?;
let req = gemini::request::parse::request(req.as_bytes())
.map(|(_, req)| req)
.map_err(|e| HandleConnError::ClientError(format!("failed to parse request: {e}")))?
.into_gemini_request()
.map_err(|e| HandleConnError::ClientError(format!("failed to parse request: {e}")))?;
let f = match self.domain_manager.get_file(domain, req.path()) {
Ok(f) => f,
Err(domain::manager::GetFileError::DomainNotFound) => panic!("TODO"),
Err(domain::manager::GetFileError::FileNotFound) => panic!("TODO"),
Err(domain::manager::GetFileError::Unexpected(e)) => return Err(e.into()),
};
Ok(self.respond_conn(w, "20", "TODO", Some(f)).await?)
}
async fn proxy_conn<IO>( async fn proxy_conn<IO>(
&self, &self,
proxied_domain: &ConfigProxiedDomain, proxied_domain: &ConfigProxiedDomain,
@ -59,7 +120,7 @@ impl Service {
async fn handle_conn( async fn handle_conn(
&self, &self,
conn: tokio::net::TcpStream, conn: tokio::net::TcpStream,
_tls_config: sync::Arc<rustls::ServerConfig>, tls_config: sync::Arc<rustls::ServerConfig>,
) -> Result<(), HandleConnError> { ) -> Result<(), HandleConnError> {
let teed_conn = { let teed_conn = {
let (r, w) = tokio::io::split(conn); let (r, w) = tokio::io::split(conn);
@ -92,9 +153,8 @@ impl Service {
return Ok(()); return Ok(());
} }
return Err(HandleConnError::ClientError(format!( let conn = start.into_stream(tls_config).await.or_unexpected()?;
"unknown domain {domain}" self.serve_conn(&domain, conn).await
)));
} }
Err(err) => { Err(err) => {
return Err(unexpected::Error::from( return Err(unexpected::Error::from(
@ -113,7 +173,7 @@ async fn listen(
let tls_config = sync::Arc::new( let tls_config = sync::Arc::new(
rustls::server::ServerConfig::builder() rustls::server::ServerConfig::builder()
.with_safe_defaults() .with_safe_defaults()
.with_no_client_auth() // TODO maybe this isn't right? .with_no_client_auth()
.with_cert_resolver(service.cert_resolver.clone()), .with_cert_resolver(service.cert_resolver.clone()),
); );

View File

@ -10,6 +10,6 @@ pub fn open_file(path: &path::Path) -> io::Result<Option<fs::File>> {
} }
} }
pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<Vec<u8>>>; pub type BoxByteStream = futures::stream::BoxStream<'static, io::Result<bytes::Bytes>>;
pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>; pub type BoxFuture<'a, O> = pin::Pin<Box<dyn futures::Future<Output = O> + Send + 'a>>;