Upgrade to hyper 1.2.0

This commit is contained in:
Brian Picciano 2024-02-27 22:34:41 +01:00
parent 9f4b94724f
commit 907ea5b7f4
2 changed files with 33 additions and 21 deletions

View File

@ -23,13 +23,14 @@ name="internal"
harness = false harness = false
[dependencies] [dependencies]
hyper = { version = "0.14.18", features = ["client"] } hyper = { version = "1.2.0", features = ["client", "http1"] }
hyper-util = { version = "0.1.3", features = ["client-legacy", "http1","tokio"] }
lazy_static = "1.4.0" lazy_static = "1.4.0"
tokio = { version = "1.17.0", features = ["io-util", "rt"] } tokio = { version = "1.17.0", features = ["io-util", "rt"] }
tracing = "0.1.34" tracing = "0.1.34"
[dev-dependencies] [dev-dependencies]
hyper = { version = "0.14.18", features = ["server"] } hyper = { version = "1.2.0", features = ["client", "http1"] }
futures = "0.3.21" futures = "0.3.21"
async-trait = "0.1.53" async-trait = "0.1.53"
async-tungstenite = { version = "0.17", features = ["tokio-runtime"] } async-tungstenite = { version = "0.17", features = ["tokio-runtime"] }
@ -49,4 +50,4 @@ criterion = "0.3.5"
[features] [features]
__bench=[] __bench=[]

View File

@ -7,7 +7,9 @@ use hyper::header::{HeaderMap, HeaderName, HeaderValue};
use hyper::http::header::{InvalidHeaderValue, ToStrError}; use hyper::http::header::{InvalidHeaderValue, ToStrError};
use hyper::http::uri::InvalidUri; use hyper::http::uri::InvalidUri;
use hyper::upgrade::OnUpgrade; use hyper::upgrade::OnUpgrade;
use hyper::{Body, Client, Error, Request, Response, StatusCode}; use hyper::{body::Incoming, Error, Request, Response, StatusCode};
use hyper_util::client::legacy::{connect::Connect, Client, Error as LegacyError};
use hyper_util::rt::tokio::TokioIo;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::net::IpAddr; use std::net::IpAddr;
use tokio::io::copy_bidirectional; use tokio::io::copy_bidirectional;
@ -37,11 +39,18 @@ lazy_static! {
#[derive(Debug)] #[derive(Debug)]
pub enum ProxyError { pub enum ProxyError {
InvalidUri(InvalidUri), InvalidUri(InvalidUri),
LegacyHyperError(LegacyError),
HyperError(Error), HyperError(Error),
ForwardHeaderError, ForwardHeaderError,
UpgradeError(String), UpgradeError(String),
} }
impl From<LegacyError> for ProxyError {
fn from(err: LegacyError) -> ProxyError {
ProxyError::LegacyHyperError(err)
}
}
impl From<Error> for ProxyError { impl From<Error> for ProxyError {
fn from(err: Error) -> ProxyError { fn from(err: Error) -> ProxyError {
ProxyError::HyperError(err) ProxyError::HyperError(err)
@ -75,7 +84,7 @@ fn remove_hop_headers(headers: &mut HeaderMap) {
} }
fn get_upgrade_type(headers: &HeaderMap) -> Option<String> { fn get_upgrade_type(headers: &HeaderMap) -> Option<String> {
#[allow(clippy::blocks_in_if_conditions)] #[allow(clippy::blocks_in_conditions)]
if headers if headers
.get(&*CONNECTION_HEADER) .get(&*CONNECTION_HEADER)
.map(|value| { .map(|value| {
@ -278,12 +287,12 @@ fn create_proxied_request<B>(
Ok(request) Ok(request)
} }
pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync + 'static>( pub async fn call<'a, T: Connect + Clone + Send + Sync + 'static>(
client_ip: IpAddr, client_ip: IpAddr,
forward_uri: &str, forward_uri: &str,
mut request: Request<Body>, mut request: Request<Incoming>,
client: &'a Client<T>, client: &'a Client<T, Incoming>,
) -> Result<Response<Body>, ProxyError> { ) -> Result<Response<Incoming>, ProxyError> {
info!( info!(
"Received proxy call from {} to {}, client: {}", "Received proxy call from {} to {}, client: {}",
request.uri().to_string(), request.uri().to_string(),
@ -307,17 +316,19 @@ pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync +
if request_upgrade_type == response_upgrade_type { if request_upgrade_type == response_upgrade_type {
if let Some(request_upgraded) = request_upgraded { if let Some(request_upgraded) = request_upgraded {
let mut response_upgraded = response let mut response_upgraded = TokioIo::new(
.extensions_mut() response
.remove::<OnUpgrade>() .extensions_mut()
.expect("response does not have an upgrade extension") .remove::<OnUpgrade>()
.await?; .expect("response does not have an upgrade extension")
.await?,
);
debug!("Responding to a connection upgrade response"); debug!("Responding to a connection upgrade response");
tokio::spawn(async move { tokio::spawn(async move {
let mut request_upgraded = let mut request_upgraded =
request_upgraded.await.expect("failed to upgrade request"); TokioIo::new(request_upgraded.await.expect("failed to upgrade request"));
copy_bidirectional(&mut response_upgraded, &mut request_upgraded) copy_bidirectional(&mut response_upgraded, &mut request_upgraded)
.await .await
@ -344,12 +355,12 @@ pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync +
} }
} }
pub struct ReverseProxy<T: hyper::client::connect::Connect + Clone + Send + Sync + 'static> { pub struct ReverseProxy<T: Connect + Clone + Send + Sync + 'static> {
client: Client<T>, client: Client<T, Incoming>,
} }
impl<T: hyper::client::connect::Connect + Clone + Send + Sync + 'static> ReverseProxy<T> { impl<T: Connect + Clone + Send + Sync + 'static> ReverseProxy<T> {
pub fn new(client: Client<T>) -> Self { pub fn new(client: Client<T, Incoming>) -> Self {
Self { client } Self { client }
} }
@ -357,8 +368,8 @@ impl<T: hyper::client::connect::Connect + Clone + Send + Sync + 'static> Reverse
&self, &self,
client_ip: IpAddr, client_ip: IpAddr,
forward_uri: &str, forward_uri: &str,
request: Request<Body>, request: Request<Incoming>,
) -> Result<Response<Body>, ProxyError> { ) -> Result<Response<Incoming>, ProxyError> {
call::<T>(client_ip, forward_uri, request, &self.client).await call::<T>(client_ip, forward_uri, request, &self.client).await
} }
} }