diff --git a/Cargo.lock b/Cargo.lock index 7718967..06a8f40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,7 +9,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453e534d4f46dcdddd7aa8619e9a664e153f34383d14710db0b0d76c2964db89" dependencies = [ "base64 0.13.1", - "hyper", + "hyper 0.14.26", "openssl", "reqwest", "serde", @@ -138,6 +138,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "awaitgroup" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a872ceb3db05a391fbe7cf8eba07a1239b2d946eee66f9e942be9bff06206302" + [[package]] name = "backtrace" version = "0.3.69" @@ -401,9 +407,9 @@ checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" [[package]] name = "digest" -version = "0.10.6" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", @@ -415,6 +421,7 @@ version = "0.1.0" dependencies = [ "acme2", "async-compression", + "awaitgroup", "bytes", "clap", "env_logger", @@ -424,10 +431,12 @@ dependencies = [ "gix-object", "handlebars", "hex", - "http", - "hyper", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.2.0", "hyper-reverse-proxy", - "hyper-trust-dns", + "hyper-util", "log", "mime_guess", "mockall", @@ -436,7 +445,7 @@ dependencies = [ "rand 0.8.5", "reqwest", "rust-embed", - "rustls 0.21.10", + "rustls", "serde", "serde_json", "serde_urlencoded", @@ -446,9 +455,8 @@ dependencies = [ "signal-hook", "tempdir", "thiserror", - "tls-listener", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-util", "trust-dns-client", ] @@ -840,7 +848,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.9", "indexmap 1.9.3", "slab", "tokio", @@ -848,6 +856,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", + "indexmap 2.0.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "handlebars" version = "4.3.7" @@ -913,6 +940,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -920,7 +958,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.9", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -952,9 +1013,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.18", + "http 0.2.9", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -967,28 +1028,36 @@ dependencies = [ ] [[package]] -name = "hyper-reverse-proxy" -version = "0.5.2-dev" -source = "git+https://code.betamike.com/micropelago/hyper-reverse-proxy.git?branch=dont-set-host-header#9f4b94724f9b164d2e2d08607780d5e85f53368e" +name = "hyper" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" dependencies = [ - "hyper", - "lazy_static", + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", "tokio", - "tracing", + "want", ] [[package]] -name = "hyper-rustls" -version = "0.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" +name = "hyper-reverse-proxy" +version = "0.5.2-dev" +source = "git+https://code.betamike.com/micropelago/hyper-reverse-proxy.git?branch=master#1dc4618994a5e9bc5de2083b911b1b08da7c081f" dependencies = [ - "http", - "hyper", - "rustls 0.20.9", + "hyper 1.2.0", + "hyper-util", + "lazy_static", "tokio", - "tokio-rustls 0.23.4", - "webpki-roots 0.22.6", + "tracing", ] [[package]] @@ -998,11 +1067,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" dependencies = [ "futures-util", - "http", - "hyper", - "rustls 0.21.10", + "http 0.2.9", + "hyper 0.14.26", + "rustls", "tokio", - "tokio-rustls 0.24.1", + "tokio-rustls", ] [[package]] @@ -1012,22 +1081,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.26", "native-tls", "tokio", "tokio-native-tls", ] [[package]] -name = "hyper-trust-dns" -version = "0.5.0" +name = "hyper-util" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0deaf08b5c5409c0c74011f696a82bdadae4c6d70b7a71edf8378b29bdd840bd" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ - "hyper", - "hyper-rustls 0.23.2", + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.2.0", + "pin-project-lite", + "socket2 0.5.5", "tokio", - "trust-dns-resolver", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -1188,12 +1265,6 @@ version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" -[[package]] -name = "linked-hash-map" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" - [[package]] name = "linux-raw-sys" version = "0.3.6" @@ -1216,15 +1287,6 @@ version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" -[[package]] -name = "lru-cache" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" -dependencies = [ - "linked-hash-map", -] - [[package]] name = "matches" version = "0.1.10" @@ -1781,11 +1843,11 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-rustls 0.24.1", + "h2 0.3.18", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.26", + "hyper-rustls", "hyper-tls", "ipnet", "js-sys", @@ -1795,7 +1857,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.10", + "rustls", "rustls-pemfile", "serde", "serde_json", @@ -1803,7 +1865,7 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", - "tokio-rustls 0.24.1", + "tokio-rustls", "tokio-util", "tower-service", "url", @@ -1811,7 +1873,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 0.25.3", + "webpki-roots", "winreg", ] @@ -1898,17 +1960,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "rustls" -version = "0.20.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" -dependencies = [ - "ring 0.16.20", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.21.10" @@ -2095,9 +2146,9 @@ checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" [[package]] name = "sha2" -version = "0.10.6" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", @@ -2134,9 +2185,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.10.0" +version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0" +checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" [[package]] name = "socket2" @@ -2335,20 +2386,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" -[[package]] -name = "tls-listener" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81294c017957a1a69794f506723519255879e15a870507faf45dfed288b763dd" -dependencies = [ - "futures-util", - "hyper", - "pin-project-lite", - "thiserror", - "tokio", - "tokio-rustls 0.24.1", -] - [[package]] name = "tokio" version = "1.34.0" @@ -2389,23 +2426,12 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.23.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" -dependencies = [ - "rustls 0.20.9", - "tokio", - "webpki", -] - [[package]] name = "tokio-rustls" version = "0.24.1" source = "git+https://code.betamike.com/micropelago/tokio-rustls.git?branch=start-handshake-into-inner#3d462a1d97836cdb0600f0bc69c5e3b3310f6d8c" dependencies = [ - "rustls 0.21.10", + "rustls", "tokio", ] @@ -2423,6 +2449,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.2" @@ -2436,6 +2484,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2516,24 +2565,6 @@ dependencies = [ "url", ] -[[package]] -name = "trust-dns-resolver" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aff21aa4dcefb0a1afbfac26deb0adc93888c7d295fb63ab273ef276ba2b7cfe" -dependencies = [ - "cfg-if", - "futures-util", - "lazy_static", - "lru-cache", - "parking_lot", - "smallvec", - "thiserror", - "tokio", - "tracing", - "trust-dns-proto", -] - [[package]] name = "try-lock" version = "0.2.4" @@ -2744,25 +2775,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" -dependencies = [ - "ring 0.16.20", - "untrusted 0.7.1", -] - -[[package]] -name = "webpki-roots" -version = "0.22.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" -dependencies = [ - "webpki", -] - [[package]] name = "webpki-roots" version = "0.25.3" diff --git a/Cargo.toml b/Cargo.toml index 8e3a31c..89a013d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,8 +23,8 @@ clap = { version = "4.2.7", features = ["derive", "env"] } handlebars = { version = "4.3.7", features = [ "rust-embed" ]} rust-embed = "6.6.1" mime_guess = "2.0.4" -hyper = { version = "0.14.26", features = [ "server", "stream" ]} -http = "0.2.9" +hyper = { version = "1.2.0", features = ["server", "client", "http1", "http2"] } +http = "1.0.0" serde_urlencoded = "0.7.1" tokio-util = { version = "0.7.8", features = [ "io" ]} acme2 = "0.5.1" @@ -32,7 +32,6 @@ openssl = "0.10.52" rustls = "0.21.1" pem = "2.0.1" serde_with = "3.0.0" -tls-listener = { version = "0.7.0", features = [ "rustls", "hyper-h1" ]} tokio-rustls = "0.24.0" log = "0.4.19" env_logger = "0.10.0" @@ -41,14 +40,20 @@ rand = "0.8.5" hyper-reverse-proxy = "0.5.2-dev" gemini = "0.0.5" bytes = "1.4.0" -hyper-trust-dns = "0.5.0" gix-hash = "0.14.1" reqwest = { version = "0.11.23", features = ["gzip", "deflate", "stream"] } gix-object = "0.41.0" async-compression = { version = "0.4.6", features = ["tokio", "deflate", "zlib"] } +hyper-util = { version = "0.1.3", features = ["server", "http1", "http2", "tokio"] } +http-body-util = "0.1.0" +http-body = "1.0.0" +awaitgroup = "0.7.0" [patch.crates-io] # The micropelago fork of tokio-rustls allows for gemini proxying tokio-rustls = { git = "https://code.betamike.com/micropelago/tokio-rustls.git", branch = "start-handshake-into-inner" } -hyper-reverse-proxy = { git = "https://code.betamike.com/micropelago/hyper-reverse-proxy.git", branch = "dont-set-host-header" } + +# The micropelago fork of hyper-reverse-proxy supports hyper v1, and fixes some +# bugs from upstream (which appears to be unmaintained). +hyper-reverse-proxy = { git = "https://code.betamike.com/micropelago/hyper-reverse-proxy.git", branch = "master" } diff --git a/src/service/http.rs b/src/service/http.rs index 88f1701..f8260fc 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -6,15 +6,39 @@ mod util; pub use config::*; -use http::request::Parts; -use hyper::{Body, Method, Request, Response}; -use serde::{Deserialize, Serialize}; - -use std::{future, net, sync}; +use std::{future, io, net, sync}; use crate::error::unexpected::{self, Mappable}; use crate::{domain, service, task_stack}; +use http_body_util::combinators::UnsyncBoxBody as BoxBody; +use hyper::{ + body::{Body, Incoming as RequestBody}, + Method, +}; +use serde::{Deserialize, Serialize}; + +type Request = hyper::Request; +pub type Response = hyper::Response>; + +fn bytes_body(b: bytes::Bytes) -> impl Body { + use http_body_util::BodyExt; + http_body_util::Full::new(b).map_err(io::Error::other) +} + +fn empty_body() -> impl Body { + use http_body_util::BodyExt; + http_body_util::Empty::::new().map_err(io::Error::other) +} + +fn stream_body(s: S) -> impl Body +where + S: futures::stream::Stream>, +{ + use futures::stream::TryStreamExt; + http_body_util::StreamBody::new(s.map_ok(http_body::Frame::data)) +} + #[derive(Serialize)] struct BasePresenter<'a, T> { page_name: &'a str, @@ -82,27 +106,18 @@ impl Service { self.config.http.https_addr.is_some() } - fn serve(&self, status_code: u16, path: &str, body: Body) -> Response { - match Response::builder() + fn serve(&self, status_code: u16, path: &str, body: B) -> Response + where + B: Body + Send + 'static, + { + hyper::Response::builder() .status(status_code) .header("Content-Type", service::guess_mime(path)) - .body(body) - { - Ok(res) => res, - Err(err) => { - // if the status code was already a 500, don't try to render _another_ 500, it'll - // probably fail for the same reason. At this point something is incredibly wrong, - // just panic. - if status_code == 500 { - panic!("failed to build {}: {}", path, err); - } - - self.internal_error(format!("failed to build {}: {}", path, err).as_str()) - } - } + .body(BoxBody::new(body)) + .unwrap_or_else(|err| panic!("failed to build {}: {}", path, err)) } - fn render(&self, status_code: u16, name: &str, value: T) -> Response + fn render(&self, status_code: u16, name: &str, value: T) -> Response where T: Serialize, { @@ -113,11 +128,11 @@ impl Service { .. }) => return self.render_error_page(404, "Static asset not found"), Err(err) => { - return self.render_error_page(500, format!("template error: {err}").as_str()) + return self.render_error_page(500, format!("template error: {err}").as_str()); } }; - self.serve(status_code, name, rendered.into()) + self.serve(status_code, name, bytes_body(rendered.into())) } fn presenter_http_scheme(&self) -> &str { @@ -127,7 +142,7 @@ impl Service { "http" } - fn render_error_page(&self, status_code: u16, e: &str) -> Response { + fn render_error_page(&self, status_code: u16, e: &str) -> Response { #[derive(Serialize)] struct Response<'a> { error_msg: &'a str, @@ -145,7 +160,7 @@ impl Service { ) } - fn internal_error(&self, e: &str) -> Response { + fn internal_error(&self, e: &str) -> Response { log::error!("Internal error: {e}"); self.render_error_page( 500, @@ -153,20 +168,17 @@ impl Service { ) } - fn render_redirect(&self, status_code: u16, target_uri: &str) -> Response { - Response::builder() + fn render_redirect(&self, status_code: u16, target_uri: &str) -> Response { + hyper::Response::builder() .status(status_code) .header("Location", target_uri.to_string()) - .body(Body::empty()) + .body(BoxBody::new(empty_body())) .unwrap_or_else(|err| { - self.internal_error( - format!("failed to render {status_code} redirect to {target_uri}: {err}",) - .as_str(), - ) + panic!("failed to render {status_code} redirect to {target_uri}: {err}",) }) } - fn https_redirect(&self, domain: domain::Name, req: Request) -> Response { + fn https_redirect(&self, domain: domain::Name, req: Request) -> Response { let https_addr = self.config.http.https_addr.unwrap(); (|| { @@ -198,7 +210,7 @@ impl Service { }) } - fn render_page(&self, name: &str, data: T) -> Response + fn render_page(&self, name: &str, data: T) -> Response where T: Serialize, { @@ -214,12 +226,12 @@ impl Service { ) } - async fn serve_origin(&self, settings: domain::Settings, req: Request) -> Response { + async fn serve_origin(&self, settings: domain::Settings, req: Request) -> Response { let path = service::append_index_to_path(req.uri().path(), "index.html"); use domain::manager::GetFileError; match self.domain_manager.get_file(&settings, &path).await { - Ok(f) => self.serve(200, &path, Body::wrap_stream(f.into_stream())), + Ok(f) => self.serve(200, &path, stream_body(f.into_stream())), Err(GetFileError::FileNotFound) => self.render_error_page(404, "File not found"), Err(GetFileError::Unavailable) => self.render_error_page(502, "Content unavailable"), Err(GetFileError::DescrNotSynced) => self.internal_error( @@ -247,22 +259,23 @@ impl Service { async fn with_query_req<'a, F, In, Out>( &self, - req: &'a Parts, - body: Body, + req: &'a hyper::http::request::Parts, + body: RequestBody, f: F, - ) -> Response + ) -> Response where In: for<'d> Deserialize<'d>, F: FnOnce(In) -> Out, - Out: future::Future>, + Out: future::Future, { let res = match self.config.http.form_method { ConfigFormMethod::GET => { serde_urlencoded::from_str::(req.uri.query().unwrap_or("")) } ConfigFormMethod::POST => { - let body = match hyper::body::to_bytes(body).await { - Ok(bytes) => bytes.to_vec(), + use http_body_util::BodyExt; + let body = match body.collect().await { + Ok(res) => res.to_bytes(), Err(e) => { return self.internal_error(format!("failed to read body: {e}").as_str()) } @@ -279,7 +292,7 @@ impl Service { } } - fn domain(&self, args: DomainArgs) -> Response { + fn domain(&self, args: DomainArgs) -> Response { #[derive(Serialize)] struct Data { domain: domain::Name, @@ -311,7 +324,7 @@ impl Service { ) } - fn domain_init(&self, args: DomainInitArgs) -> Response { + fn domain_init(&self, args: DomainInitArgs) -> Response { #[derive(Serialize)] struct Data<'a> { domain: domain::Name, @@ -368,7 +381,7 @@ impl Service { ) } - async fn domain_sync(&self, args: DomainSyncArgs) -> Response { + async fn domain_sync(&self, args: DomainSyncArgs) -> Response { if args.passphrase != self.config.passphrase.as_str() { return self.render_error_page(401, "Incorrect passphrase"); } @@ -451,7 +464,7 @@ impl Service { ) } - fn domains(&self) -> Response { + fn domains(&self) -> Response { #[derive(Serialize)] struct Response { domains: Vec, @@ -473,7 +486,7 @@ impl Service { self.render_page("/domains.html", Response { domains }) } - async fn serve_interface(&self, req: Request) -> Response { + async fn serve_interface(&self, req: Request) -> Response { let (req, body) = req.into_parts(); let path = req.uri.path(); @@ -511,7 +524,7 @@ impl Service { } } - fn domain_from_req(req: &Request) -> Option { + fn domain_from_req(req: &Request) -> Option { let host_header = req .headers() .get("Host") @@ -527,12 +540,18 @@ impl Service { async fn handle_request( &self, client_ip: net::IpAddr, - req: Request, + req: Request, req_is_https: bool, - ) -> Response { + ) -> Response { let domain = match Self::domain_from_req(&req) { - Some(domain) => domain, - None => return self.render_error_page(400, "Cannot serve page without domain"), + Some(domain) => { + log::debug!("[{client_ip}] Serving request to {domain}{}", req.uri()); + domain + } + None => { + log::debug!("[{client_ip}] Domain not found on request"); + return self.render_error_page(400, "Cannot serve page without domain"); + } }; let method = req.method(); @@ -545,7 +564,7 @@ impl Service { let token = path.trim_start_matches("/.well-known/acme-challenge/"); if let Ok(key) = self.domain_manager.get_acme_http01_challenge_key(token) { - return self.serve(200, "token.txt", key.into()); + return self.serve(200, "token.txt", bytes_body(key.into())); } } @@ -555,7 +574,7 @@ impl Service { .domain_manager .get_domain_checker_challenge_token(&domain) { - Ok(Some(token)) => return self.serve(200, "token.txt", token.into()), + Ok(Some(token)) => return self.serve(200, "token.txt", bytes_body(token.into())), Ok(None) => return self.render_error_page(404, "Token not found"), Err(e) => { return self.internal_error( @@ -593,6 +612,12 @@ impl Service { req_is_https, ) .await + .map(|res| { + res.map(|body| { + use http_body_util::BodyExt; + body.map_err(io::Error::other).boxed_unsync() + }) + }) .unwrap_or_else(|e| { self.internal_error( format!("serving {domain} via proxy {}: {e}", http_url.original_url) @@ -630,3 +655,32 @@ fn strip_port(host: &str) -> &str { Some(i) => &host[..i], } } + +struct HyperServiceImpl { + service: sync::Arc, + client_ip: net::IpAddr, + is_https: bool, +} + +impl HyperServiceImpl { + pub fn new(service: sync::Arc, client_ip: net::IpAddr, is_https: bool) -> Self { + Self { + service, + client_ip, + is_https, + } + } +} + +impl hyper::service::Service for HyperServiceImpl { + type Response = Response; + type Error = std::io::Error; + type Future = crate::util::BoxFuture<'static, Result>; + + fn call(&self, req: Request) -> Self::Future { + let service = self.service.clone(); + let client_ip = self.client_ip; + let is_https = self.is_https; + Box::pin(async move { Ok(service.handle_request(client_ip, req, is_https).await) }) + } +} diff --git a/src/service/http/proxy.rs b/src/service/http/proxy.rs index 5357eea..96de1d8 100644 --- a/src/service/http/proxy.rs +++ b/src/service/http/proxy.rs @@ -1,16 +1,18 @@ use crate::error::unexpected; use std::net; +use hyper::body::Incoming; use hyper_reverse_proxy::ReverseProxy; -use hyper_trust_dns::{TrustDnsHttpConnector, TrustDnsResolver}; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::rt::TokioExecutor; -fn proxy_client() -> &'static ReverseProxy { +fn proxy_client() -> &'static ReverseProxy { use std::sync::OnceLock; - static PROXY_CLIENT: OnceLock> = OnceLock::new(); + static PROXY_CLIENT: OnceLock> = OnceLock::new(); PROXY_CLIENT.get_or_init(|| { ReverseProxy::new( - hyper::Client::builder() - .build::<_, hyper::Body>(TrustDnsResolver::default().into_http_connector()), + hyper_util::client::legacy::Builder::new(TokioExecutor::new()) + .build::<_, Incoming>(HttpConnector::new()), ) }) } @@ -19,9 +21,9 @@ pub async fn serve_http_request( proxy_addr: &str, headers: &http::header::HeaderMap, client_ip: net::IpAddr, - mut req: hyper::Request, + mut req: hyper::Request, req_is_https: bool, -) -> unexpected::Result> { +) -> unexpected::Result> { for (name, value) in headers { if value.is_empty() { req.headers_mut().remove(name); diff --git a/src/service/http/tasks.rs b/src/service/http/tasks.rs index 87c5fd1..2636e15 100644 --- a/src/service/http/tasks.rs +++ b/src/service/http/tasks.rs @@ -1,68 +1,121 @@ -use crate::error::unexpected::{self, Mappable}; +use crate::error::unexpected::{self, Intoable, Mappable}; use crate::service; -use std::{convert, future, sync}; +use std::{net, pin, sync}; -use futures::StreamExt; -use hyper::server::conn::AddrStream; -use tokio_rustls::server::TlsStream; use tokio_util::sync::CancellationToken; -pub async fn listen_http( +async fn serve_conn( service: sync::Arc, canceller: CancellationToken, -) -> Result<(), unexpected::Error> { - let addr = service.config.http.http_addr; + conn: Conn, + remote_addr: net::SocketAddr, + req_is_https: bool, +) where + Conn: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + 'static, +{ + log::debug!("[{remote_addr}] Handling incoming connection (is_https:{req_is_https})"); + let service = crate::service::http::HyperServiceImpl::new( + service.clone(), + remote_addr.ip(), + req_is_https, + ); + + use core::time::Duration; + use hyper_util::{ + rt::tokio::{TokioExecutor, TokioIo, TokioTimer}, + server::conn::auto::Builder, + }; + + let timer = TokioTimer::new(); + + let mut builder = Builder::new(TokioExecutor::new()); + builder + .http1() + .timer(timer.clone()) + .header_read_timeout(Duration::from_secs(5)) + .http2() + .timer(timer) + .keep_alive_interval(Some(Duration::from_secs(10))) + .keep_alive_timeout(Duration::from_secs(5)); + + let mut conn = pin::pin!(builder.serve_connection(TokioIo::new(conn), service)); + + tokio::select! { + res = conn.as_mut() => { + if let Err(err) = res { + log::error!("[{remote_addr}] Failed to serve HTTP(S) request: {err}") + } else { + log::debug!("[{remote_addr}] Finished handling connection"); + } + return + }, + _ = canceller.cancelled() => (), + } - let make_service = hyper::service::make_service_fn(move |conn: &AddrStream| { - let service = service.clone(); - let client_ip = conn.remote_addr().ip(); + log::debug!("[{remote_addr}] received cancellation notice, gracefully shutting down..."); + conn.as_mut().graceful_shutdown(); - // Create a `Service` for responding to the request. - let hyper_service = hyper::service::service_fn(move |req| { - let service = service.clone(); - async move { - Ok::<_, convert::Infallible>(service.handle_request(client_ip, req, false).await) - } - }); + let timeout_res = tokio::time::timeout(Duration::from_secs(5), async { + if let Err(err) = conn.await { + log::error!("[{remote_addr}] Failed to serve HTTP(S) request after shutdown: {err}") + } else { + log::debug!("[{remote_addr}] Finished handling connection after shutdown"); + } + }) + .await; - // Return the service to hyper. - async move { Ok::<_, convert::Infallible>(hyper_service) } - }); + if timeout_res.is_err() { + log::debug!("[{remote_addr}] did not gracefully shutdown, forcing shutdown") + } +} - log::info!("Listening on http://{}", &addr); - let server = hyper::Server::bind(&addr).serve(make_service); +pub async fn listen_http( + service: sync::Arc, + canceller: CancellationToken, +) -> unexpected::Result<()> { + let mut wg = awaitgroup::WaitGroup::new(); + let addr = service.config.http.http_addr; + let listener = tokio::net::TcpListener::bind(addr) + .await + .map_unexpected_while(|| format!("creating TCP listener on {addr}"))?; - let graceful = server.with_graceful_shutdown(async { - canceller.cancelled().await; - }); + log::info!("Listening on http://{}", &addr); - graceful.await.or_unexpected() + loop { + tokio::select! { + accept_res = listener.accept() => { + match accept_res { + Ok((stream, remote_addr)) => { + let worker = wg.worker(); + let service = service.clone(); + let canceller = canceller.clone(); + tokio::task::spawn(async move { + serve_conn(service, canceller, stream, remote_addr, false).await; + worker.done() + }); + }, + Err(err) => + return Err(err.into_unexpected_while(format!("accepting new HTTP requests on {addr}"))), + } + }, + _ = canceller.cancelled() => { + log::info!("No longer accepting new requests on http://{addr}, waiting on remaining requests..."); + wg.wait().await; + log::info!("All requests on http://{addr} have completed"); + return Ok(()) + } + } + } } pub async fn listen_https( service: sync::Arc, canceller: CancellationToken, ) -> Result<(), unexpected::Error> { + let mut wg = awaitgroup::WaitGroup::new(); let cert_resolver = service.cert_resolver.clone(); let addr = service.config.http.https_addr.unwrap(); - - let make_service = hyper::service::make_service_fn(move |conn: &TlsStream| { - let service = service.clone(); - let client_ip = conn.get_ref().0.remote_addr().ip(); - - // Create a `Service` for responding to the request. - let hyper_service = hyper::service::service_fn(move |req| { - let service = service.clone(); - async move { - Ok::<_, convert::Infallible>(service.handle_request(client_ip, req, true).await) - } - }); - - // Return the service to hyper. - async move { Ok::<_, convert::Infallible>(hyper_service) } - }); - let server_config: tokio_rustls::TlsAcceptor = sync::Arc::new( rustls::server::ServerConfig::builder() .with_safe_defaults() @@ -71,27 +124,46 @@ pub async fn listen_https( ) .into(); - let addr_incoming = hyper::server::conn::AddrIncoming::bind(&addr) - .expect("https listen socket creation failed"); - - let incoming = tls_listener::TlsListener::new(server_config, addr_incoming).filter(|conn| { - if let Err(err) = conn { - log::error!("Error accepting TLS connection: {:?}", err); - future::ready(false) - } else { - future::ready(true) + let listener = tokio::net::TcpListener::bind(addr) + .await + .map_unexpected_while(|| format!("creating TCP listener on {addr}"))?; + + log::info!("Listening on https://{}", &addr); + + loop { + tokio::select! { + accept_res = listener.accept() => { + match accept_res { + Ok((raw_stream, remote_addr)) => { + let worker = wg.worker(); + let server_config = server_config.clone(); + let service = service.clone(); + let canceller = canceller.clone(); + + tokio::task::spawn(async move { + let stream = match server_config.accept(raw_stream).await { + Ok(s) => s, + Err(err) => { + log::warn!("failed to accept TLS connection on {addr}: {err}"); + worker.done(); + return; + } + }; + + serve_conn(service, canceller, stream, remote_addr, true).await; + worker.done(); + }); + }, + Err(err) => + return Err(err.into_unexpected_while(format!("accepting new HTTPS requests on {addr}"))), + } + }, + _ = canceller.cancelled() => { + log::info!("No longer accepting new requests on https://{addr}, waiting on remaining requests..."); + wg.wait().await; + log::info!("All requests on https://{addr} have completed"); + return Ok(()) + } } - }); - - let incoming = hyper::server::accept::from_stream(incoming); - - log::info!("Listening on https://{}", addr); - - let server = hyper::Server::builder(incoming).serve(make_service); - - let graceful = server.with_graceful_shutdown(async { - canceller.cancelled().await; - }); - - graceful.await.or_unexpected() + } }