Upgrade to hyper v1.2.0, potentially fix issues with graceful shutdown

main
Brian Picciano 2 months ago
parent f562b32edb
commit c76d7318f0
  1. 294
      Cargo.lock
  2. 15
      Cargo.toml
  3. 164
      src/service/http.rs
  4. 16
      src/service/http/proxy.rs
  5. 204
      src/service/http/tasks.rs

294
Cargo.lock generated

@ -9,7 +9,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "453e534d4f46dcdddd7aa8619e9a664e153f34383d14710db0b0d76c2964db89"
dependencies = [
"base64 0.13.1",
"hyper",
"hyper 0.14.26",
"openssl",
"reqwest",
"serde",
@ -138,6 +138,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "awaitgroup"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a872ceb3db05a391fbe7cf8eba07a1239b2d946eee66f9e942be9bff06206302"
[[package]]
name = "backtrace"
version = "0.3.69"
@ -401,9 +407,9 @@ checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
name = "digest"
version = "0.10.6"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
@ -415,6 +421,7 @@ version = "0.1.0"
dependencies = [
"acme2",
"async-compression",
"awaitgroup",
"bytes",
"clap",
"env_logger",
@ -424,10 +431,12 @@ dependencies = [
"gix-object",
"handlebars",
"hex",
"http",
"hyper",
"http 1.0.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.2.0",
"hyper-reverse-proxy",
"hyper-trust-dns",
"hyper-util",
"log",
"mime_guess",
"mockall",
@ -436,7 +445,7 @@ dependencies = [
"rand 0.8.5",
"reqwest",
"rust-embed",
"rustls 0.21.10",
"rustls",
"serde",
"serde_json",
"serde_urlencoded",
@ -446,9 +455,8 @@ dependencies = [
"signal-hook",
"tempdir",
"thiserror",
"tls-listener",
"tokio",
"tokio-rustls 0.24.1",
"tokio-rustls",
"tokio-util",
"trust-dns-client",
]
@ -840,7 +848,7 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http",
"http 0.2.9",
"indexmap 1.9.3",
"slab",
"tokio",
@ -848,6 +856,25 @@ dependencies = [
"tracing",
]
[[package]]
name = "h2"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http 1.0.0",
"indexmap 2.0.0",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "handlebars"
version = "4.3.7"
@ -913,6 +940,17 @@ dependencies = [
"itoa",
]
[[package]]
name = "http"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.5"
@ -920,7 +958,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes",
"http",
"http 0.2.9",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
dependencies = [
"bytes",
"http 1.0.0",
]
[[package]]
name = "http-body-util"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840"
dependencies = [
"bytes",
"futures-util",
"http 1.0.0",
"http-body 1.0.0",
"pin-project-lite",
]
@ -952,9 +1013,9 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"h2 0.3.18",
"http 0.2.9",
"http-body 0.4.5",
"httparse",
"httpdate",
"itoa",
@ -967,28 +1028,36 @@ dependencies = [
]
[[package]]
name = "hyper-reverse-proxy"
version = "0.5.2-dev"
source = "git+https://code.betamike.com/micropelago/hyper-reverse-proxy.git?branch=dont-set-host-header#9f4b94724f9b164d2e2d08607780d5e85f53368e"
name = "hyper"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a"
dependencies = [
"hyper",
"lazy_static",
"bytes",
"futures-channel",
"futures-util",
"h2 0.4.2",
"http 1.0.0",
"http-body 1.0.0",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"smallvec",
"tokio",
"tracing",
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.23.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
name = "hyper-reverse-proxy"
version = "0.5.2-dev"
source = "git+https://code.betamike.com/micropelago/hyper-reverse-proxy.git?branch=master#1dc4618994a5e9bc5de2083b911b1b08da7c081f"
dependencies = [
"http",
"hyper",
"rustls 0.20.9",
"hyper 1.2.0",
"hyper-util",
"lazy_static",
"tokio",
"tokio-rustls 0.23.4",
"webpki-roots 0.22.6",
"tracing",
]
[[package]]
@ -998,11 +1067,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97"
dependencies = [
"futures-util",
"http",
"hyper",
"rustls 0.21.10",
"http 0.2.9",
"hyper 0.14.26",
"rustls",
"tokio",
"tokio-rustls 0.24.1",
"tokio-rustls",
]
[[package]]
@ -1012,22 +1081,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"hyper 0.14.26",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "hyper-trust-dns"
version = "0.5.0"
name = "hyper-util"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0deaf08b5c5409c0c74011f696a82bdadae4c6d70b7a71edf8378b29bdd840bd"
checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa"
dependencies = [
"hyper",
"hyper-rustls 0.23.2",
"bytes",
"futures-channel",
"futures-util",
"http 1.0.0",
"http-body 1.0.0",
"hyper 1.2.0",
"pin-project-lite",
"socket2 0.5.5",
"tokio",
"trust-dns-resolver",
"tower",
"tower-service",
"tracing",
]
[[package]]
@ -1188,12 +1265,6 @@ version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
[[package]]
name = "linked-hash-map"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "linux-raw-sys"
version = "0.3.6"
@ -1216,15 +1287,6 @@ version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4"
[[package]]
name = "lru-cache"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
dependencies = [
"linked-hash-map",
]
[[package]]
name = "matches"
version = "0.1.10"
@ -1781,11 +1843,11 @@ dependencies = [
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-rustls 0.24.1",
"h2 0.3.18",
"http 0.2.9",
"http-body 0.4.5",
"hyper 0.14.26",
"hyper-rustls",
"hyper-tls",
"ipnet",
"js-sys",
@ -1795,7 +1857,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.21.10",
"rustls",
"rustls-pemfile",
"serde",
"serde_json",
@ -1803,7 +1865,7 @@ dependencies = [
"system-configuration",
"tokio",
"tokio-native-tls",
"tokio-rustls 0.24.1",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
@ -1811,7 +1873,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots 0.25.3",
"webpki-roots",
"winreg",
]
@ -1898,17 +1960,6 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "rustls"
version = "0.20.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99"
dependencies = [
"ring 0.16.20",
"sct",
"webpki",
]
[[package]]
name = "rustls"
version = "0.21.10"
@ -2095,9 +2146,9 @@ checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]]
name = "sha2"
version = "0.10.6"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0"
checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
dependencies = [
"cfg-if",
"cpufeatures",
@ -2134,9 +2185,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.10.0"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
[[package]]
name = "socket2"
@ -2335,20 +2386,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tls-listener"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81294c017957a1a69794f506723519255879e15a870507faf45dfed288b763dd"
dependencies = [
"futures-util",
"hyper",
"pin-project-lite",
"thiserror",
"tokio",
"tokio-rustls 0.24.1",
]
[[package]]
name = "tokio"
version = "1.34.0"
@ -2389,23 +2426,12 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls 0.20.9",
"tokio",
"webpki",
]
[[package]]
name = "tokio-rustls"
version = "0.24.1"
source = "git+https://code.betamike.com/micropelago/tokio-rustls.git?branch=start-handshake-into-inner#3d462a1d97836cdb0600f0bc69c5e3b3310f6d8c"
dependencies = [
"rustls 0.21.10",
"rustls",
"tokio",
]
@ -2423,6 +2449,28 @@ dependencies = [
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
[[package]]
name = "tower-service"
version = "0.3.2"
@ -2436,6 +2484,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@ -2516,24 +2565,6 @@ dependencies = [
"url",
]
[[package]]
name = "trust-dns-resolver"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aff21aa4dcefb0a1afbfac26deb0adc93888c7d295fb63ab273ef276ba2b7cfe"
dependencies = [
"cfg-if",
"futures-util",
"lazy_static",
"lru-cache",
"parking_lot",
"smallvec",
"thiserror",
"tokio",
"tracing",
"trust-dns-proto",
]
[[package]]
name = "try-lock"
version = "0.2.4"
@ -2744,25 +2775,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring 0.16.20",
"untrusted 0.7.1",
]
[[package]]
name = "webpki-roots"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki",
]
[[package]]
name = "webpki-roots"
version = "0.25.3"

@ -23,8 +23,8 @@ clap = { version = "4.2.7", features = ["derive", "env"] }
handlebars = { version = "4.3.7", features = [ "rust-embed" ]}
rust-embed = "6.6.1"
mime_guess = "2.0.4"
hyper = { version = "0.14.26", features = [ "server", "stream" ]}
http = "0.2.9"
hyper = { version = "1.2.0", features = ["server", "client", "http1", "http2"] }
http = "1.0.0"
serde_urlencoded = "0.7.1"
tokio-util = { version = "0.7.8", features = [ "io" ]}
acme2 = "0.5.1"
@ -32,7 +32,6 @@ openssl = "0.10.52"
rustls = "0.21.1"
pem = "2.0.1"
serde_with = "3.0.0"
tls-listener = { version = "0.7.0", features = [ "rustls", "hyper-h1" ]}
tokio-rustls = "0.24.0"
log = "0.4.19"
env_logger = "0.10.0"
@ -41,14 +40,20 @@ rand = "0.8.5"
hyper-reverse-proxy = "0.5.2-dev"
gemini = "0.0.5"
bytes = "1.4.0"
hyper-trust-dns = "0.5.0"
gix-hash = "0.14.1"
reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] }
gix-object = "0.41.0"
async-compression = { version = "0.4.6", features = ["tokio", "deflate", "zlib"] }
hyper-util = { version = "0.1.3", features = ["server", "http1", "http2", "tokio"] }
http-body-util = "0.1.0"
http-body = "1.0.0"
awaitgroup = "0.7.0"
[patch.crates-io]
# The micropelago fork of tokio-rustls allows for gemini proxying
tokio-rustls = { git = "https://code.betamike.com/micropelago/tokio-rustls.git", branch = "start-handshake-into-inner" }
hyper-reverse-proxy = { git = "https://code.betamike.com/micropelago/hyper-reverse-proxy.git", branch = "dont-set-host-header" }
# The micropelago fork of hyper-reverse-proxy supports hyper v1, and fixes some
# bugs from upstream (which appears to be unmaintained).
hyper-reverse-proxy = { git = "https://code.betamike.com/micropelago/hyper-reverse-proxy.git", branch = "master" }

@ -6,15 +6,39 @@ mod util;
pub use config::*;
use http::request::Parts;
use hyper::{Body, Method, Request, Response};
use serde::{Deserialize, Serialize};
use std::{future, net, sync};
use std::{future, io, net, sync};
use crate::error::unexpected::{self, Mappable};
use crate::{domain, service, task_stack};
use http_body_util::combinators::UnsyncBoxBody as BoxBody;
use hyper::{
body::{Body, Incoming as RequestBody},
Method,
};
use serde::{Deserialize, Serialize};
type Request = hyper::Request<RequestBody>;
pub type Response = hyper::Response<BoxBody<bytes::Bytes, io::Error>>;
fn bytes_body(b: bytes::Bytes) -> impl Body<Data = bytes::Bytes, Error = io::Error> {
use http_body_util::BodyExt;
http_body_util::Full::new(b).map_err(io::Error::other)
}
fn empty_body() -> impl Body<Data = bytes::Bytes, Error = io::Error> {
use http_body_util::BodyExt;
http_body_util::Empty::<bytes::Bytes>::new().map_err(io::Error::other)
}
fn stream_body<S>(s: S) -> impl Body<Data = bytes::Bytes, Error = io::Error>
where
S: futures::stream::Stream<Item = io::Result<bytes::Bytes>>,
{
use futures::stream::TryStreamExt;
http_body_util::StreamBody::new(s.map_ok(http_body::Frame::data))
}
#[derive(Serialize)]
struct BasePresenter<'a, T> {
page_name: &'a str,
@ -82,27 +106,18 @@ impl Service {
self.config.http.https_addr.is_some()
}
fn serve(&self, status_code: u16, path: &str, body: Body) -> Response<Body> {
match Response::builder()
fn serve<B>(&self, status_code: u16, path: &str, body: B) -> Response
where
B: Body<Data = bytes::Bytes, Error = io::Error> + Send + 'static,
{
hyper::Response::builder()
.status(status_code)
.header("Content-Type", service::guess_mime(path))
.body(body)
{
Ok(res) => res,
Err(err) => {
// if the status code was already a 500, don't try to render _another_ 500, it'll
// probably fail for the same reason. At this point something is incredibly wrong,
// just panic.
if status_code == 500 {
panic!("failed to build {}: {}", path, err);
}
self.internal_error(format!("failed to build {}: {}", path, err).as_str())
}
}
.body(BoxBody::new(body))
.unwrap_or_else(|err| panic!("failed to build {}: {}", path, err))
}
fn render<T>(&self, status_code: u16, name: &str, value: T) -> Response<Body>
fn render<T>(&self, status_code: u16, name: &str, value: T) -> Response
where
T: Serialize,
{
@ -113,11 +128,11 @@ impl Service {
..
}) => return self.render_error_page(404, "Static asset not found"),
Err(err) => {
return self.render_error_page(500, format!("template error: {err}").as_str())
return self.render_error_page(500, format!("template error: {err}").as_str());
}
};
self.serve(status_code, name, rendered.into())
self.serve(status_code, name, bytes_body(rendered.into()))
}
fn presenter_http_scheme(&self) -> &str {
@ -127,7 +142,7 @@ impl Service {
"http"
}
fn render_error_page(&self, status_code: u16, e: &str) -> Response<Body> {
fn render_error_page(&self, status_code: u16, e: &str) -> Response {
#[derive(Serialize)]
struct Response<'a> {
error_msg: &'a str,
@ -145,7 +160,7 @@ impl Service {
)
}
fn internal_error(&self, e: &str) -> Response<Body> {
fn internal_error(&self, e: &str) -> Response {
log::error!("Internal error: {e}");
self.render_error_page(
500,
@ -153,20 +168,17 @@ impl Service {
)
}
fn render_redirect(&self, status_code: u16, target_uri: &str) -> Response<Body> {
Response::builder()
fn render_redirect(&self, status_code: u16, target_uri: &str) -> Response {
hyper::Response::builder()
.status(status_code)
.header("Location", target_uri.to_string())
.body(Body::empty())
.body(BoxBody::new(empty_body()))
.unwrap_or_else(|err| {
self.internal_error(
format!("failed to render {status_code} redirect to {target_uri}: {err}",)
.as_str(),
)
panic!("failed to render {status_code} redirect to {target_uri}: {err}",)
})
}
fn https_redirect(&self, domain: domain::Name, req: Request<Body>) -> Response<Body> {
fn https_redirect(&self, domain: domain::Name, req: Request) -> Response {
let https_addr = self.config.http.https_addr.unwrap();
(|| {
@ -198,7 +210,7 @@ impl Service {
})
}
fn render_page<T>(&self, name: &str, data: T) -> Response<Body>
fn render_page<T>(&self, name: &str, data: T) -> Response
where
T: Serialize,
{
@ -214,12 +226,12 @@ impl Service {
)
}
async fn serve_origin(&self, settings: domain::Settings, req: Request<Body>) -> Response<Body> {
async fn serve_origin(&self, settings: domain::Settings, req: Request) -> Response {
let path = service::append_index_to_path(req.uri().path(), "index.html");
use domain::manager::GetFileError;
match self.domain_manager.get_file(&settings, &path).await {
Ok(f) => self.serve(200, &path, Body::wrap_stream(f.into_stream())),
Ok(f) => self.serve(200, &path, stream_body(f.into_stream())),
Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"),
Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"),
Err(GetFileError::DescrNotSynced) => self.internal_error(
@ -247,22 +259,23 @@ impl Service {
async fn with_query_req<'a, F, In, Out>(
&self,
req: &'a Parts,
body: Body,
req: &'a hyper::http::request::Parts,
body: RequestBody,
f: F,
) -> Response<Body>
) -> Response
where
In: for<'d> Deserialize<'d>,
F: FnOnce(In) -> Out,
Out: future::Future<Output = Response<Body>>,
Out: future::Future<Output = Response>,
{
let res = match self.config.http.form_method {
ConfigFormMethod::GET => {
serde_urlencoded::from_str::<In>(req.uri.query().unwrap_or(""))
}
ConfigFormMethod::POST => {
let body = match hyper::body::to_bytes(body).await {
Ok(bytes) => bytes.to_vec(),
use http_body_util::BodyExt;
let body = match body.collect().await {
Ok(res) => res.to_bytes(),
Err(e) => {
return self.internal_error(format!("failed to read body: {e}").as_str())
}
@ -279,7 +292,7 @@ impl Service {
}
}
fn domain(&self, args: DomainArgs) -> Response<Body> {
fn domain(&self, args: DomainArgs) -> Response {
#[derive(Serialize)]
struct Data {
domain: domain::Name,
@ -311,7 +324,7 @@ impl Service {
)
}
fn domain_init(&self, args: DomainInitArgs) -> Response<Body> {
fn domain_init(&self, args: DomainInitArgs) -> Response {
#[derive(Serialize)]
struct Data<'a> {
domain: domain::Name,
@ -368,7 +381,7 @@ impl Service {
)
}
async fn domain_sync(&self, args: DomainSyncArgs) -> Response<Body> {
async fn domain_sync(&self, args: DomainSyncArgs) -> Response {
if args.passphrase != self.config.passphrase.as_str() {
return self.render_error_page(401, "Incorrect passphrase");
}
@ -451,7 +464,7 @@ impl Service {
)
}
fn domains(&self) -> Response<Body> {
fn domains(&self) -> Response {
#[derive(Serialize)]
struct Response {
domains: Vec<String>,
@ -473,7 +486,7 @@ impl Service {
self.render_page("/domains.html", Response { domains })
}
async fn serve_interface(&self, req: Request<Body>) -> Response<Body> {
async fn serve_interface(&self, req: Request) -> Response {
let (req, body) = req.into_parts();
let path = req.uri.path();
@ -511,7 +524,7 @@ impl Service {
}
}
fn domain_from_req(req: &Request<Body>) -> Option<domain::Name> {
fn domain_from_req(req: &Request) -> Option<domain::Name> {
let host_header = req
.headers()
.get("Host")
@ -527,12 +540,18 @@ impl Service {
async fn handle_request(
&self,
client_ip: net::IpAddr,
req: Request<Body>,
req: Request,
req_is_https: bool,
) -> Response<Body> {
) -> Response {
let domain = match Self::domain_from_req(&req) {
Some(domain) => domain,
None => return self.render_error_page(400, "Cannot serve page without domain"),
Some(domain) => {
log::debug!("[{client_ip}] Serving request to {domain}{}", req.uri());
domain
}
None => {
log::debug!("[{client_ip}] Domain not found on request");
return self.render_error_page(400, "Cannot serve page without domain");
}
};
let method = req.method();
@ -545,7 +564,7 @@ impl Service {
let token = path.trim_start_matches("/.well-known/acme-challenge/");
if let Ok(key) = self.domain_manager.get_acme_http01_challenge_key(token) {
return self.serve(200, "token.txt", key.into());
return self.serve(200, "token.txt", bytes_body(key.into()));
}
}
@ -555,7 +574,7 @@ impl Service {
.domain_manager
.get_domain_checker_challenge_token(&domain)
{
Ok(Some(token)) => return self.serve(200, "token.txt", token.into()),
Ok(Some(token)) => return self.serve(200, "token.txt", bytes_body(token.into())),
Ok(None) => return self.render_error_page(404, "Token not found"),
Err(e) => {
return self.internal_error(
@ -593,6 +612,12 @@ impl Service {
req_is_https,
)
.await
.map(|res| {
res.map(|body| {
use http_body_util::BodyExt;
body.map_err(io::Error::other).boxed_unsync()
})
})
.unwrap_or_else(|e| {
self.internal_error(
format!("serving {domain} via proxy {}: {e}", http_url.original_url)
@ -630,3 +655,32 @@ fn strip_port(host: &str) -> &str {
Some(i) => &host[..i],
}
}
struct HyperServiceImpl {
service: sync::Arc<Service>,
client_ip: net::IpAddr,
is_https: bool,
}
impl HyperServiceImpl {
pub fn new(service: sync::Arc<Service>, client_ip: net::IpAddr, is_https: bool) -> Self {
Self {
service,
client_ip,
is_https,
}
}
}
impl hyper::service::Service<Request> for HyperServiceImpl {
type Response = Response;
type Error = std::io::Error;
type Future = crate::util::BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn call(&self, req: Request) -> Self::Future {
let service = self.service.clone();
let client_ip = self.client_ip;
let is_https = self.is_https;
Box::pin(async move { Ok(service.handle_request(client_ip, req, is_https).await) })
}
}

@ -1,16 +1,18 @@
use crate::error::unexpected;
use std::net;
use hyper::body::Incoming;
use hyper_reverse_proxy::ReverseProxy;
use hyper_trust_dns::{TrustDnsHttpConnector, TrustDnsResolver};
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::rt::TokioExecutor;
fn proxy_client() -> &'static ReverseProxy<TrustDnsHttpConnector> {
fn proxy_client() -> &'static ReverseProxy<HttpConnector> {
use std::sync::OnceLock;
static PROXY_CLIENT: OnceLock<ReverseProxy<TrustDnsHttpConnector>> = OnceLock::new();
static PROXY_CLIENT: OnceLock<ReverseProxy<HttpConnector>> = OnceLock::new();
PROXY_CLIENT.get_or_init(|| {
ReverseProxy::new(
hyper::Client::builder()
.build::<_, hyper::Body>(TrustDnsResolver::default().into_http_connector()),
hyper_util::client::legacy::Builder::new(TokioExecutor::new())
.build::<_, Incoming>(HttpConnector::new()),
)
})
}
@ -19,9 +21,9 @@ pub async fn serve_http_request(
proxy_addr: &str,
headers: &http::header::HeaderMap,
client_ip: net::IpAddr,
mut req: hyper::Request<hyper::Body>,
mut req: hyper::Request<Incoming>,
req_is_https: bool,
) -> unexpected::Result<hyper::Response<hyper::Body>> {
) -> unexpected::Result<hyper::Response<Incoming>> {
for (name, value) in headers {
if value.is_empty() {
req.headers_mut().remove(name);

@ -1,68 +1,121 @@
use crate::error::unexpected::{self, Mappable};
use crate::error::unexpected::{self, Intoable, Mappable};
use crate::service;
use std::{convert, future, sync};
use std::{net, pin, sync};
use futures::StreamExt;
use hyper::server::conn::AddrStream;
use tokio_rustls::server::TlsStream;
use tokio_util::sync::CancellationToken;
pub async fn listen_http(
async fn serve_conn<Conn>(
service: sync::Arc<service::http::Service>,
canceller: CancellationToken,
) -> Result<(), unexpected::Error> {
let addr = service.config.http.http_addr;
conn: Conn,
remote_addr: net::SocketAddr,
req_is_https: bool,
) where
Conn: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + 'static,
{
log::debug!("[{remote_addr}] Handling incoming connection (is_https:{req_is_https})");
let service = crate::service::http::HyperServiceImpl::new(
service.clone(),
remote_addr.ip(),
req_is_https,
);
use core::time::Duration;
use hyper_util::{
rt::tokio::{TokioExecutor, TokioIo, TokioTimer},
server::conn::auto::Builder,
};
let timer = TokioTimer::new();
let mut builder = Builder::new(TokioExecutor::new());
builder
.http1()
.timer(timer.clone())
.header_read_timeout(Duration::from_secs(5))
.http2()
.timer(timer)
.keep_alive_interval(Some(Duration::from_secs(10)))
.keep_alive_timeout(Duration::from_secs(5));
let mut conn = pin::pin!(builder.serve_connection(TokioIo::new(conn), service));
tokio::select! {
res = conn.as_mut() => {
if let Err(err) = res {
log::error!("[{remote_addr}] Failed to serve HTTP(S) request: {err}")
} else {
log::debug!("[{remote_addr}] Finished handling connection");
}
return
},
_ = canceller.cancelled() => (),
}
let make_service = hyper::service::make_service_fn(move |conn: &AddrStream| {
let service = service.clone();
let client_ip = conn.remote_addr().ip();
log::debug!("[{remote_addr}] received cancellation notice, gracefully shutting down...");
conn.as_mut().graceful_shutdown();
// Create a `Service` for responding to the request.
let hyper_service = hyper::service::service_fn(move |req| {
let service = service.clone();
async move {
Ok::<_, convert::Infallible>(service.handle_request(client_ip, req, false).await)
}
});
let timeout_res = tokio::time::timeout(Duration::from_secs(5), async {
if let Err(err) = conn.await {
log::error!("[{remote_addr}] Failed to serve HTTP(S) request after shutdown: {err}")
} else {
log::debug!("[{remote_addr}] Finished handling connection after shutdown");
}
})
.await;
// Return the service to hyper.
async move { Ok::<_, convert::Infallible>(hyper_service) }
});
if timeout_res.is_err() {
log::debug!("[{remote_addr}] did not gracefully shutdown, forcing shutdown")
}
}
log::info!("Listening on http://{}", &addr);
let server = hyper::Server::bind(&addr).serve(make_service);
pub async fn listen_http(
service: sync::Arc<service::http::Service>,
canceller: CancellationToken,
) -> unexpected::Result<()> {
let mut wg = awaitgroup::WaitGroup::new();
let addr = service.config.http.http_addr;
let listener = tokio::net::TcpListener::bind(addr)
.await
.map_unexpected_while(|| format!("creating TCP listener on {addr}"))?;
let graceful = server.with_graceful_shutdown(async {
canceller.cancelled().await;
});
log::info!("Listening on http://{}", &addr);
graceful.await.or_unexpected()
loop {
tokio::select! {
accept_res = listener.accept() => {
match accept_res {
Ok((stream, remote_addr)) => {
let worker = wg.worker();
let service = service.clone();
let canceller = canceller.clone();
tokio::task::spawn(async move {
serve_conn(service, canceller, stream, remote_addr, false).await;
worker.done()
});
},
Err(err) =>
return Err(err.into_unexpected_while(format!("accepting new HTTP requests on {addr}"))),
}
},
_ = canceller.cancelled() => {
log::info!("No longer accepting new requests on http://{addr}, waiting on remaining requests...");
wg.wait().await;
log::info!("All requests on http://{addr} have completed");
return Ok(())
}
}
}
}
pub async fn listen_https(
service: sync::Arc<service::http::Service>,
canceller: CancellationToken,
) -> Result<(), unexpected::Error> {
let mut wg = awaitgroup::WaitGroup::new();
let cert_resolver = service.cert_resolver.clone();
let addr = service.config.http.https_addr.unwrap();
let make_service = hyper::service::make_service_fn(move |conn: &TlsStream<AddrStream>| {
let service = service.clone();
let client_ip = conn.get_ref().0.remote_addr().ip();
// Create a `Service` for responding to the request.
let hyper_service = hyper::service::service_fn(move |req| {
let service = service.clone();
async move {
Ok::<_, convert::Infallible>(service.handle_request(client_ip, req, true).await)
}
});
// Return the service to hyper.
async move { Ok::<_, convert::Infallible>(hyper_service) }
});
let server_config: tokio_rustls::TlsAcceptor = sync::Arc::new(
rustls::server::ServerConfig::builder()
.with_safe_defaults()
@ -71,27 +124,46 @@ pub async fn listen_https(
)
.into();
let addr_incoming = hyper::server::conn::AddrIncoming::bind(&addr)
.expect("https listen socket creation failed");
let incoming = tls_listener::TlsListener::new(server_config, addr_incoming).filter(|conn| {
if let Err(err) = conn {
log::error!("Error accepting TLS connection: {:?}", err);
future::ready(false)
} else {
future::ready(true)
let listener = tokio::net::TcpListener::bind(addr)
.await
.map_unexpected_while(|| format!("creating TCP listener on {addr}"))?;
log::info!("Listening on https://{}", &addr);
loop {
tokio::select! {
accept_res = listener.accept() => {
match accept_res {
Ok((raw_stream, remote_addr)) => {
let worker = wg.worker();
let server_config = server_config.clone();
let service = service.clone();
let canceller = canceller.clone();
tokio::task::spawn(async move {
let stream = match server_config.accept(raw_stream).await {
Ok(s) => s,
Err(err) => {
log::warn!("failed to accept TLS connection on {addr}: {err}");
worker.done();
return;
}
};
serve_conn(service, canceller, stream, remote_addr, true).await;
worker.done();
});
},
Err(err) =>
return Err(err.into_unexpected_while(format!("accepting new HTTPS requests on {addr}"))),
}
},
_ = canceller.cancelled() => {
log::info!("No longer accepting new requests on https://{addr}, waiting on remaining requests...");
wg.wait().await;
log::info!("All requests on https://{addr} have completed");
return Ok(())
}
}
});
let incoming = hyper::server::accept::from_stream(incoming);
log::info!("Listening on https://{}", addr);
let server = hyper::Server::builder(incoming).serve(make_service);
let graceful = server.with_graceful_shutdown(async {
canceller.cancelled().await;
});
graceful.await.or_unexpected()
}
}

Loading…
Cancel
Save