domani/src/main.rs

175 lines
5.5 KiB
Rust
Raw Normal View History

use clap::Parser;
use futures::stream::StreamExt;
use signal_hook::consts::signal;
use signal_hook_tokio::Signals;
2023-05-15 19:18:33 +00:00
use tokio::select;
use tokio::time;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path;
use std::str::FromStr;
use std::sync;
2023-05-15 19:18:33 +00:00
use domiply::origin::store::Store;
#[derive(Parser, Debug)]
#[command(version)]
2023-05-13 14:39:54 +00:00
#[command(about = "A domiply to another dimension")]
struct Cli {
2023-05-15 16:23:53 +00:00
#[arg(long, default_value_t = SocketAddr::from_str("[::]:3030").unwrap(), env = "DOMIPLY_HTTP_LISTEN_ADDR")]
http_listen_addr: SocketAddr,
2023-05-15 18:25:07 +00:00
#[arg(long, required = true, env = "DOMIPLY_HTTP_DOMAIN")]
http_domain: String,
2023-05-13 14:39:54 +00:00
#[arg(long, required = true, env = "DOMIPLY_PASSPHRASE")]
passphrase: String,
2023-05-13 14:39:54 +00:00
#[arg(long, required = true, env = "DOMIPLY_ORIGIN_STORE_GIT_DIR_PATH")]
origin_store_git_dir_path: path::PathBuf,
2023-05-15 20:16:29 +00:00
#[arg(long, required = true, env = "DOMIPLY_DOMAIN_CHECKER_TARGET_A")]
domain_checker_target_a: std::net::Ipv4Addr,
2023-05-13 14:39:54 +00:00
#[arg(long, default_value_t = String::from("1.1.1.1:53"), env = "DOMIPLY_DOMAIN_CHECKER_RESOLVER_ADDR")]
domain_checker_resolver_addr: String,
2023-05-13 14:39:54 +00:00
#[arg(long, required = true, env = "DOMIPLY_DOMAIN_CONFIG_STORE_DIR_PATH")]
domain_config_store_dir_path: path::PathBuf,
}
fn main() {
let config = Cli::parse();
let tokio_runtime = std::sync::Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
);
2023-05-15 19:18:33 +00:00
let canceller = tokio_runtime.block_on(async { tokio_util::sync::CancellationToken::new() });
2023-05-15 19:18:33 +00:00
{
let canceller = canceller.clone();
tokio_runtime.spawn(async move {
let mut signals = Signals::new(&[signal::SIGTERM, signal::SIGINT, signal::SIGQUIT])
.expect("initialized signals");
2023-05-15 19:18:33 +00:00
if let Some(_) = signals.next().await {
println!("Gracefully shutting down...");
canceller.cancel();
}
2023-05-15 19:18:33 +00:00
if let Some(_) = signals.next().await {
println!("Forcefully shutting down");
std::process::exit(1);
};
});
}
2023-05-13 14:39:54 +00:00
let origin_store = domiply::origin::store::git::new(config.origin_store_git_dir_path)
.expect("git origin store initialized");
2023-05-15 19:18:33 +00:00
let origin_syncer_handler = {
let origin_store = origin_store.clone();
let canceller = canceller.clone();
tokio_runtime.spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(20 * 60));
interval.tick().await;
loop {
origin_store
.all_descrs()
.expect("got all_descrs iter")
.into_iter()
.for_each(|descr| {
if canceller.is_cancelled() {
return;
}
if let Err(err) = descr {
println!("failed iterating origins: {err}");
return;
}
let descr = descr.unwrap();
println!("syncing origin: {descr:?}");
if let Err(err) =
origin_store.sync(descr.clone(), domiply::origin::store::Limits {})
{
println!("error syncing origin {descr:?}: {err}");
}
});
select! {
_ = interval.tick() => continue,
_ = canceller.cancelled() => return,
}
}
})
};
2023-05-13 14:39:54 +00:00
let domain_checker = domiply::domain::checker::new(
tokio_runtime.clone(),
2023-05-15 20:16:29 +00:00
config.domain_checker_target_a,
&config.domain_checker_resolver_addr,
)
.expect("domain checker initialized");
2023-05-13 14:39:54 +00:00
let domain_config_store = domiply::domain::config::new(&config.domain_config_store_dir_path)
.expect("domain config store initialized");
2023-05-13 14:39:54 +00:00
let manager = domiply::domain::manager::new(origin_store, domain_config_store, domain_checker);
let manager = sync::Arc::new(manager);
2023-05-13 14:39:54 +00:00
let service = domiply::service::new(
manager,
2023-05-15 20:16:29 +00:00
config.domain_checker_target_a,
config.passphrase,
2023-05-15 20:16:29 +00:00
config.http_domain.clone(),
);
let service = sync::Arc::new(service);
let make_service =
hyper::service::make_service_fn(move |_conn: &hyper::server::conn::AddrStream| {
let service = service.clone();
// Create a `Service` for responding to the request.
let service = hyper::service::service_fn(move |req| {
domiply::service::handle_request(service.clone(), req)
});
// Return the service to hyper.
async move { Ok::<_, Infallible>(service) }
});
2023-05-15 19:18:33 +00:00
let server_handler = {
let canceller = canceller.clone();
tokio_runtime.spawn(async move {
let addr = config.http_listen_addr;
2023-05-15 20:16:29 +00:00
println!("Listening on http://{}:{}", config.http_domain, addr.port());
2023-05-15 19:18:33 +00:00
let server = hyper::Server::bind(&addr).serve(make_service);
2023-05-15 19:18:33 +00:00
let graceful = server.with_graceful_shutdown(async {
canceller.cancelled().await;
});
if let Err(e) = graceful.await {
panic!("server error: {}", e);
};
})
};
2023-05-15 19:18:33 +00:00
tokio_runtime
.block_on(async { futures::try_join!(origin_syncer_handler, server_handler) })
.unwrap();
println!("Graceful shutdown complete");
}