From ced35f66882cad876636e2e960aee0b70715cabd Mon Sep 17 00:00:00 2001 From: quininer kel Date: Wed, 22 Feb 2017 13:30:01 +0800 Subject: [PATCH] [Added] fork proto.rs --- examples/client.rs | 4 +- examples/server.rs | 4 +- src/lib.rs | 5 +- src/proto.rs | 551 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 560 insertions(+), 4 deletions(-) create mode 100644 src/proto.rs diff --git a/examples/client.rs b/examples/client.rs index b78d57d..6fb5f5b 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,4 +1,5 @@ extern crate clap; +extern crate rustls; extern crate futures; extern crate tokio_core; extern crate webpki_roots; @@ -14,8 +15,9 @@ use tokio_core::io; use tokio_core::net::TcpStream; use tokio_core::reactor::Core; use clap::{ App, Arg }; +use rustls::ClientConfig; use tokio_file_unix::{ StdFile, File }; -use tokio_rustls::{ ClientConfig, ClientConfigExt }; +use tokio_rustls::ClientConfigExt; fn app() -> App<'static, 'static> { diff --git a/examples/server.rs b/examples/server.rs index c5cf2fa..ca67f0e 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -10,13 +10,13 @@ use std::net::ToSocketAddrs; use std::io::BufReader; use std::fs::File; use futures::{ Future, Stream }; -use rustls::{ Certificate, PrivateKey }; +use rustls::{ Certificate, PrivateKey, ServerConfig }; use rustls::internal::pemfile::{ certs, rsa_private_keys }; use tokio_core::io::{ self, Io }; use tokio_core::net::TcpListener; use tokio_core::reactor::Core; use clap::{ App, Arg }; -use tokio_rustls::{ ServerConfig, ServerConfigExt }; +use tokio_rustls::ServerConfigExt; fn app() -> App<'static, 'static> { diff --git a/src/lib.rs b/src/lib.rs index a7274da..899efd7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,16 +2,19 @@ //! //! [tokio-tls](https://github.com/tokio-rs/tokio-tls) fork, use [rustls](https://github.com/ctz/rustls). +#[cfg_attr(feature = "tokio-proto", macro_use)] extern crate futures; extern crate tokio_core; extern crate rustls; +pub mod proto; + use std::io; use std::sync::Arc; use futures::{ Future, Poll, Async }; use tokio_core::io::Io; use rustls::{ Session, ClientSession, ServerSession }; -pub use rustls::{ ClientConfig, ServerConfig }; +use rustls::{ ClientConfig, ServerConfig }; /// Extension trait for the `Arc` type in the `rustls` crate. diff --git a/src/proto.rs b/src/proto.rs new file mode 100644 index 0000000..24a8541 --- /dev/null +++ b/src/proto.rs @@ -0,0 +1,551 @@ +//! Wrappers for `tokio-proto` +//! +//! This module contains wrappers for protocols defined by the `tokio-proto` +//! crate. These wrappers will all attempt to negotiate a TLS connection first +//! and then delegate all further protocol information to the protocol +//! specified. +//! +//! This module requires the `tokio-proto` feature to be enabled. + +#![cfg(feature = "tokio-proto")] + +extern crate tokio_proto; + +use std::io; +use std::sync::Arc; +use futures::{ Future, IntoFuture, Poll }; +use rustls::{ ServerConfig, ClientConfig, ServerSession, ClientSession }; +use self::tokio_proto::multiplex; +use self::tokio_proto::pipeline; +use self::tokio_proto::streaming; +use tokio_core::io::Io; + +use { TlsStream, ServerConfigExt, ClientConfigExt, AcceptAsync, ConnectAsync }; + +/// TLS server protocol wrapper. +/// +/// This structure is a wrapper for other implementations of `ServerProto` in +/// the `tokio-proto` crate. This structure will negotiate a TLS connection +/// first and then delegate all further operations to the `ServerProto` +/// implementation for the underlying type. +pub struct Server { + inner: Arc, + acceptor: Arc, +} + +impl Server { + /// Constructs a new TLS protocol which will delegate to the underlying + /// `protocol` specified. + /// + /// The `acceptor` provided will be used to accept TLS connections. All new + /// connections will go through the TLS acceptor first and then further I/O + /// will go through the negotiated TLS stream through the `protocol` + /// specified. + pub fn new(protocol: T, acceptor: ServerConfig) -> Server { + Server { + inner: Arc::new(protocol), + acceptor: Arc::new(acceptor), + } + } +} + +/// Future returned from `bind_transport` in the `ServerProto` implementation. +pub struct ServerPipelineBind + where T: pipeline::ServerProto>, + I: Io + 'static, +{ + state: PipelineState, +} + +enum PipelineState + where T: pipeline::ServerProto>, + I: Io + 'static, +{ + First(AcceptAsync, Arc), + Next(::Future), +} + +impl pipeline::ServerProto for Server + where T: pipeline::ServerProto>, + I: Io + 'static, +{ + type Request = T::Request; + type Response = T::Response; + type Transport = T::Transport; + type BindTransport = ServerPipelineBind; + + fn bind_transport(&self, io: I) -> Self::BindTransport { + let proto = self.inner.clone(); + + ServerPipelineBind { + state: PipelineState::First(self.acceptor.accept_async(io), proto), + } + } +} + +impl Future for ServerPipelineBind + where T: pipeline::ServerProto>, + I: Io + 'static, +{ + type Item = T::Transport; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + PipelineState::First(ref mut a, ref state) => { + let res = a.poll().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + state.bind_transport(try_ready!(res)) + } + PipelineState::Next(ref mut b) => return b.poll(), + }; + self.state = PipelineState::Next(next.into_future()); + } + } +} + +/// Future returned from `bind_transport` in the `ServerProto` implementation. +pub struct ServerMultiplexBind + where T: multiplex::ServerProto>, + I: Io + 'static, +{ + state: MultiplexState, +} + +enum MultiplexState + where T: multiplex::ServerProto>, + I: Io + 'static, +{ + First(AcceptAsync, Arc), + Next(::Future), +} + +impl multiplex::ServerProto for Server + where T: multiplex::ServerProto>, + I: Io + 'static, +{ + type Request = T::Request; + type Response = T::Response; + type Transport = T::Transport; + type BindTransport = ServerMultiplexBind; + + fn bind_transport(&self, io: I) -> Self::BindTransport { + let proto = self.inner.clone(); + + ServerMultiplexBind { + state: MultiplexState::First(self.acceptor.accept_async(io), proto), + } + } +} + +impl Future for ServerMultiplexBind + where T: multiplex::ServerProto>, + I: Io + 'static, +{ + type Item = T::Transport; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + MultiplexState::First(ref mut a, ref state) => { + let res = a.poll().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + state.bind_transport(try_ready!(res)) + } + MultiplexState::Next(ref mut b) => return b.poll(), + }; + self.state = MultiplexState::Next(next.into_future()); + } + } +} + +/// Future returned from `bind_transport` in the `ServerProto` implementation. +pub struct ServerStreamingPipelineBind + where T: streaming::pipeline::ServerProto>, + I: Io + 'static, +{ + state: StreamingPipelineState, +} + +enum StreamingPipelineState + where T: streaming::pipeline::ServerProto>, + I: Io + 'static, +{ + First(AcceptAsync, Arc), + Next(::Future), +} + +impl streaming::pipeline::ServerProto for Server + where T: streaming::pipeline::ServerProto>, + I: Io + 'static, +{ + type Request = T::Request; + type RequestBody = T::RequestBody; + type Response = T::Response; + type ResponseBody = T::ResponseBody; + type Error = T::Error; + type Transport = T::Transport; + type BindTransport = ServerStreamingPipelineBind; + + fn bind_transport(&self, io: I) -> Self::BindTransport { + let proto = self.inner.clone(); + + ServerStreamingPipelineBind { + state: StreamingPipelineState::First(self.acceptor.accept_async(io), proto), + } + } +} + +impl Future for ServerStreamingPipelineBind + where T: streaming::pipeline::ServerProto>, + I: Io + 'static, +{ + type Item = T::Transport; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + StreamingPipelineState::First(ref mut a, ref state) => { + let res = a.poll().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + state.bind_transport(try_ready!(res)) + } + StreamingPipelineState::Next(ref mut b) => return b.poll(), + }; + self.state = StreamingPipelineState::Next(next.into_future()); + } + } +} + +/// Future returned from `bind_transport` in the `ServerProto` implementation. +pub struct ServerStreamingMultiplexBind + where T: streaming::multiplex::ServerProto>, + I: Io + 'static, +{ + state: StreamingMultiplexState, +} + +enum StreamingMultiplexState + where T: streaming::multiplex::ServerProto>, + I: Io + 'static, +{ + First(AcceptAsync, Arc), + Next(::Future), +} + +impl streaming::multiplex::ServerProto for Server + where T: streaming::multiplex::ServerProto>, + I: Io + 'static, +{ + type Request = T::Request; + type RequestBody = T::RequestBody; + type Response = T::Response; + type ResponseBody = T::ResponseBody; + type Error = T::Error; + type Transport = T::Transport; + type BindTransport = ServerStreamingMultiplexBind; + + fn bind_transport(&self, io: I) -> Self::BindTransport { + let proto = self.inner.clone(); + + ServerStreamingMultiplexBind { + state: StreamingMultiplexState::First(self.acceptor.accept_async(io), proto), + } + } +} + +impl Future for ServerStreamingMultiplexBind + where T: streaming::multiplex::ServerProto>, + I: Io + 'static, +{ + type Item = T::Transport; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + StreamingMultiplexState::First(ref mut a, ref state) => { + let res = a.poll().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + state.bind_transport(try_ready!(res)) + } + StreamingMultiplexState::Next(ref mut b) => return b.poll(), + }; + self.state = StreamingMultiplexState::Next(next.into_future()); + } + } +} + +/// TLS client protocol wrapper. +/// +/// This structure is a wrapper for other implementations of `ClientProto` in +/// the `tokio-proto` crate. This structure will negotiate a TLS connection +/// first and then delegate all further operations to the `ClientProto` +/// implementation for the underlying type. +pub struct Client { + inner: Arc, + connector: Arc, + hostname: String, +} + +impl Client { + /// Constructs a new TLS protocol which will delegate to the underlying + /// `protocol` specified. + /// + /// The `connector` provided will be used to configure the TLS connection. Further I/O + /// will go through the negotiated TLS stream through the `protocol` specified. + pub fn new(protocol: T, + connector: ClientConfig, + hostname: &str) -> Client { + Client { + inner: Arc::new(protocol), + connector: Arc::new(connector), + hostname: hostname.to_string(), + } + } +} + +/// Future returned from `bind_transport` in the `ClientProto` implementation. +pub struct ClientPipelineBind + where T: pipeline::ClientProto>, + I: Io + 'static, +{ + state: ClientPipelineState, +} + +enum ClientPipelineState + where T: pipeline::ClientProto>, + I: Io + 'static, +{ + First(ConnectAsync, Arc), + Next(::Future), +} + +impl pipeline::ClientProto for Client + where T: pipeline::ClientProto>, + I: Io + 'static, +{ + type Request = T::Request; + type Response = T::Response; + type Transport = T::Transport; + type BindTransport = ClientPipelineBind; + + fn bind_transport(&self, io: I) -> Self::BindTransport { + let proto = self.inner.clone(); + let io = self.connector.connect_async(&self.hostname, io); + + ClientPipelineBind { + state: ClientPipelineState::First(io, proto), + } + } +} + +impl Future for ClientPipelineBind + where T: pipeline::ClientProto>, + I: Io + 'static, +{ + type Item = T::Transport; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + ClientPipelineState::First(ref mut a, ref state) => { + let res = a.poll().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + state.bind_transport(try_ready!(res)) + } + ClientPipelineState::Next(ref mut b) => return b.poll(), + }; + self.state = ClientPipelineState::Next(next.into_future()); + } + } +} + +/// Future returned from `bind_transport` in the `ClientProto` implementation. +pub struct ClientMultiplexBind + where T: multiplex::ClientProto>, + I: Io + 'static, +{ + state: ClientMultiplexState, +} + +enum ClientMultiplexState + where T: multiplex::ClientProto>, + I: Io + 'static, +{ + First(ConnectAsync, Arc), + Next(::Future), +} + +impl multiplex::ClientProto for Client + where T: multiplex::ClientProto>, + I: Io + 'static, +{ + type Request = T::Request; + type Response = T::Response; + type Transport = T::Transport; + type BindTransport = ClientMultiplexBind; + + fn bind_transport(&self, io: I) -> Self::BindTransport { + let proto = self.inner.clone(); + let io = self.connector.connect_async(&self.hostname, io); + + ClientMultiplexBind { + state: ClientMultiplexState::First(io, proto), + } + } +} + +impl Future for ClientMultiplexBind + where T: multiplex::ClientProto>, + I: Io + 'static, +{ + type Item = T::Transport; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + ClientMultiplexState::First(ref mut a, ref state) => { + let res = a.poll().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + state.bind_transport(try_ready!(res)) + } + ClientMultiplexState::Next(ref mut b) => return b.poll(), + }; + self.state = ClientMultiplexState::Next(next.into_future()); + } + } +} + +/// Future returned from `bind_transport` in the `ClientProto` implementation. +pub struct ClientStreamingPipelineBind + where T: streaming::pipeline::ClientProto>, + I: Io + 'static, +{ + state: ClientStreamingPipelineState, +} + +enum ClientStreamingPipelineState + where T: streaming::pipeline::ClientProto>, + I: Io + 'static, +{ + First(ConnectAsync, Arc), + Next(::Future), +} + +impl streaming::pipeline::ClientProto for Client + where T: streaming::pipeline::ClientProto>, + I: Io + 'static, +{ + type Request = T::Request; + type RequestBody = T::RequestBody; + type Response = T::Response; + type ResponseBody = T::ResponseBody; + type Error = T::Error; + type Transport = T::Transport; + type BindTransport = ClientStreamingPipelineBind; + + fn bind_transport(&self, io: I) -> Self::BindTransport { + let proto = self.inner.clone(); + let io = self.connector.connect_async(&self.hostname, io); + + ClientStreamingPipelineBind { + state: ClientStreamingPipelineState::First(io, proto), + } + } +} + +impl Future for ClientStreamingPipelineBind + where T: streaming::pipeline::ClientProto>, + I: Io + 'static, +{ + type Item = T::Transport; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + ClientStreamingPipelineState::First(ref mut a, ref state) => { + let res = a.poll().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + state.bind_transport(try_ready!(res)) + } + ClientStreamingPipelineState::Next(ref mut b) => return b.poll(), + }; + self.state = ClientStreamingPipelineState::Next(next.into_future()); + } + } +} + +/// Future returned from `bind_transport` in the `ClientProto` implementation. +pub struct ClientStreamingMultiplexBind + where T: streaming::multiplex::ClientProto>, + I: Io + 'static, +{ + state: ClientStreamingMultiplexState, +} + +enum ClientStreamingMultiplexState + where T: streaming::multiplex::ClientProto>, + I: Io + 'static, +{ + First(ConnectAsync, Arc), + Next(::Future), +} + +impl streaming::multiplex::ClientProto for Client + where T: streaming::multiplex::ClientProto>, + I: Io + 'static, +{ + type Request = T::Request; + type RequestBody = T::RequestBody; + type Response = T::Response; + type ResponseBody = T::ResponseBody; + type Error = T::Error; + type Transport = T::Transport; + type BindTransport = ClientStreamingMultiplexBind; + + fn bind_transport(&self, io: I) -> Self::BindTransport { + let proto = self.inner.clone(); + let io = self.connector.connect_async(&self.hostname, io); + + ClientStreamingMultiplexBind { + state: ClientStreamingMultiplexState::First(io, proto), + } + } +} + +impl Future for ClientStreamingMultiplexBind + where T: streaming::multiplex::ClientProto>, + I: Io + 'static, +{ + type Item = T::Transport; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let next = match self.state { + ClientStreamingMultiplexState::First(ref mut a, ref state) => { + let res = a.poll().map_err(|e| { + io::Error::new(io::ErrorKind::Other, e) + }); + state.bind_transport(try_ready!(res)) + } + ClientStreamingMultiplexState::Next(ref mut b) => return b.poll(), + }; + self.state = ClientStreamingMultiplexState::Next(next.into_future()); + } + } +}