use clap::Parser; use futures::stream::StreamExt; use signal_hook::consts::signal; use signal_hook_tokio::Signals; use tokio::select; use tokio::time; use std::convert::Infallible; use std::net::SocketAddr; use std::path; use std::str::FromStr; use std::sync; use domiply::origin::store::Store; #[derive(Parser, Debug)] #[command(version)] #[command(about = "A domiply to another dimension")] struct Cli { #[arg(long, default_value_t = SocketAddr::from_str("[::]:3030").unwrap(), env = "DOMIPLY_HTTP_LISTEN_ADDR")] http_listen_addr: SocketAddr, #[arg(long, required = true, env = "DOMIPLY_HTTP_DOMAIN")] http_domain: String, #[arg(long, required = true, env = "DOMIPLY_PASSPHRASE")] passphrase: String, #[arg(long, required = true, env = "DOMIPLY_ORIGIN_STORE_GIT_DIR_PATH")] origin_store_git_dir_path: path::PathBuf, #[arg(long, required = true, env = "DOMIPLY_DOMAIN_CHECKER_TARGET_AAAA")] domain_checker_target_aaaa: std::net::Ipv6Addr, #[arg(long, default_value_t = String::from("1.1.1.1:53"), env = "DOMIPLY_DOMAIN_CHECKER_RESOLVER_ADDR")] domain_checker_resolver_addr: String, #[arg(long, required = true, env = "DOMIPLY_DOMAIN_CONFIG_STORE_DIR_PATH")] domain_config_store_dir_path: path::PathBuf, } fn main() { let config = Cli::parse(); let tokio_runtime = std::sync::Arc::new( tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(), ); let canceller = tokio_runtime.block_on(async { tokio_util::sync::CancellationToken::new() }); { let canceller = canceller.clone(); tokio_runtime.spawn(async move { let mut signals = Signals::new(&[signal::SIGTERM, signal::SIGINT, signal::SIGQUIT]) .expect("initialized signals"); if let Some(_) = signals.next().await { println!("Gracefully shutting down..."); canceller.cancel(); } if let Some(_) = signals.next().await { println!("Forcefully shutting down"); std::process::exit(1); }; }); } let origin_store = domiply::origin::store::git::new(config.origin_store_git_dir_path) .expect("git origin store initialized"); let origin_syncer_handler = { let origin_store = origin_store.clone(); let canceller = canceller.clone(); tokio_runtime.spawn(async move { let mut interval = time::interval(time::Duration::from_secs(20 * 60)); interval.tick().await; loop { origin_store .all_descrs() .expect("got all_descrs iter") .into_iter() .for_each(|descr| { if canceller.is_cancelled() { return; } if let Err(err) = descr { println!("failed iterating origins: {err}"); return; } let descr = descr.unwrap(); println!("syncing origin: {descr:?}"); if let Err(err) = origin_store.sync(descr.clone(), domiply::origin::store::Limits {}) { println!("error syncing origin {descr:?}: {err}"); } }); select! { _ = interval.tick() => continue, _ = canceller.cancelled() => return, } } }) }; let domain_checker = domiply::domain::checker::new( tokio_runtime.clone(), config.domain_checker_target_aaaa, &config.domain_checker_resolver_addr, ) .expect("domain checker initialized"); let domain_config_store = domiply::domain::config::new(&config.domain_config_store_dir_path) .expect("domain config store initialized"); let manager = domiply::domain::manager::new(origin_store, domain_config_store, domain_checker); let manager = sync::Arc::new(manager); let service = domiply::service::new( manager, config.domain_checker_target_aaaa, config.passphrase, config.http_domain, ); let service = sync::Arc::new(service); let make_service = hyper::service::make_service_fn(move |_conn: &hyper::server::conn::AddrStream| { let service = service.clone(); // Create a `Service` for responding to the request. let service = hyper::service::service_fn(move |req| { domiply::service::handle_request(service.clone(), req) }); // Return the service to hyper. async move { Ok::<_, Infallible>(service) } }); let server_handler = { let canceller = canceller.clone(); tokio_runtime.spawn(async move { let addr = config.http_listen_addr; println!("Listening on {addr}"); let server = hyper::Server::bind(&addr).serve(make_service); let graceful = server.with_graceful_shutdown(async { canceller.cancelled().await; }); if let Err(e) = graceful.await { panic!("server error: {}", e); }; }) }; tokio_runtime .block_on(async { futures::try_join!(origin_syncer_handler, server_handler) }) .unwrap(); println!("Graceful shutdown complete"); }