Compare commits

..

No commits in common. "1a9e3430dd7a9d3579ebe183abed6596a414c5bf" and "1dc4618994a5e9bc5de2083b911b1b08da7c081f" have entirely different histories.

3 changed files with 62 additions and 104 deletions

View File

@ -23,7 +23,6 @@ name="internal"
harness = false harness = false
[dependencies] [dependencies]
http-body-util = "0.1.0"
hyper = { version = "1.2.0", features = ["client", "http1"] } hyper = { version = "1.2.0", features = ["client", "http1"] }
hyper-util = { version = "0.1.3", features = ["client-legacy", "http1","tokio"] } hyper-util = { version = "0.1.3", features = ["client-legacy", "http1","tokio"] }
lazy_static = "1.4.0" lazy_static = "1.4.0"

View File

@ -1,18 +1,3 @@
# This is a fork
This repo contains a fork of the[original hyper-reverse-proxy
codebase][upstream], adding to it a few improvements:
- Fix to a bug where the `Host` header was getting overwritten on the upstream
HTTP request.
- Upgraded hyper version to 1.x (and fixes related to that upgrade)
- Logging cleanup
Plus more as time goes on.
[upstream]: https://github.com/felipenoris/hyper-reverse-proxy
# hyper-reverse-proxy # hyper-reverse-proxy
@ -178,4 +163,4 @@ Recommendations:
It is highly recommended to use one of them. It is highly recommended to use one of them.
> Currently only includes dns queries as `esni` or `ech` is still in draft by the `ietf` > Currently only includes dns queries as `esni` or `ech` is still in draft by the `ietf`

View File

