Compare commits

..

No commits in common. "1dc4618994a5e9bc5de2083b911b1b08da7c081f" and "e73a76600ce9e51e962de5266b03be596e6c1d50" have entirely different histories.

2 changed files with 36 additions and 47 deletions

View File

@ -23,14 +23,13 @@ name="internal"
harness = false
[dependencies]
hyper = { version = "1.2.0", features = ["client", "http1"] }
hyper-util = { version = "0.1.3", features = ["client-legacy", "http1","tokio"] }
hyper = { version = "0.14.18", features = ["client"] }
lazy_static = "1.4.0"
tokio = { version = "1.17.0", features = ["io-util", "rt"] }
tracing = "0.1.34"
[dev-dependencies]
hyper = { version = "1.2.0", features = ["client", "http1"] }
hyper = { version = "0.14.18", features = ["server"] }
futures = "0.3.21"
async-trait = "0.1.53"
async-tungstenite = { version = "0.17", features = ["tokio-runtime"] }

View File

@ -3,13 +3,11 @@
#[macro_use]
extern crate tracing;
use hyper::header::{HeaderMap, HeaderName, HeaderValue};
use hyper::header::{HeaderMap, HeaderName, HeaderValue, HOST};
use hyper::http::header::{InvalidHeaderValue, ToStrError};
use hyper::http::uri::InvalidUri;
use hyper::upgrade::OnUpgrade;
use hyper::{body::Incoming, Error, Request, Response, StatusCode};
use hyper_util::client::legacy::{connect::Connect, Client, Error as LegacyError};
use hyper_util::rt::tokio::TokioIo;
use hyper::{Body, Client, Error, Request, Response, StatusCode};
use lazy_static::lazy_static;
use std::net::IpAddr;
use tokio::io::copy_bidirectional;
@ -39,18 +37,11 @@ lazy_static! {
#[derive(Debug)]
pub enum ProxyError {
InvalidUri(InvalidUri),
LegacyHyperError(LegacyError),
HyperError(Error),
ForwardHeaderError,
UpgradeError(String),
}
impl From<LegacyError> for ProxyError {
fn from(err: LegacyError) -> ProxyError {
ProxyError::LegacyHyperError(err)
}
}
impl From<Error> for ProxyError {
fn from(err: Error) -> ProxyError {
ProxyError::HyperError(err)
@ -84,7 +75,7 @@ fn remove_hop_headers(headers: &mut HeaderMap) {
}
fn get_upgrade_type(headers: &HeaderMap) -> Option<String> {
#[allow(clippy::blocks_in_conditions)]
#[allow(clippy::blocks_in_if_conditions)]
if headers
.get(&*CONNECTION_HEADER)
.map(|value| {
@ -124,7 +115,7 @@ fn remove_connection_headers(headers: &mut HeaderMap) {
}
fn create_proxied_response<B>(mut response: Response<B>) -> Response<B> {
debug!("Creating proxied response");
info!("Creating proxied response");
remove_hop_headers(response.headers_mut());
remove_connection_headers(response.headers_mut());
@ -137,7 +128,7 @@ fn forward_uri<B>(forward_url: &str, req: &Request<B>) -> String {
let split_url = forward_url.split('?').collect::<Vec<&str>>();
let mut base_url: &str = split_url.first().unwrap_or(&"");
let mut base_url: &str = split_url.get(0).unwrap_or(&"");
let forward_url_query: &str = split_url.get(1).unwrap_or(&"");
let path2 = req.uri().path();
@ -216,7 +207,7 @@ fn create_proxied_request<B>(
mut request: Request<B>,
upgrade_type: Option<&String>,
) -> Result<Request<B>, ProxyError> {
debug!("Creating proxied request");
info!("Creating proxied request");
let contains_te_trailers_value = request
.headers()
@ -234,9 +225,9 @@ fn create_proxied_request<B>(
debug!("Setting headers of proxied request");
//request
// .headers_mut()
// .insert(HOST, HeaderValue::from_str(uri.host().unwrap())?);
request
.headers_mut()
.insert(HOST, HeaderValue::from_str(uri.host().unwrap())?);
*request.uri_mut() = uri;
@ -265,12 +256,12 @@ fn create_proxied_request<B>(
// Add forwarding information in the headers
match request.headers_mut().entry(&*X_FORWARDED_FOR) {
hyper::header::Entry::Vacant(entry) => {
debug!("X-Forwarded-for header was vacant");
debug!("X-Fowraded-for header was vacant");
entry.insert(client_ip.to_string().parse()?);
}
hyper::header::Entry::Occupied(entry) => {
debug!("X-Forwarded-for header was occupied");
debug!("X-Fowraded-for header was occupied");
let client_ip_str = client_ip.to_string();
let mut addr =
String::with_capacity(entry.get().as_bytes().len() + 2 + client_ip_str.len());
@ -287,13 +278,13 @@ fn create_proxied_request<B>(
Ok(request)
}
pub async fn call<'a, T: Connect + Clone + Send + Sync + 'static>(
pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync + 'static>(
client_ip: IpAddr,
forward_uri: &str,
mut request: Request<Incoming>,
client: &'a Client<T, Incoming>,
) -> Result<Response<Incoming>, ProxyError> {
debug!(
mut request: Request<Body>,
client: &'a Client<T>,
) -> Result<Response<Body>, ProxyError> {
info!(
"Received proxy call from {} to {}, client: {}",
request.uri().to_string(),
forward_uri,
@ -316,22 +307,21 @@ pub async fn call<'a, T: Connect + Clone + Send + Sync + 'static>(
if request_upgrade_type == response_upgrade_type {
if let Some(request_upgraded) = request_upgraded {
let mut response_upgraded = TokioIo::new(
response
let mut response_upgraded = response
.extensions_mut()
.remove::<OnUpgrade>()
.ok_or(ProxyError::UpgradeError(
"Failed to upgrade response".to_string(),
))?
.await?,
);
.expect("response does not have an upgrade extension")
.await?;
debug!("Responding to a connection upgrade response");
let mut request_upgraded = TokioIo::new(request_upgraded.await?);
tokio::spawn(async move {
copy_bidirectional(&mut response_upgraded, &mut request_upgraded).await
let mut request_upgraded =
request_upgraded.await.expect("failed to upgrade request");
copy_bidirectional(&mut response_upgraded, &mut request_upgraded)
.await
.expect("coping between upgraded connections failed");
});
Ok(response)
@ -354,12 +344,12 @@ pub async fn call<'a, T: Connect + Clone + Send + Sync + 'static>(
}
}
pub struct ReverseProxy<T: Connect + Clone + Send + Sync + 'static> {
client: Client<T, Incoming>,
pub struct ReverseProxy<T: hyper::client::connect::Connect + Clone + Send + Sync + 'static> {
client: Client<T>,
}
impl<T: Connect + Clone + Send + Sync + 'static> ReverseProxy<T> {
pub fn new(client: Client<T, Incoming>) -> Self {
impl<T: hyper::client::connect::Connect + Clone + Send + Sync + 'static> ReverseProxy<T> {
pub fn new(client: Client<T>) -> Self {
Self { client }
}
@ -367,8 +357,8 @@ impl<T: Connect + Clone + Send + Sync + 'static> ReverseProxy<T> {
&self,
client_ip: IpAddr,
forward_uri: &str,
request: Request<Incoming>,
) -> Result<Response<Incoming>, ProxyError> {
request: Request<Body>,
) -> Result<Response<Body>, ProxyError> {
call::<T>(client_ip, forward_uri, request, &self.client).await
}
}