diff --git a/Cargo.toml b/Cargo.toml index c6630aa..8372830 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,23 +31,19 @@ tokio = { version = "1.17.0", features = ["io-util", "rt"] } tracing = "0.1.34" [dev-dependencies] -hyper = { version = "1.2.0", features = ["client", "http1"] } +hyper = { version = "1.2.0", features = ["client", "http1", "server"] } futures = "0.3.21" async-trait = "0.1.53" async-tungstenite = { version = "0.17", features = ["tokio-runtime"] } tokio-test = "0.4.2" test-context = "0.1.3" tokiotest-httpserver = "0.2.1" -hyper-trust-dns = { version = "0.4.2", features = [ - "rustls-http2", - "dnssec-ring", - "dns-over-https-rustls", - "rustls-webpki" -] } rand = "0.8.5" tungstenite = "0.17" url = "2.2" criterion = "0.3.5" +hyper-rustls = "0.27.1" +rustls = "0.23.6" [features] diff --git a/examples/simple.rs b/examples/simple.rs index 97ca731..33fe9ad 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,26 +1,51 @@ -use hyper::server::conn::AddrStream; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Request, Response, Server, StatusCode}; +use std::convert::Infallible; +use std::io; +use std::net::{IpAddr, SocketAddr}; +use std::time::Duration; + +use http_body_util::combinators::UnsyncBoxBody; +use http_body_util::{BodyExt, Empty, Full}; +use hyper::body::{Bytes, Incoming}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response, StatusCode}; +use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; +use tokio::net::TcpListener; + use hyper_reverse_proxy::ReverseProxy; -use hyper_trust_dns::{RustlsHttpsConnector, TrustDnsResolver}; -use std::net::IpAddr; -use std::{convert::Infallible, net::SocketAddr}; +use hyper_rustls::{ConfigBuilderExt, HttpsConnector}; +use hyper_util::client::legacy::connect::HttpConnector; + +type Connector = HttpsConnector; +type ResponseBody = UnsyncBoxBody; lazy_static::lazy_static! { - static ref PROXY_CLIENT: ReverseProxy = { - ReverseProxy::new( - hyper::Client::builder().build::<_, hyper::Body>(TrustDnsResolver::default().into_rustls_webpki_https_connector()), - ) + static ref PROXY_CLIENT: ReverseProxy = { + let connector: Connector = Connector::builder() + .with_tls_config( + rustls::ClientConfig::builder() + .with_native_roots() + .expect("with_native_roots") + .with_no_client_auth(), + ) + .https_or_http() + .enable_http1() + .build(); + ReverseProxy::new( + hyper_util::client::legacy::Builder::new(TokioExecutor::new()) + .pool_idle_timeout(Duration::from_secs(3)) + .pool_timer(TokioTimer::new()) + .build::<_, Incoming>(connector), + ) }; } -fn debug_request(req: &Request) -> Result, Infallible> { - let body_str = format!("{:?}", req); - Ok(Response::new(Body::from(body_str))) -} - -async fn handle(client_ip: IpAddr, req: Request) -> Result, Infallible> { - if req.uri().path().starts_with("/target/first") { +async fn handle( + client_ip: IpAddr, + req: Request, +) -> Result, Infallible> { + let host = req.headers().get("host").and_then(|v| v.to_str().ok()); + if host.is_some_and(|host| host.starts_with("service1.localhost")) { match PROXY_CLIENT .call(client_ip, "http://127.0.0.1:13901", req) .await @@ -28,10 +53,12 @@ async fn handle(client_ip: IpAddr, req: Request) -> Result, Ok(response) => Ok(response), Err(_error) => Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::empty()) + .body(UnsyncBoxBody::new( + Empty::::new().map_err(io::Error::other), + )) .unwrap()), } - } else if req.uri().path().starts_with("/target/second") { + } else if host.is_some_and(|host| host.starts_with("service2.localhost")) { match PROXY_CLIENT .call(client_ip, "http://127.0.0.1:13902", req) .await @@ -39,29 +66,55 @@ async fn handle(client_ip: IpAddr, req: Request) -> Result, Ok(response) => Ok(response), Err(_error) => Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::empty()) + .body(UnsyncBoxBody::new( + Empty::::new().map_err(io::Error::other), + )) .unwrap()), } } else { - debug_request(&req) + let body_str = format!("{:?}", req); + Ok(Response::new(UnsyncBoxBody::new( + Full::new(Bytes::from(body_str)).map_err(io::Error::other), + ))) } } #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { let bind_addr = "127.0.0.1:8000"; let addr: SocketAddr = bind_addr.parse().expect("Could not parse ip:port."); - let make_svc = make_service_fn(|conn: &AddrStream| { - let remote_addr = conn.remote_addr().ip(); - async move { Ok::<_, Infallible>(service_fn(move |req| handle(remote_addr, req))) } - }); + // We create a TcpListener and bind it to the address + let listener = TcpListener::bind(addr).await?; - let server = Server::bind(&addr).serve(make_svc); + println!( + "Access service1 on http://service1.localhost:{}", + addr.port() + ); + println!( + "Access service2 on http://service2.localhost:{}", + addr.port() + ); - println!("Running server on {:?}", addr); + // We start a loop to continuously accept incoming connections + loop { + let (stream, remote_addr) = listener.accept().await?; + let client_ip = remote_addr.ip(); - if let Err(e) = server.await { - eprintln!("server error: {}", e); + // Use an adapter to access something implementing `tokio::io` traits as if they implement + // `hyper::rt` IO traits. + let io = TokioIo::new(stream); + + // Spawn a tokio task to serve multiple connections concurrently + tokio::task::spawn(async move { + // Finally, we bind the incoming connection to our `hello` service + if let Err(err) = http1::Builder::new() + // `service_fn` converts our function in a `Service` + .serve_connection(io, service_fn(move |req| handle(client_ip, req))) + .await + { + eprintln!("Error serving connection: {:?}", err); + } + }); } }