Domani connects your domain to whatever you want to host on it, all with no account needed
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
domani/src/service/http/tasks.rs

173 lines
6.1 KiB

use crate::error::unexpected::{self, Mappable};
use crate::service;
use std::{net, pin, sync};
use core::time::Duration;
use tokio_util::sync::CancellationToken;
async fn serve_conn<Conn>(
service: sync::Arc<service::http::Service>,
canceller: CancellationToken,
conn: Conn,
remote_addr: net::SocketAddr,
req_is_https: bool,
) where
Conn: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + 'static,
{
log::debug!("[{remote_addr}] Handling incoming connection (is_https:{req_is_https})");
let service = crate::service::http::HyperServiceImpl::new(
service.clone(),
remote_addr.ip(),
req_is_https,
);
use hyper_util::{
rt::tokio::{TokioExecutor, TokioIo, TokioTimer},
server::conn::auto::Builder,
};
let timer = TokioTimer::new();
let mut builder = Builder::new(TokioExecutor::new());
builder
.http1()
.timer(timer.clone())
.header_read_timeout(Duration::from_secs(5))
.http2()
.timer(timer)
.keep_alive_interval(Some(Duration::from_secs(10)))
.keep_alive_timeout(Duration::from_secs(5));
let mut conn = pin::pin!(builder.serve_connection(TokioIo::new(conn), service));
tokio::select! {
res = conn.as_mut() => {
if let Err(err) = res {
log::error!("[{remote_addr}] Failed to serve HTTP(S) request: {err}")
} else {
log::debug!("[{remote_addr}] Finished handling connection");
}
return
},
_ = canceller.cancelled() => (),
}
log::debug!("[{remote_addr}] received cancellation notice, gracefully shutting down...");
conn.as_mut().graceful_shutdown();
let timeout_res = tokio::time::timeout(Duration::from_secs(5), async {
if let Err(err) = conn.await {
log::error!("[{remote_addr}] Failed to serve HTTP(S) request after shutdown: {err}")
} else {
log::debug!("[{remote_addr}] Finished handling connection after shutdown");
}
})
.await;
if timeout_res.is_err() {
log::debug!("[{remote_addr}] did not gracefully shutdown, forcing shutdown")
}
}
pub async fn listen_http(
service: sync::Arc<service::http::Service>,
canceller: CancellationToken,
addr: net::SocketAddr,
) -> unexpected::Result<()> {
let mut wg = awaitgroup::WaitGroup::new();
let listener = tokio::net::TcpListener::bind(addr)
.await
.map_unexpected_while(|| format!("creating TCP listener on {addr}"))?;
log::info!("Listening on http://{}", &addr);
loop {
tokio::select! {
accept_res = listener.accept() => {
match accept_res {
Ok((stream, remote_addr)) => {
let worker = wg.worker();
let service = service.clone();
let canceller = canceller.clone();
tokio::task::spawn(async move {
serve_conn(service, canceller, stream, remote_addr, false).await;
worker.done()
});
},
Err(err) => {
log::error!("Failed to accept new HTTP request on {addr}: {err}");
tokio::time::sleep(Duration::from_secs(2)).await;
},
}
},
_ = canceller.cancelled() => {
log::info!("No longer accepting new requests on http://{addr}, waiting on remaining requests...");
wg.wait().await;
log::info!("All requests on http://{addr} have completed");
return Ok(())
}
}
}
}
pub async fn listen_https(
service: sync::Arc<service::http::Service>,
canceller: CancellationToken,
) -> Result<(), unexpected::Error> {
let mut wg = awaitgroup::WaitGroup::new();
let cert_resolver = service.cert_resolver.clone();
let addr = service.config.http.https_addr.unwrap();
let server_config: tokio_rustls::TlsAcceptor = sync::Arc::new(
rustls::server::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(cert_resolver),
)
.into();
let listener = tokio::net::TcpListener::bind(addr)
.await
.map_unexpected_while(|| format!("creating TCP listener on {addr}"))?;
log::info!("Listening on https://{}", &addr);
loop {
tokio::select! {
accept_res = listener.accept() => {
match accept_res {
Ok((raw_stream, remote_addr)) => {
let worker = wg.worker();
let server_config = server_config.clone();
let service = service.clone();
let canceller = canceller.clone();
tokio::task::spawn(async move {
let stream = match server_config.accept(raw_stream).await {
Ok(s) => s,
Err(err) => {
log::warn!("Failed to accept TLS connection on {addr}: {err}");
worker.done();
return;
}
};
serve_conn(service, canceller, stream, remote_addr, true).await;
worker.done();
});
},
Err(err) => {
log::error!("Failed to accept new HTTPS request on {addr}: {err}");
tokio::time::sleep(Duration::from_secs(2)).await;
},
}
},
_ = canceller.cancelled() => {
log::info!("No longer accepting new requests on https://{addr}, waiting on remaining requests...");
wg.wait().await;
log::info!("All requests on https://{addr} have completed");
return Ok(())
}
}
}
}