Compare commits
5 Commits
e73a76600c
...
1dc4618994
Author | SHA1 | Date | |
---|---|---|---|
1dc4618994 | |||
5fe9e29ae4 | |||
29ea682d8f | |||
907ea5b7f4 | |||
9f4b94724f |
@ -23,13 +23,14 @@ name="internal"
|
|||||||
harness = false
|
harness = false
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
hyper = { version = "0.14.18", features = ["client"] }
|
hyper = { version = "1.2.0", features = ["client", "http1"] }
|
||||||
|
hyper-util = { version = "0.1.3", features = ["client-legacy", "http1","tokio"] }
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
tokio = { version = "1.17.0", features = ["io-util", "rt"] }
|
tokio = { version = "1.17.0", features = ["io-util", "rt"] }
|
||||||
tracing = "0.1.34"
|
tracing = "0.1.34"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
hyper = { version = "0.14.18", features = ["server"] }
|
hyper = { version = "1.2.0", features = ["client", "http1"] }
|
||||||
futures = "0.3.21"
|
futures = "0.3.21"
|
||||||
async-trait = "0.1.53"
|
async-trait = "0.1.53"
|
||||||
async-tungstenite = { version = "0.17", features = ["tokio-runtime"] }
|
async-tungstenite = { version = "0.17", features = ["tokio-runtime"] }
|
||||||
|
72
src/lib.rs
72
src/lib.rs
@ -3,11 +3,13 @@
|
|||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate tracing;
|
extern crate tracing;
|
||||||
|
|
||||||
use hyper::header::{HeaderMap, HeaderName, HeaderValue, HOST};
|
use hyper::header::{HeaderMap, HeaderName, HeaderValue};
|
||||||
use hyper::http::header::{InvalidHeaderValue, ToStrError};
|
use hyper::http::header::{InvalidHeaderValue, ToStrError};
|
||||||
use hyper::http::uri::InvalidUri;
|
use hyper::http::uri::InvalidUri;
|
||||||
use hyper::upgrade::OnUpgrade;
|
use hyper::upgrade::OnUpgrade;
|
||||||
use hyper::{Body, Client, Error, Request, Response, StatusCode};
|
use hyper::{body::Incoming, Error, Request, Response, StatusCode};
|
||||||
|
use hyper_util::client::legacy::{connect::Connect, Client, Error as LegacyError};
|
||||||
|
use hyper_util::rt::tokio::TokioIo;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use std::net::IpAddr;
|
use std::net::IpAddr;
|
||||||
use tokio::io::copy_bidirectional;
|
use tokio::io::copy_bidirectional;
|
||||||
@ -37,11 +39,18 @@ lazy_static! {
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum ProxyError {
|
pub enum ProxyError {
|
||||||
InvalidUri(InvalidUri),
|
InvalidUri(InvalidUri),
|
||||||
|
LegacyHyperError(LegacyError),
|
||||||
HyperError(Error),
|
HyperError(Error),
|
||||||
ForwardHeaderError,
|
ForwardHeaderError,
|
||||||
UpgradeError(String),
|
UpgradeError(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<LegacyError> for ProxyError {
|
||||||
|
fn from(err: LegacyError) -> ProxyError {
|
||||||
|
ProxyError::LegacyHyperError(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<Error> for ProxyError {
|
impl From<Error> for ProxyError {
|
||||||
fn from(err: Error) -> ProxyError {
|
fn from(err: Error) -> ProxyError {
|
||||||
ProxyError::HyperError(err)
|
ProxyError::HyperError(err)
|
||||||
@ -75,7 +84,7 @@ fn remove_hop_headers(headers: &mut HeaderMap) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn get_upgrade_type(headers: &HeaderMap) -> Option<String> {
|
fn get_upgrade_type(headers: &HeaderMap) -> Option<String> {
|
||||||
#[allow(clippy::blocks_in_if_conditions)]
|
#[allow(clippy::blocks_in_conditions)]
|
||||||
if headers
|
if headers
|
||||||
.get(&*CONNECTION_HEADER)
|
.get(&*CONNECTION_HEADER)
|
||||||
.map(|value| {
|
.map(|value| {
|
||||||
@ -115,7 +124,7 @@ fn remove_connection_headers(headers: &mut HeaderMap) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn create_proxied_response<B>(mut response: Response<B>) -> Response<B> {
|
fn create_proxied_response<B>(mut response: Response<B>) -> Response<B> {
|
||||||
info!("Creating proxied response");
|
debug!("Creating proxied response");
|
||||||
|
|
||||||
remove_hop_headers(response.headers_mut());
|
remove_hop_headers(response.headers_mut());
|
||||||
remove_connection_headers(response.headers_mut());
|
remove_connection_headers(response.headers_mut());
|
||||||
@ -128,7 +137,7 @@ fn forward_uri<B>(forward_url: &str, req: &Request<B>) -> String {
|
|||||||
|
|
||||||
let split_url = forward_url.split('?').collect::<Vec<&str>>();
|
let split_url = forward_url.split('?').collect::<Vec<&str>>();
|
||||||
|
|
||||||
let mut base_url: &str = split_url.get(0).unwrap_or(&"");
|
let mut base_url: &str = split_url.first().unwrap_or(&"");
|
||||||
let forward_url_query: &str = split_url.get(1).unwrap_or(&"");
|
let forward_url_query: &str = split_url.get(1).unwrap_or(&"");
|
||||||
|
|
||||||
let path2 = req.uri().path();
|
let path2 = req.uri().path();
|
||||||
@ -207,7 +216,7 @@ fn create_proxied_request<B>(
|
|||||||
mut request: Request<B>,
|
mut request: Request<B>,
|
||||||
upgrade_type: Option<&String>,
|
upgrade_type: Option<&String>,
|
||||||
) -> Result<Request<B>, ProxyError> {
|
) -> Result<Request<B>, ProxyError> {
|
||||||
info!("Creating proxied request");
|
debug!("Creating proxied request");
|
||||||
|
|
||||||
let contains_te_trailers_value = request
|
let contains_te_trailers_value = request
|
||||||
.headers()
|
.headers()
|
||||||
@ -225,9 +234,9 @@ fn create_proxied_request<B>(
|
|||||||
|
|
||||||
debug!("Setting headers of proxied request");
|
debug!("Setting headers of proxied request");
|
||||||
|
|
||||||
request
|
//request
|
||||||
.headers_mut()
|
// .headers_mut()
|
||||||
.insert(HOST, HeaderValue::from_str(uri.host().unwrap())?);
|
// .insert(HOST, HeaderValue::from_str(uri.host().unwrap())?);
|
||||||
|
|
||||||
*request.uri_mut() = uri;
|
*request.uri_mut() = uri;
|
||||||
|
|
||||||
@ -256,12 +265,12 @@ fn create_proxied_request<B>(
|
|||||||
// Add forwarding information in the headers
|
// Add forwarding information in the headers
|
||||||
match request.headers_mut().entry(&*X_FORWARDED_FOR) {
|
match request.headers_mut().entry(&*X_FORWARDED_FOR) {
|
||||||
hyper::header::Entry::Vacant(entry) => {
|
hyper::header::Entry::Vacant(entry) => {
|
||||||
debug!("X-Fowraded-for header was vacant");
|
debug!("X-Forwarded-for header was vacant");
|
||||||
entry.insert(client_ip.to_string().parse()?);
|
entry.insert(client_ip.to_string().parse()?);
|
||||||
}
|
}
|
||||||
|
|
||||||
hyper::header::Entry::Occupied(entry) => {
|
hyper::header::Entry::Occupied(entry) => {
|
||||||
debug!("X-Fowraded-for header was occupied");
|
debug!("X-Forwarded-for header was occupied");
|
||||||
let client_ip_str = client_ip.to_string();
|
let client_ip_str = client_ip.to_string();
|
||||||
let mut addr =
|
let mut addr =
|
||||||
String::with_capacity(entry.get().as_bytes().len() + 2 + client_ip_str.len());
|
String::with_capacity(entry.get().as_bytes().len() + 2 + client_ip_str.len());
|
||||||
@ -278,13 +287,13 @@ fn create_proxied_request<B>(
|
|||||||
Ok(request)
|
Ok(request)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync + 'static>(
|
pub async fn call<'a, T: Connect + Clone + Send + Sync + 'static>(
|
||||||
client_ip: IpAddr,
|
client_ip: IpAddr,
|
||||||
forward_uri: &str,
|
forward_uri: &str,
|
||||||
mut request: Request<Body>,
|
mut request: Request<Incoming>,
|
||||||
client: &'a Client<T>,
|
client: &'a Client<T, Incoming>,
|
||||||
) -> Result<Response<Body>, ProxyError> {
|
) -> Result<Response<Incoming>, ProxyError> {
|
||||||
info!(
|
debug!(
|
||||||
"Received proxy call from {} to {}, client: {}",
|
"Received proxy call from {} to {}, client: {}",
|
||||||
request.uri().to_string(),
|
request.uri().to_string(),
|
||||||
forward_uri,
|
forward_uri,
|
||||||
@ -307,21 +316,22 @@ pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync +
|
|||||||
|
|
||||||
if request_upgrade_type == response_upgrade_type {
|
if request_upgrade_type == response_upgrade_type {
|
||||||
if let Some(request_upgraded) = request_upgraded {
|
if let Some(request_upgraded) = request_upgraded {
|
||||||
let mut response_upgraded = response
|
let mut response_upgraded = TokioIo::new(
|
||||||
|
response
|
||||||
.extensions_mut()
|
.extensions_mut()
|
||||||
.remove::<OnUpgrade>()
|
.remove::<OnUpgrade>()
|
||||||
.expect("response does not have an upgrade extension")
|
.ok_or(ProxyError::UpgradeError(
|
||||||
.await?;
|
"Failed to upgrade response".to_string(),
|
||||||
|
))?
|
||||||
|
.await?,
|
||||||
|
);
|
||||||
|
|
||||||
debug!("Responding to a connection upgrade response");
|
debug!("Responding to a connection upgrade response");
|
||||||
|
|
||||||
tokio::spawn(async move {
|
let mut request_upgraded = TokioIo::new(request_upgraded.await?);
|
||||||
let mut request_upgraded =
|
|
||||||
request_upgraded.await.expect("failed to upgrade request");
|
|
||||||
|
|
||||||
copy_bidirectional(&mut response_upgraded, &mut request_upgraded)
|
tokio::spawn(async move {
|
||||||
.await
|
copy_bidirectional(&mut response_upgraded, &mut request_upgraded).await
|
||||||
.expect("coping between upgraded connections failed");
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(response)
|
Ok(response)
|
||||||
@ -344,12 +354,12 @@ pub async fn call<'a, T: hyper::client::connect::Connect + Clone + Send + Sync +
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ReverseProxy<T: hyper::client::connect::Connect + Clone + Send + Sync + 'static> {
|
pub struct ReverseProxy<T: Connect + Clone + Send + Sync + 'static> {
|
||||||
client: Client<T>,
|
client: Client<T, Incoming>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: hyper::client::connect::Connect + Clone + Send + Sync + 'static> ReverseProxy<T> {
|
impl<T: Connect + Clone + Send + Sync + 'static> ReverseProxy<T> {
|
||||||
pub fn new(client: Client<T>) -> Self {
|
pub fn new(client: Client<T, Incoming>) -> Self {
|
||||||
Self { client }
|
Self { client }
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -357,8 +367,8 @@ impl<T: hyper::client::connect::Connect + Clone + Send + Sync + 'static> Reverse
|
|||||||
&self,
|
&self,
|
||||||
client_ip: IpAddr,
|
client_ip: IpAddr,
|
||||||
forward_uri: &str,
|
forward_uri: &str,
|
||||||
request: Request<Body>,
|
request: Request<Incoming>,
|
||||||
) -> Result<Response<Body>, ProxyError> {
|
) -> Result<Response<Incoming>, ProxyError> {
|
||||||
call::<T>(client_ip, forward_uri, request, &self.client).await
|
call::<T>(client_ip, forward_uri, request, &self.client).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user