From 9f78454cf1736846c40c09b70a1c93e8a8fa9743 Mon Sep 17 00:00:00 2001 From: quininer Date: Tue, 20 Mar 2018 20:17:44 +0800 Subject: [PATCH] feat: try futures 0.2 --- Cargo.toml | 12 +++---- src/lib.rs | 97 ++++++++++++++++++++++++++++----------------------- tests/test.rs | 13 +++---- 3 files changed, 64 insertions(+), 58 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 151320b..68d12c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,17 +15,15 @@ travis-ci = { repository = "quininer/tokio-rustls" } appveyor = { repository = "quininer/tokio-rustls" } [dependencies] -futures = "0.1.15" -tokio-io = "0.1.3" +futures = "0.2.0-alpha" +tokio = { version = "0.1", features = [ "unstable-futures" ] } rustls = "0.12" webpki = "0.18.0-alpha" -tokio-proto = { version = "0.1.1", optional = true } [dev-dependencies] -tokio-core = "0.1" -tokio = "0.1" +tokio = { version = "0.1", features = [ "unstable-futures" ] } clap = "2.26" webpki-roots = "0.14" -[target.'cfg(unix)'.dev-dependencies] -tokio-file-unix = "0.4" +[patch.crates-io] +tokio = { git = "https://github.com/tokio-rs/tokio" } diff --git a/src/lib.rs b/src/lib.rs index 8ec6017..33a1dc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,17 +1,14 @@ //! Asynchronous TLS/SSL streams for Tokio using [Rustls](https://github.com/ctz/rustls). - -#[cfg_attr(feature = "tokio-proto", macro_use)] extern crate futures; -#[macro_use] extern crate tokio_io; +extern crate futures; +extern crate tokio; extern crate rustls; extern crate webpki; -pub mod proto; - use std::io; use std::sync::Arc; use futures::{ Future, Poll, Async }; -use tokio_io::{ AsyncRead, AsyncWrite }; +use futures::task::Context; use rustls::{ Session, ClientSession, ServerSession, ClientConfig, ServerConfig @@ -22,14 +19,14 @@ use rustls::{ pub trait ClientConfigExt { fn connect_async(&self, domain: webpki::DNSNameRef, stream: S) -> ConnectAsync - where S: AsyncRead + AsyncWrite; + where S: io::Read + io::Write; } /// Extension trait for the `Arc` type in the `rustls` crate. pub trait ServerConfigExt { fn accept_async(&self, stream: S) -> AcceptAsync - where S: AsyncRead + AsyncWrite; + where S: io::Read + io::Write; } @@ -45,7 +42,7 @@ pub struct AcceptAsync(MidHandshake); impl ClientConfigExt for Arc { fn connect_async(&self, domain: webpki::DNSNameRef, stream: S) -> ConnectAsync - where S: AsyncRead + AsyncWrite + where S: io::Read + io::Write { connect_async_with_session(stream, ClientSession::new(self, domain)) } @@ -54,7 +51,7 @@ impl ClientConfigExt for Arc { #[inline] pub fn connect_async_with_session(stream: S, session: ClientSession) -> ConnectAsync - where S: AsyncRead + AsyncWrite + where S: io::Read + io::Write { ConnectAsync(MidHandshake { inner: Some(TlsStream::new(stream, session)) @@ -64,7 +61,7 @@ pub fn connect_async_with_session(stream: S, session: ClientSession) impl ServerConfigExt for Arc { fn accept_async(&self, stream: S) -> AcceptAsync - where S: AsyncRead + AsyncWrite + where S: io::Read + io::Write { accept_async_with_session(stream, ServerSession::new(self)) } @@ -73,28 +70,28 @@ impl ServerConfigExt for Arc { #[inline] pub fn accept_async_with_session(stream: S, session: ServerSession) -> AcceptAsync - where S: AsyncRead + AsyncWrite + where S: io::Read + io::Write { AcceptAsync(MidHandshake { inner: Some(TlsStream::new(stream, session)) }) } -impl Future for ConnectAsync { +impl Future for ConnectAsync { type Item = TlsStream; type Error = io::Error; - fn poll(&mut self) -> Poll { - self.0.poll() + fn poll(&mut self, ctx: &mut Context) -> Poll { + self.0.poll(ctx) } } -impl Future for AcceptAsync { +impl Future for AcceptAsync { type Item = TlsStream; type Error = io::Error; - fn poll(&mut self) -> Poll { - self.0.poll() + fn poll(&mut self, ctx: &mut Context) -> Poll { + self.0.poll(ctx) } } @@ -104,12 +101,12 @@ struct MidHandshake { } impl Future for MidHandshake - where S: AsyncRead + AsyncWrite, C: Session + where S: io::Read + io::Write, C: Session { type Item = TlsStream; type Error = io::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self, _: &mut Context) -> Poll { loop { let stream = self.inner.as_mut().unwrap(); if !stream.session.is_handshaking() { break }; @@ -121,7 +118,7 @@ impl Future for MidHandshake (..) => break }, Err(e) => match (e.kind(), stream.session.is_handshaking()) { - (io::ErrorKind::WouldBlock, true) => return Ok(Async::NotReady), + (io::ErrorKind::WouldBlock, true) => return Ok(Async::Pending), (io::ErrorKind::WouldBlock, false) => break, (..) => return Err(e) } @@ -154,7 +151,7 @@ impl TlsStream { } impl TlsStream - where S: AsyncRead + AsyncWrite, C: Session + where S: io::Read + io::Write, C: Session { #[inline] pub fn new(io: S, session: C) -> TlsStream { @@ -214,7 +211,7 @@ impl TlsStream } impl io::Read for TlsStream - where S: AsyncRead + AsyncWrite, C: Session + where S: io::Read + io::Write, C: Session { fn read(&mut self, buf: &mut [u8]) -> io::Result { loop { @@ -233,7 +230,7 @@ impl io::Read for TlsStream } impl io::Write for TlsStream - where S: AsyncRead + AsyncWrite, C: Session + where S: io::Read + io::Write, C: Session { fn write(&mut self, buf: &[u8]) -> io::Result { if buf.is_empty() { @@ -272,26 +269,40 @@ impl io::Write for TlsStream } } -impl AsyncRead for TlsStream - where - S: AsyncRead + AsyncWrite, - C: Session -{} -impl AsyncWrite for TlsStream - where - S: AsyncRead + AsyncWrite, - C: Session -{ - fn shutdown(&mut self) -> Poll<(), io::Error> { - if !self.is_shutdown { - self.session.send_close_notify(); - self.is_shutdown = true; +mod tokio_impl { + use super::*; + use tokio::io::{ AsyncRead, AsyncWrite }; + use tokio::prelude::Poll; + + impl AsyncRead for TlsStream + where + S: AsyncRead + AsyncWrite, + C: Session + {} + + impl AsyncWrite for TlsStream + where + S: AsyncRead + AsyncWrite, + C: Session + { + fn shutdown(&mut self) -> Poll<(), io::Error> { + if !self.is_shutdown { + self.session.send_close_notify(); + self.is_shutdown = true; + } + while self.session.wants_write() { + self.session.write_tls(&mut self.io)?; + } + self.io.flush()?; + self.io.shutdown() } - while self.session.wants_write() { - try_nb!(self.session.write_tls(&mut self.io)); - } - try_nb!(self.io.flush()); - self.io.shutdown() } } + +mod futures_impl { + use super::*; + use futures::io::{ AsyncRead, AsyncWrite }; + + // TODO +} diff --git a/tests/test.rs b/tests/test.rs index baa22b2..5698ec0 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -1,7 +1,6 @@ extern crate rustls; extern crate futures; extern crate tokio; -extern crate tokio_io; extern crate tokio_rustls; extern crate webpki; @@ -10,10 +9,9 @@ use std::io::{ BufReader, Cursor }; use std::sync::Arc; use std::sync::mpsc::channel; use std::net::{ SocketAddr, IpAddr, Ipv4Addr }; -use futures::{ Future, Stream }; -use tokio::executor::current_thread; +use futures::{ FutureExt, StreamExt }; use tokio::net::{ TcpListener, TcpStream }; -use tokio_io::io as aio; +use tokio::io as aio; use rustls::{ Certificate, PrivateKey, ServerConfig, ClientConfig }; use rustls::internal::pemfile::{ certs, rsa_private_keys }; use tokio_rustls::{ ClientConfigExt, ServerConfigExt }; @@ -48,13 +46,12 @@ fn start_server(cert: Vec, rsa: PrivateKey) -> SocketAddr { .map(drop) .map_err(drop); - current_thread::spawn(done); + tokio::spawn2(done); Ok(()) }) - .map(drop) - .map_err(drop); + .then(|_| Ok(())); - current_thread::run(|_| current_thread::spawn(done)); + tokio::runtime::run2(done); }); recv.recv().unwrap()