@ -3,15 +3,15 @@
#[macro_use] #[macro_use]
extern crate tracing; extern crate tracing;
use http_body_util::{BodyExt, Empty};
use hyper::header::{HeaderMap, HeaderName, HeaderValue}; use hyper::header::{HeaderMap, HeaderName, HeaderValue};
use hyper::http::header::{InvalidHeaderValue, ToStrError}; use hyper::http::header::{InvalidHeaderValue, ToStrError};
use hyper::http::uri::InvalidUri; use hyper::http::uri::InvalidUri;
use hyper::upgrade::OnUpgrade;
use hyper::{body::Incoming, Error, Request, Response, StatusCode}; use hyper::{body::Incoming, Error, Request, Response, StatusCode};
use hyper_util::client::legacy::{connect::Connect, Client, Error as LegacyError}; use hyper_util::client::legacy::{connect::Connect, Client, Error as LegacyError};
use hyper_util::rt::tokio::TokioIo; use hyper_util::rt::tokio::TokioIo;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::net::{IpAddr, SocketAddr}; use std::net::IpAddr;
use tokio::io::copy_bidirectional; use tokio::io::copy_bidirectional;
lazy_static! { lazy_static! {
@ -43,7 +43,6 @@ pub enum ProxyError {
HyperError(Error), HyperError(Error),
ForwardHeaderError, ForwardHeaderError,
UpgradeError(String), UpgradeError(String),
UpstreamError(String),
} }
impl From<LegacyError> for ProxyError { impl From<LegacyError> for ProxyError {
@ -133,7 +132,7 @@ fn create_proxied_response<B>(mut response: Response<B>) -> Response<B> {
response response
} }
fn create_forward_uri<B>(forward_url: &str, req: &Request<B>) -> String { fn forward_uri<B>(forward_url: &str, req: &Request<B>) -> String {
debug!("Building forward uri"); debug!("Building forward uri");
let split_url = forward_url.split('?').collect::<Vec<&str>>(); let split_url = forward_url.split('?').collect::<Vec<&str>>();
@ -213,6 +212,7 @@ fn create_forward_uri<B>(forward_url: &str, req: &Request<B>) -> String {
fn create_proxied_request<B>( fn create_proxied_request<B>(
client_ip: IpAddr, client_ip: IpAddr,
forward_url: &str,
mut request: Request<B>, mut request: Request<B>,
upgrade_type: Option<&String>, upgrade_type: Option<&String>,
) -> Result<Request<B>, ProxyError> { ) -> Result<Request<B>, ProxyError> {
@ -230,8 +230,16 @@ fn create_proxied_request<B>(
}) })
.unwrap_or(false); .unwrap_or(false);
let uri: hyper::Uri = forward_uri(forward_url, &request).parse()?;
debug!("Setting headers of proxied request"); debug!("Setting headers of proxied request");
//request
// .headers_mut()
// .insert(HOST, HeaderValue::from_str(uri.host().unwrap())?);
*request.uri_mut() = uri;
remove_hop_headers(request.headers_mut()); remove_hop_headers(request.headers_mut());
remove_connection_headers(request.headers_mut()); remove_connection_headers(request.headers_mut());
@ -279,29 +287,12 @@ fn create_proxied_request<B>(
Ok(request) Ok(request)
} }
fn get_upstream_addr(forward_uri: &hyper::Uri) -> Result<SocketAddr, ProxyError> {
let host = forward_uri.host().ok_or(ProxyError::UpstreamError(
"forward_uri has no host".to_string(),
))?;
let port = forward_uri.port_u16().ok_or(ProxyError::UpstreamError(
"forward_uri has no port".to_string(),
))?;
Ok(SocketAddr::new(
host.parse().map_err(|_| {
ProxyError::UpstreamError("forward_uri host must be an IP address".to_string())
})?,
port,
))
}
type ResponseBody = http_body_util::combinators::UnsyncBoxBody<hyper::body::Bytes, std::io::Error>;
pub async fn call<'a, T: Connect + Clone + Send + Sync + 'static>( pub async fn call<'a, T: Connect + Clone + Send + Sync + 'static>(
client_ip: IpAddr, client_ip: IpAddr,
forward_uri: &str, forward_uri: &str,
mut request: Request<Incoming>, mut request: Request<Incoming>,
client: &'a Client<T, Incoming>, client: &'a Client<T, Incoming>,
) -> Result<Response<ResponseBody>, ProxyError> { ) -> Result<Response<Incoming>, ProxyError> {
debug!( debug!(
"Received proxy call from {} to {}, client: {}", "Received proxy call from {} to {}, client: {}",
request.uri().to_string(), request.uri().to_string(),
@ -310,74 +301,57 @@ pub async fn call<'a, T: Connect + Clone + Send + Sync + 'static>(
); );
let request_upgrade_type = get_upgrade_type(request.headers()); let request_upgrade_type = get_upgrade_type(request.headers());
let request_upgraded = request.extensions_mut().remove::<OnUpgrade>();
let request_uri: hyper::Uri = create_forward_uri(forward_uri, &request).parse()?; let proxied_request = create_proxied_request(
*request.uri_mut() = request_uri.clone(); client_ip,
forward_uri,
request,
request_upgrade_type.as_ref(),
)?;
let mut response = client.request(proxied_request).await?;
let request = create_proxied_request(client_ip, request, request_upgrade_type.as_ref())?; if response.status() == StatusCode::SWITCHING_PROTOCOLS {
let response_upgrade_type = get_upgrade_type(response.headers());
if request_upgrade_type.is_none() { if request_upgrade_type == response_upgrade_type {
let response = client.request(request).await?; if let Some(request_upgraded) = request_upgraded {
let mut response_upgraded = TokioIo::new(
response
.extensions_mut()
.remove::<OnUpgrade>()
.ok_or(ProxyError::UpgradeError(
"Failed to upgrade response".to_string(),
))?
.await?,
);
debug!("Responding to a connection upgrade response");
let mut request_upgraded = TokioIo::new(request_upgraded.await?);
tokio::spawn(async move {
copy_bidirectional(&mut response_upgraded, &mut request_upgraded).await
});
Ok(response)
} else {
Err(ProxyError::UpgradeError(
"request does not have an upgrade extension".to_string(),
))
}
} else {
Err(ProxyError::UpgradeError(format!(
"backend tried to switch to protocol {:?} when {:?} was requested",
response_upgrade_type, request_upgrade_type
)))
}
} else {
let proxied_response = create_proxied_response(response);
debug!("Responding to call with response"); debug!("Responding to call with response");
return Ok(create_proxied_response( Ok(proxied_response)
response.map(|body| body.map_err(std::io::Error::other).boxed_unsync()),
));
} }
let (request_parts, request_body) = request.into_parts();
let upstream_request =
Request::from_parts(request_parts.clone(), Empty::<hyper::body::Bytes>::new());
let mut downstream_request = Request::from_parts(request_parts, request_body);
let (mut upstream_conn, downstream_response) = {
let upstream_addr = get_upstream_addr(&request_uri)?;
let conn = TokioIo::new(
tokio::net::TcpStream::connect(upstream_addr)
.await
.map_err(|e| ProxyError::UpstreamError(e.to_string()))?,
);
let (mut sender, conn) = hyper::client::conn::http1::handshake(conn).await?;
tokio::task::spawn(async move {
if let Err(err) = conn.with_upgrades().await {
warn!("Upgrading connection failed: {:?}", err);
}
});
let response = sender.send_request(upstream_request).await?;
if response.status() != StatusCode::SWITCHING_PROTOCOLS {
return Err(ProxyError::UpgradeError(
"Server did not response with Switching Protocols status".to_string(),
));
};
let (response_parts, response_body) = response.into_parts();
let upstream_response = Response::from_parts(response_parts.clone(), response_body);
let downstream_response = Response::from_parts(response_parts, Empty::new());
(
TokioIo::new(hyper::upgrade::on(upstream_response).await?),
downstream_response,
)
};
tokio::task::spawn(async move {
let mut downstream_conn = match hyper::upgrade::on(&mut downstream_request).await {
Ok(upgraded) => TokioIo::new(upgraded),
Err(e) => {
warn!("Failed to upgrade request: {e}");
return;
}
};
if let Err(e) = copy_bidirectional(&mut downstream_conn, &mut upstream_conn).await {
warn!("Bidirectional copy failed: {e}");
}
});
Ok(downstream_response.map(|body| body.map_err(std::io::Error::other).boxed_unsync()))
} }
pub struct ReverseProxy<T: Connect + Clone + Send + Sync + 'static> { pub struct ReverseProxy<T: Connect + Clone + Send + Sync + 'static> {
@ -394,7 +368,7 @@ impl<T: Connect + Clone + Send + Sync + 'static> ReverseProxy<T> {
client_ip: IpAddr, client_ip: IpAddr,
forward_uri: &str, forward_uri: &str,
request: Request<Incoming>, request: Request<Incoming>,
) -> Result<Response<ResponseBody>, ProxyError> { ) -> Result<Response<Incoming>, ProxyError> {
call::<T>(client_ip, forward_uri, request, &self.client).await call::<T>(client_ip, forward_uri, request, &self.client).await
} }
} }
@ -409,8 +383,8 @@ pub mod benches {
super::create_proxied_response(response); super::create_proxied_response(response);
} }
pub fn create_forward_uri<B>(forward_url: &str, req: &crate::Request<B>) { pub fn forward_uri<B>(forward_url: &str, req: &crate::Request<B>) {
super::create_forward_uri(forward_url, req); super::forward_uri(forward_url, req);
} }
pub fn create_proxied_request<B>( pub fn create_proxied_request<B>(