diff --git a/Cargo.toml b/Cargo.toml index 5abff50..55e824d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,13 +23,14 @@ name="internal" harness = false [dependencies] -hyper = { version = "0.14.18", features = ["client"] } +hyper = { version = "1.2.0", features = ["client", "http1"] } +hyper-util = { version = "0.1.3", features = ["client-legacy", "http1","tokio"] } lazy_static = "1.4.0" tokio = { version = "1.17.0", features = ["io-util", "rt"] } tracing = "0.1.34" [dev-dependencies] -hyper = { version = "0.14.18", features = ["server"] } +hyper = { version = "1.2.0", features = ["client", "http1"] } futures = "0.3.21" async-trait = "0.1.53" async-tungstenite = { version = "0.17", features = ["tokio-runtime"] } @@ -49,4 +50,4 @@ criterion = "0.3.5" [features] -__bench=[] \ No newline at end of file +__bench=[] diff --git a/src/lib.rs b/src/lib.rs index 1a2d3b5..7614075 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,9 @@ use hyper::header::{HeaderMap, HeaderName, HeaderValue}; use hyper::http::header::{InvalidHeaderValue, ToStrError}; use hyper::http::uri::InvalidUri; use hyper::upgrade::OnUpgrade; -use hyper::{Body, Client, Error, Request, Response, StatusCode}; +use hyper::{body::Incoming, Error, Request, Response, StatusCode}; +use hyper_util::client::legacy::{connect::Connect, Client, Error as LegacyError}; +use hyper_util::rt::tokio::TokioIo; use lazy_static::lazy_static; use std::net::IpAddr; use tokio::io::copy_bidirectional; @@ -37,11 +39,18 @@ lazy_static! { #[derive(Debug)] pub enum ProxyError { InvalidUri(InvalidUri), + LegacyHyperError(LegacyError), HyperError(Error), ForwardHeaderError, UpgradeError(String), } +impl From for ProxyError { + fn from(err: LegacyError) -> ProxyError { + ProxyError::LegacyHyperError(err) + } +} + impl From for ProxyError { fn from(err: Error) -> ProxyError { ProxyError::HyperError(err) @@ -75,7 +84,7 @@ fn remove_hop_headers(headers: &mut HeaderMap) { } fn get_upgrade_type(headers: &HeaderMap) -> Option { - #[allow(clippy::blocks_in_if_conditions)] + #[allow(clippy::blocks_in_conditions)] if headers .get(&*CONNECTION_HEADER) .map(|value| { @@ -278,12 +287,12 @@ fn create_proxied_request( Ok(request) } -pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync + 'static>( +pub async fn call<'a, T: Connect + Clone + Send + Sync + 'static>( client_ip: IpAddr, forward_uri: &str, - mut request: Request, - client: &'a Client, -) -> Result, ProxyError> { + mut request: Request, + client: &'a Client, +) -> Result, ProxyError> { info!( "Received proxy call from {} to {}, client: {}", request.uri().to_string(), @@ -307,17 +316,19 @@ pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync + if request_upgrade_type == response_upgrade_type { if let Some(request_upgraded) = request_upgraded { - let mut response_upgraded = response - .extensions_mut() - .remove::() - .expect("response does not have an upgrade extension") - .await?; + let mut response_upgraded = TokioIo::new( + response + .extensions_mut() + .remove::() + .expect("response does not have an upgrade extension") + .await?, + ); debug!("Responding to a connection upgrade response"); tokio::spawn(async move { let mut request_upgraded = - request_upgraded.await.expect("failed to upgrade request"); + TokioIo::new(request_upgraded.await.expect("failed to upgrade request")); copy_bidirectional(&mut response_upgraded, &mut request_upgraded) .await @@ -344,12 +355,12 @@ pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync + } } -pub struct ReverseProxy { - client: Client, +pub struct ReverseProxy { + client: Client, } -impl ReverseProxy { - pub fn new(client: Client) -> Self { +impl ReverseProxy { + pub fn new(client: Client) -> Self { Self { client } } @@ -357,8 +368,8 @@ impl Reverse &self, client_ip: IpAddr, forward_uri: &str, - request: Request, - ) -> Result, ProxyError> { + request: Request, + ) -> Result, ProxyError> { call::(client_ip, forward_uri, request, &self.client).await } }