Implemented periodic origin syncing

This commit is contained in:
Brian Picciano 2023-05-15 21:18:33 +02:00
parent 8f1c0cce22
commit 6d68b4e5ab
5 changed files with 87 additions and 37 deletions

1
Cargo.lock generated
View File

@ -379,6 +379,7 @@ dependencies = [
"tempdir", "tempdir",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-util",
"trust-dns-client", "trust-dns-client",
] ]

View File

@ -31,3 +31,4 @@ mime_guess = "2.0.4"
hyper = { version = "0.14.26", features = [ "server" ]} hyper = { version = "0.14.26", features = [ "server" ]}
http = "0.2.9" http = "0.2.9"
serde_urlencoded = "0.7.1" serde_urlencoded = "0.7.1"
tokio-util = "0.7.8"

View File

@ -2,7 +2,8 @@ use clap::Parser;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use signal_hook::consts::signal; use signal_hook::consts::signal;
use signal_hook_tokio::Signals; use signal_hook_tokio::Signals;
use tokio::sync::oneshot; use tokio::select;
use tokio::time;
use std::convert::Infallible; use std::convert::Infallible;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -10,6 +11,8 @@ use std::path;
use std::str::FromStr; use std::str::FromStr;
use std::sync; use std::sync;
use domiply::origin::store::Store;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(version)] #[command(version)]
#[command(about = "A domiply to another dimension")] #[command(about = "A domiply to another dimension")]
@ -46,28 +49,70 @@ fn main() {
.unwrap(), .unwrap(),
); );
let (stop_ch_tx, stop_ch_rx) = tokio_runtime.block_on(async { oneshot::channel() }); let canceller = tokio_runtime.block_on(async { tokio_util::sync::CancellationToken::new() });
// set up signal handling, stop_ch_rx will be used to signal that the stop signal has been {
// received let canceller = canceller.clone();
tokio_runtime.spawn(async move { tokio_runtime.spawn(async move {
let mut signals = Signals::new(&[signal::SIGTERM, signal::SIGINT, signal::SIGQUIT]) let mut signals = Signals::new(&[signal::SIGTERM, signal::SIGINT, signal::SIGQUIT])
.expect("initialized signals"); .expect("initialized signals");
if let Some(_) = signals.next().await { if let Some(_) = signals.next().await {
println!("Gracefully shutting down..."); println!("Gracefully shutting down...");
let _ = stop_ch_tx.send(()); canceller.cancel();
} }
if let Some(_) = signals.next().await { if let Some(_) = signals.next().await {
println!("Forcefully shutting down"); println!("Forcefully shutting down");
std::process::exit(1); std::process::exit(1);
}; };
}); });
}
let origin_store = domiply::origin::store::git::new(config.origin_store_git_dir_path) let origin_store = domiply::origin::store::git::new(config.origin_store_git_dir_path)
.expect("git origin store initialized"); .expect("git origin store initialized");
let origin_syncer_handler = {
let origin_store = origin_store.clone();
let canceller = canceller.clone();
tokio_runtime.spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(20 * 60));
interval.tick().await;
loop {
origin_store
.all_descrs()
.expect("got all_descrs iter")
.into_iter()
.for_each(|descr| {
if canceller.is_cancelled() {
return;
}
if let Err(err) = descr {
println!("failed iterating origins: {err}");
return;
}
let descr = descr.unwrap();
println!("syncing origin: {descr:?}");
if let Err(err) =
origin_store.sync(descr.clone(), domiply::origin::store::Limits {})
{
println!("error syncing origin {descr:?}: {err}");
}
});
select! {
_ = interval.tick() => continue,
_ = canceller.cancelled() => return,
}
}
})
};
let domain_checker = domiply::domain::checker::new( let domain_checker = domiply::domain::checker::new(
tokio_runtime.clone(), tokio_runtime.clone(),
config.domain_checker_target_aaaa, config.domain_checker_target_aaaa,
@ -103,20 +148,27 @@ fn main() {
async move { Ok::<_, Infallible>(service) } async move { Ok::<_, Infallible>(service) }
}); });
tokio_runtime.block_on(async { let server_handler = {
let addr = config.http_listen_addr; let canceller = canceller.clone();
tokio_runtime.spawn(async move {
let addr = config.http_listen_addr;
println!("Listening on {addr}"); println!("Listening on {addr}");
let server = hyper::Server::bind(&addr).serve(make_service); let server = hyper::Server::bind(&addr).serve(make_service);
let graceful = server.with_graceful_shutdown(async { let graceful = server.with_graceful_shutdown(async {
stop_ch_rx.await.ok(); canceller.cancelled().await;
}); });
if let Err(e) = graceful.await { if let Err(e) = graceful.await {
panic!("server error: {}", e); panic!("server error: {}", e);
} };
}); })
};
tokio_runtime
.block_on(async { futures::try_join!(origin_syncer_handler, server_handler) })
.unwrap();
println!("Graceful shutdown complete"); println!("Graceful shutdown complete");
} }

View File

@ -41,17 +41,13 @@ pub enum AllDescrsError {
/// Used in the return from all_descrs from Store. /// Used in the return from all_descrs from Store.
pub type AllDescrsResult<T> = Result<T, AllDescrsError>; pub type AllDescrsResult<T> = Result<T, AllDescrsError>;
#[mockall::automock(
type Origin=origin::MockOrigin;
type AllDescrsIter=Vec<AllDescrsResult<origin::Descr>>;
)]
/// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr. /// Describes a storage mechanism for Origins. Each Origin is uniquely identified by its Descr.
pub trait Store: std::marker::Send + std::marker::Sync { pub trait Store: Send + Sync + Clone {
type Origin<'store>: origin::Origin + 'store type Origin<'store>: origin::Origin + 'store
where where
Self: 'store; Self: 'store;
type AllDescrsIter<'store>: IntoIterator<Item = AllDescrsResult<origin::Descr>> + 'store type AllDescrsIter<'store>: IntoIterator<Item = AllDescrsResult<origin::Descr>> + Send + 'store
where where
Self: 'store; Self: 'store;

View File

@ -67,11 +67,11 @@ struct Store {
pub fn new(dir_path: PathBuf) -> io::Result<impl super::Store> { pub fn new(dir_path: PathBuf) -> io::Result<impl super::Store> {
fs::create_dir_all(&dir_path)?; fs::create_dir_all(&dir_path)?;
Ok(Store { Ok(sync::Arc::new(Store {
dir_path, dir_path,
sync_guard: sync::Mutex::new(collections::HashMap::new()), sync_guard: sync::Mutex::new(collections::HashMap::new()),
origins: sync::RwLock::new(collections::HashMap::new()), origins: sync::RwLock::new(collections::HashMap::new()),
}) }))
} }
impl Store { impl Store {
@ -191,11 +191,11 @@ impl Store {
} }
} }
impl super::Store for Store { impl super::Store for sync::Arc<Store> {
type Origin<'store> = sync::Arc<Origin> type Origin<'store> = sync::Arc<Origin>
where Self: 'store; where Self: 'store;
type AllDescrsIter<'store> = Box<dyn Iterator<Item = store::AllDescrsResult<origin::Descr>> + 'store> type AllDescrsIter<'store> = Box<dyn Iterator<Item = store::AllDescrsResult<origin::Descr>> + Send + 'store>
where Self: 'store; where Self: 'store;
fn sync(&self, descr: origin::Descr, limits: store::Limits) -> Result<(), store::SyncError> { fn sync(&self, descr: origin::Descr, limits: store::Limits) -> Result<(), store::SyncError> {