use a FuturesOrdered wait group
This commit is contained in:
parent
4e412d0677
commit
4cd5234519
@ -80,6 +80,7 @@ where
|
|||||||
where Self: 'mgr;
|
where Self: 'mgr;
|
||||||
|
|
||||||
fn sync_domain(&self, domain: domain::Name) -> Self::SyncDomainFuture<'_> {
|
fn sync_domain(&self, domain: domain::Name) -> Self::SyncDomainFuture<'_> {
|
||||||
|
Box::pin(async move {
|
||||||
// if there's an existing cert, and its expiry (determined by the soonest value of
|
// if there's an existing cert, and its expiry (determined by the soonest value of
|
||||||
// not_after amongst its parts) is later than 30 days from now, then we consider it to be
|
// not_after amongst its parts) is later than 30 days from now, then we consider it to be
|
||||||
// synced.
|
// synced.
|
||||||
@ -87,19 +88,19 @@ where
|
|||||||
let thirty_days = openssl::asn1::Asn1Time::days_from_now(30)
|
let thirty_days = openssl::asn1::Asn1Time::days_from_now(30)
|
||||||
.expect("parsed thirty days from now as Asn1Time");
|
.expect("parsed thirty days from now as Asn1Time");
|
||||||
|
|
||||||
let soonest_not_after = cert[1..]
|
let cert_with_soonest_not_after = cert
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|cert_part| cert_part.not_after())
|
.reduce(|a, b| if a.not_after() < b.not_after() { a } else { b })
|
||||||
.fold(cert[0].not_after(), |a, b| if a < b { a } else { b });
|
.ok_or(error::Unexpected::from(
|
||||||
|
"expected there to be more than one cert",
|
||||||
|
))?;
|
||||||
|
|
||||||
if thirty_days < soonest_not_after {
|
if thirty_days < cert_with_soonest_not_after.not_after() {
|
||||||
return Box::pin(future::ready(Ok(())));
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("fetching a new certificate for domain {}", domain.as_str());
|
println!("fetching a new certificate for domain {}", domain.as_str());
|
||||||
|
|
||||||
Box::pin(async move {
|
|
||||||
let mut builder = acme2::OrderBuilder::new(self.account.clone());
|
let mut builder = acme2::OrderBuilder::new(self.account.clone());
|
||||||
builder.add_dns_identifier(domain.as_str().to_string());
|
builder.add_dns_identifier(domain.as_str().to_string());
|
||||||
let order = builder.build().await.map_unexpected()?;
|
let order = builder.build().await.map_unexpected()?;
|
||||||
|
21
src/main.rs
21
src/main.rs
@ -1,6 +1,7 @@
|
|||||||
#![feature(result_option_inspect)]
|
#![feature(result_option_inspect)]
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
use futures::stream::futures_unordered::FuturesUnordered;
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
use signal_hook_tokio::Signals;
|
use signal_hook_tokio::Signals;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
@ -58,6 +59,8 @@ struct Cli {
|
|||||||
fn main() {
|
fn main() {
|
||||||
let config = Cli::parse();
|
let config = Cli::parse();
|
||||||
|
|
||||||
|
let mut wait_group = FuturesUnordered::new();
|
||||||
|
|
||||||
let tokio_runtime = std::sync::Arc::new(
|
let tokio_runtime = std::sync::Arc::new(
|
||||||
tokio::runtime::Builder::new_multi_thread()
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
.enable_all()
|
.enable_all()
|
||||||
@ -123,7 +126,7 @@ fn main() {
|
|||||||
domain_acme_manager.clone(),
|
domain_acme_manager.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let origin_syncer_handler = {
|
wait_group.push({
|
||||||
let manager = manager.clone();
|
let manager = manager.clone();
|
||||||
let canceller = canceller.clone();
|
let canceller = canceller.clone();
|
||||||
|
|
||||||
@ -152,7 +155,7 @@ fn main() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
};
|
});
|
||||||
|
|
||||||
let service = domiply::service::new(
|
let service = domiply::service::new(
|
||||||
manager,
|
manager,
|
||||||
@ -176,7 +179,7 @@ fn main() {
|
|||||||
async move { Ok::<_, Infallible>(service) }
|
async move { Ok::<_, Infallible>(service) }
|
||||||
});
|
});
|
||||||
|
|
||||||
let server_handler = {
|
wait_group.push({
|
||||||
let http_domain = config.http_domain.clone();
|
let http_domain = config.http_domain.clone();
|
||||||
|
|
||||||
tokio_runtime.spawn(async move {
|
tokio_runtime.spawn(async move {
|
||||||
@ -193,7 +196,7 @@ fn main() {
|
|||||||
panic!("server error: {}", e);
|
panic!("server error: {}", e);
|
||||||
};
|
};
|
||||||
})
|
})
|
||||||
};
|
});
|
||||||
|
|
||||||
// if there's an acme manager then it means that https is enabled, and we should ensure that
|
// if there's an acme manager then it means that https is enabled, and we should ensure that
|
||||||
// the http domain for domiply itself has a valid certificate.
|
// the http domain for domiply itself has a valid certificate.
|
||||||
@ -201,19 +204,17 @@ fn main() {
|
|||||||
let domain = domiply::domain::Name::from_str(&config.http_domain)
|
let domain = domiply::domain::Name::from_str(&config.http_domain)
|
||||||
.expect("--http-domain parses as a domain");
|
.expect("--http-domain parses as a domain");
|
||||||
|
|
||||||
tokio_runtime.spawn(async move {
|
wait_group.push(tokio_runtime.spawn(async move {
|
||||||
_ = domain_acme_manager
|
_ = domain_acme_manager
|
||||||
.sync_domain(domain.clone())
|
.sync_domain(domain.clone())
|
||||||
.await
|
.await
|
||||||
.inspect_err(|err| {
|
.inspect_err(|err| {
|
||||||
println!("Error while getting cert for {}: {err}", domain.as_str())
|
panic!("Error while getting cert for {}: {err}", domain.as_str())
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio_runtime
|
tokio_runtime.block_on(async { while let Some(_) = wait_group.next().await {} });
|
||||||
.block_on(async { futures::try_join!(origin_syncer_handler, server_handler) })
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
println!("Graceful shutdown complete");
|
println!("Graceful shutdown complete");
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user