diff --git a/Cargo.toml b/Cargo.toml index 09b3cf0..8729a58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hyper-reverse-proxy" -version = "0.2.0" +version = "0.2.1" authors = ["Brendan Zabarauskas "] license = "Apache-2.0" description = "A simple reverse proxy, to be used with Hyper and Tokio." @@ -24,4 +24,6 @@ lazy_static = "0.2" unicase = "2.0.0" [dev-dependencies] +error-chain = "0.10.0" tokio-core = "0.1.8" +tokio-signal = "0.1.2" diff --git a/README.md b/README.md index 6019635..c0ae95e 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,10 @@ [![Version](https://img.shields.io/crates/v/hyper-reverse-proxy.svg)](https://crates.io/crates/hyper-reverse-proxy) [![License](https://img.shields.io/crates/l/hyper-reverse-proxy.svg)](https://github.com/brendanzab/hyper-reverse-proxy/blob/master/LICENSE) -A simple reverse proxy, to be used with Hyper and Tokio. +A simple reverse proxy, to be used with [Hyper] and [Tokio]. The implementation was originally based on Go's [`httputil.ReverseProxy`]. +[Hyper]: http://hyper.rs/ +[Tokio]: http://tokio.rs/ [`httputil.ReverseProxy`]: https://golang.org/pkg/net/http/httputil/#ReverseProxy diff --git a/examples/basic.rs b/examples/basic.rs new file mode 100644 index 0000000..38a1953 --- /dev/null +++ b/examples/basic.rs @@ -0,0 +1,47 @@ +//! A simple use of the `ReverseProxy` service. + +extern crate futures; +extern crate hyper; +extern crate hyper_reverse_proxy; +extern crate tokio_core; + +use futures::Stream; +use hyper::Client; +use hyper::server::Http; +use hyper_reverse_proxy::ReverseProxy; +use tokio_core::net::TcpListener; +use tokio_core::reactor::Core; +use std::net::{SocketAddr, Ipv4Addr}; + +fn run() -> hyper::Result<()> { + // Set up the Tokio reactor core + let mut core = Core::new()?; + let handle = core.handle(); + + // Set up a TCP socket to listen to + let listen_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080); + let listener = TcpListener::bind(&listen_addr, &handle)?; + + // Listen to incoming requests over TCP, and forward them to a new `ReverseProxy` + let http = Http::new(); + let server = listener.incoming().for_each(|(socket, addr)| { + let client = Client::new(&handle); + let service = ReverseProxy::new(client, Some(addr.ip())); + http.bind_connection(&handle, socket, addr, service); + Ok(()) + }); + + // Start our server on the reactor core + core.run(server)?; + + Ok(()) +} + +fn main() { + use std::io::{self, Write}; + + if let Err(error) = run() { + write!(&mut io::stderr(), "{}", error).expect("Error writing to stderr"); + std::process::exit(1); + } +} diff --git a/examples/extended.rs b/examples/extended.rs new file mode 100644 index 0000000..827affd --- /dev/null +++ b/examples/extended.rs @@ -0,0 +1,151 @@ +//! A more involved example of using the `ReverseProxy` service. + +#[macro_use] +extern crate error_chain; +extern crate futures; +extern crate hyper; +extern crate hyper_reverse_proxy; +extern crate tokio_core; +extern crate tokio_signal; + +use futures::{BoxFuture, Future, Stream}; +use tokio_core::reactor::Handle; +use std::net::{SocketAddr, Ipv4Addr}; + +error_chain! { + foreign_links { + Io(std::io::Error); + Hyper(hyper::Error); + } +} + +fn shutdown_future(handle: &Handle) -> BoxFuture<(), std::io::Error> { + use tokio_signal::unix::{Signal, SIGINT, SIGTERM}; + + let sigint = Signal::new(SIGINT, handle).flatten_stream(); + let sigterm = Signal::new(SIGTERM, handle).flatten_stream(); + + Stream::select(sigint, sigterm) + .into_future() + .map(|_| ()) + .map_err(|(e, _)| e) + .boxed() +} + +fn run() -> Result<()> { + use futures::task::{self, Task}; + use hyper::Client; + use hyper::server::{Http, Service}; + use hyper_reverse_proxy::ReverseProxy; + use std::rc::{Rc, Weak}; + use std::cell::RefCell; + use std::time::Duration; + use tokio_core::net::TcpListener; + use tokio_core::reactor::{Core, Timeout}; + + struct Info { + active: usize, + blocker: Option, + } + + struct NotifyService { + inner: S, + info: Weak>, + } + + impl Service for NotifyService { + type Request = S::Request; + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn call(&self, message: Self::Request) -> Self::Future { + self.inner.call(message) + } + } + + impl Drop for NotifyService { + fn drop(&mut self) { + if let Some(info) = self.info.upgrade() { + let mut info = info.borrow_mut(); + info.active -= 1; + if info.active == 0 { + if let Some(task) = info.blocker.take() { + task.notify(); + } + } + } + } + } + + struct WaitUntilZero { + info: Rc>, + } + + impl Future for WaitUntilZero { + type Item = (); + type Error = std::io::Error; + + fn poll(&mut self) -> futures::Poll<(), std::io::Error> { + use futures::Async; + + let mut info = self.info.borrow_mut(); + if info.active == 0 { + Ok(Async::Ready(())) + } else { + info.blocker = Some(task::current()); + Ok(Async::NotReady) + } + } + } + + // Set up the Tokio reactor core + let mut core = Core::new()?; + let handle = core.handle(); + + // Set up a TCP socket to listen to + let listen_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080); + let listener = TcpListener::bind(&listen_addr, &handle)?; + + println!("Listening on {}", listen_addr); + + // Keep track of how many active connections we are managing + let info = Rc::new(RefCell::new(Info { + active: 0, + blocker: None, + })); + + // Listen to incoming requests over TCP, and forward them to a new `ReverseProxy` + let http = Http::new(); + let server = listener.incoming().for_each(|(socket, addr)| { + let client = Client::new(&handle); + let service = NotifyService { + inner: ReverseProxy::new(client, Some(addr.ip())), + info: Rc::downgrade(&info), + }; + + info.borrow_mut().active += 1; + http.bind_connection(&handle, socket, addr, service); + Ok(()) + }); + + let shutdown = shutdown_future(&handle); + + // Start our server, blocking the main thread. + match core.run(Future::select(shutdown, server)) { + Ok(((), _next)) => {} + Err((error, _next)) => bail!(error), + } + + println!("Shutting down gracefully"); + + // Let the outstanding requests run for 2 seconds, then shut down the server + let timeout = Timeout::new(Duration::from_secs(2), &handle)?; + let wait = WaitUntilZero { info: info.clone() }; + match core.run(Future::select(wait, timeout)) { + Ok(((), _next)) => Ok(()), + Err((error, _next)) => Err(error.into()), + } +} + +quick_main!(run); diff --git a/src/lib.rs b/src/lib.rs index 6379379..4dc7b2c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -//! A simple reverse proxy, to be used with Hyper and Tokio. +//! A simple reverse proxy, to be used with [Hyper] and [Tokio]. //! //! The implementation ensures that [Hop-by-hop headers] are stripped correctly in both directions, //! and adds the client's IP address to a comma-space-separated list of forwarding addresses in the @@ -6,50 +6,77 @@ //! //! The implementation is based on Go's [`httputil.ReverseProxy`]. //! +//! [Hyper]: http://hyper.rs/ +//! [Tokio]: http://tokio.rs/ //! [Hop-by-hop headers]: http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html //! [`httputil.ReverseProxy`]: https://golang.org/pkg/net/http/httputil/#ReverseProxy //! +//! # Example +//! +//! Because the reverse proxy needs client, we also need a [`Handle`]. This +//! means that we can't take advantage of hyper's handy [`Http::bind`] method +//! to quickly set up the server. Instead we do things manually: +//! +//! [`Handle`]: https://docs.rs/tokio-core/*/tokio_core/reactor/struct.Handle.html +//! [`Http::bind`]: https://docs.rs/hyper/*/hyper/server/struct.Http.html#method.bind +//! //! ```rust,no_run //! extern crate futures; //! extern crate hyper; //! extern crate hyper_reverse_proxy; //! extern crate tokio_core; //! -//! fn run() -> hyper::Result<()> { -//! use futures::Stream; -//! use hyper::Client; -//! use hyper::server::Http; -//! use hyper_reverse_proxy::ReverseProxy; -//! use tokio_core::net::TcpListener; -//! use tokio_core::reactor::Core; -//! use std::net::{SocketAddr, Ipv4Addr}; +//! use futures::Stream; +//! use hyper::Client; +//! use hyper::server::Http; +//! use hyper_reverse_proxy::ReverseProxy; +//! use tokio_core::net::TcpListener; +//! use tokio_core::reactor::Core; +//! use std::net::{SocketAddr, Ipv4Addr}; //! +//! fn run() -> hyper::Result<()> { +//! // Set up the Tokio reactor core //! let mut core = Core::new()?; //! let handle = core.handle(); +//! +//! // Set up a TCP socket to listen to //! let listen_addr = SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 8080); //! let listener = TcpListener::bind(&listen_addr, &handle)?; //! +//! // Listen to incoming requests over TCP, and forward them to a new `ReverseProxy` //! let http = Http::new(); //! let server = listener.incoming().for_each(|(socket, addr)| { -//! let service = ReverseProxy::new(Client::new(&handle), Some(addr.ip())); +//! let client = Client::new(&handle); +//! let service = ReverseProxy::new(client, Some(addr.ip())); //! http.bind_connection(&handle, socket, addr, service); //! Ok(()) //! }); //! +//! // Start our server on the reactor core //! core.run(server)?; //! //! Ok(()) //! } //! //! fn main() { -//! use std::io::Write; +//! use std::io::{self, Write}; //! //! if let Err(error) = run() { -//! write!(&mut std::io::stderr(), "{}", error).expect("Error writing to stderr"); +//! write!(&mut io::stderr(), "{}", error).expect("Error writing to stderr"); //! std::process::exit(1); //! } //! } //! ``` +//! +//! Note that in a production system we might also want to: +//! +//! - listen for `SIGINT` and `SIGTERM` signals using [tokio_signal] +//! - shut down the server gracefully, waiting for outstanding requests to complete +//! +//! To see an example of this, look at [examples/extended.rs]. +//! +//! [examples/extended.rs]: https://github.com/brendanzab/hyper-reverse-proxy/blob/master/examples/extended.rs +//! [tokio_signal]: https://github.com/alexcrichton/tokio-signal extern crate futures; #[macro_use]