[Changed] update tokio-io

This commit is contained in:
quininer kel 2017-03-16 10:15:06 +08:00
parent 0db05aa9bf
commit c7041e2111
4 changed files with 59 additions and 39 deletions

View File

@ -14,6 +14,7 @@ for nonblocking I/O streams.
[dependencies] [dependencies]
futures = "0.1" futures = "0.1"
tokio-core = "0.1" tokio-core = "0.1"
tokio-io = "0.1"
rustls = "0.5" rustls = "0.5"
tokio-proto = { version = "0.1", optional = true } tokio-proto = { version = "0.1", optional = true }

View File

@ -1,6 +1,7 @@
extern crate clap; extern crate clap;
extern crate rustls; extern crate rustls;
extern crate futures; extern crate futures;
extern crate tokio_io;
extern crate tokio_core; extern crate tokio_core;
extern crate webpki_roots; extern crate webpki_roots;
extern crate tokio_file_unix; extern crate tokio_file_unix;
@ -11,7 +12,7 @@ use std::net::ToSocketAddrs;
use std::io::{ BufReader, stdout, stdin }; use std::io::{ BufReader, stdout, stdin };
use std::fs; use std::fs;
use futures::Future; use futures::Future;
use tokio_core::io::{ self, Io }; use tokio_io::{ io, AsyncRead };
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use clap::{ App, Arg }; use clap::{ App, Arg };
@ -71,8 +72,9 @@ fn main() {
.and_then(|stream| io::write_all(stream, text.as_bytes())) .and_then(|stream| io::write_all(stream, text.as_bytes()))
.and_then(|(stream, _)| { .and_then(|(stream, _)| {
let (r, w) = stream.split(); let (r, w) = stream.split();
io::copy(r, stdout).select(io::copy(stdin, w)) io::copy(r, stdout)
.map(|_| ()) .map(|_| ())
.select(io::copy(stdin, w).map(|_| ()))
.map_err(|(e, _)| e) .map_err(|(e, _)| e)
}); });

View File

@ -1,6 +1,7 @@
extern crate clap; extern crate clap;
extern crate rustls; extern crate rustls;
extern crate futures; extern crate futures;
extern crate tokio_io;
extern crate tokio_core; extern crate tokio_core;
extern crate webpki_roots; extern crate webpki_roots;
extern crate tokio_rustls; extern crate tokio_rustls;
@ -12,7 +13,7 @@ use std::fs::File;
use futures::{ Future, Stream }; use futures::{ Future, Stream };
use rustls::{ Certificate, PrivateKey, ServerConfig }; use rustls::{ Certificate, PrivateKey, ServerConfig };
use rustls::internal::pemfile::{ certs, rsa_private_keys }; use rustls::internal::pemfile::{ certs, rsa_private_keys };
use tokio_core::io::{ self, Io }; use tokio_io::{ io, AsyncRead };
use tokio_core::net::TcpListener; use tokio_core::net::TcpListener;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use clap::{ App, Arg }; use clap::{ App, Arg };
@ -62,7 +63,7 @@ fn main() {
let (reader, writer) = stream.split(); let (reader, writer) = stream.split();
io::copy(reader, writer) io::copy(reader, writer)
}) })
.map(move |n| println!("Echo: {} - {}", n, addr)) .map(move |(n, _, _)| println!("Echo: {} - {}", n, addr))
.map_err(move |err| println!("Error: {:?} - {}", err, addr)); .map_err(move |err| println!("Error: {:?} - {}", err, addr));
handle.spawn(done); handle.spawn(done);

View File

@ -2,8 +2,8 @@
//! //!
//! [tokio-tls](https://github.com/tokio-rs/tokio-tls) fork, use [rustls](https://github.com/ctz/rustls). //! [tokio-tls](https://github.com/tokio-rs/tokio-tls) fork, use [rustls](https://github.com/ctz/rustls).
#[cfg_attr(feature = "tokio-proto", macro_use)] #[cfg_attr(feature = "tokio-proto", macro_use)] extern crate futures;
extern crate futures; extern crate tokio_io;
extern crate tokio_core; extern crate tokio_core;
extern crate rustls; extern crate rustls;
@ -12,7 +12,7 @@ pub mod proto;
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use futures::{ Future, Poll, Async }; use futures::{ Future, Poll, Async };
use tokio_core::io::Io; use tokio_io::{ AsyncRead, AsyncWrite };
use rustls::{ Session, ClientSession, ServerSession }; use rustls::{ Session, ClientSession, ServerSession };
use rustls::{ ClientConfig, ServerConfig }; use rustls::{ ClientConfig, ServerConfig };
@ -21,14 +21,14 @@ use rustls::{ ClientConfig, ServerConfig };
pub trait ClientConfigExt { pub trait ClientConfigExt {
fn connect_async<S>(&self, domain: &str, stream: S) fn connect_async<S>(&self, domain: &str, stream: S)
-> ConnectAsync<S> -> ConnectAsync<S>
where S: Io; where S: AsyncRead + AsyncWrite;
} }
/// Extension trait for the `Arc<ServerConfig>` type in the `rustls` crate. /// Extension trait for the `Arc<ServerConfig>` type in the `rustls` crate.
pub trait ServerConfigExt { pub trait ServerConfigExt {
fn accept_async<S>(&self, stream: S) fn accept_async<S>(&self, stream: S)
-> AcceptAsync<S> -> AcceptAsync<S>
where S: Io; where S: AsyncRead + AsyncWrite;
} }
@ -44,7 +44,7 @@ pub struct AcceptAsync<S>(MidHandshake<S, ServerSession>);
impl ClientConfigExt for Arc<ClientConfig> { impl ClientConfigExt for Arc<ClientConfig> {
fn connect_async<S>(&self, domain: &str, stream: S) fn connect_async<S>(&self, domain: &str, stream: S)
-> ConnectAsync<S> -> ConnectAsync<S>
where S: Io where S: AsyncRead + AsyncWrite
{ {
ConnectAsync(MidHandshake { ConnectAsync(MidHandshake {
inner: Some(TlsStream::new(stream, ClientSession::new(self, domain))) inner: Some(TlsStream::new(stream, ClientSession::new(self, domain)))
@ -55,7 +55,7 @@ impl ClientConfigExt for Arc<ClientConfig> {
impl ServerConfigExt for Arc<ServerConfig> { impl ServerConfigExt for Arc<ServerConfig> {
fn accept_async<S>(&self, stream: S) fn accept_async<S>(&self, stream: S)
-> AcceptAsync<S> -> AcceptAsync<S>
where S: Io where S: AsyncRead + AsyncWrite
{ {
AcceptAsync(MidHandshake { AcceptAsync(MidHandshake {
inner: Some(TlsStream::new(stream, ServerSession::new(self))) inner: Some(TlsStream::new(stream, ServerSession::new(self)))
@ -63,7 +63,7 @@ impl ServerConfigExt for Arc<ServerConfig> {
} }
} }
impl<S: Io> Future for ConnectAsync<S> { impl<S: AsyncRead + AsyncWrite> Future for ConnectAsync<S> {
type Item = TlsStream<S, ClientSession>; type Item = TlsStream<S, ClientSession>;
type Error = io::Error; type Error = io::Error;
@ -72,7 +72,7 @@ impl<S: Io> Future for ConnectAsync<S> {
} }
} }
impl<S: Io> Future for AcceptAsync<S> { impl<S: AsyncRead + AsyncWrite> Future for AcceptAsync<S> {
type Item = TlsStream<S, ServerSession>; type Item = TlsStream<S, ServerSession>;
type Error = io::Error; type Error = io::Error;
@ -87,7 +87,7 @@ struct MidHandshake<S, C> {
} }
impl<S, C> Future for MidHandshake<S, C> impl<S, C> Future for MidHandshake<S, C>
where S: Io, C: Session where S: AsyncRead + AsyncWrite, C: Session
{ {
type Item = TlsStream<S, C>; type Item = TlsStream<S, C>;
type Error = io::Error; type Error = io::Error;
@ -136,7 +136,7 @@ impl<S, C> TlsStream<S, C> {
} }
impl<S, C> TlsStream<S, C> impl<S, C> TlsStream<S, C>
where S: Io, C: Session where S: AsyncRead + AsyncWrite, C: Session
{ {
#[inline] #[inline]
pub fn new(io: S, session: C) -> TlsStream<S, C> { pub fn new(io: S, session: C) -> TlsStream<S, C> {
@ -149,29 +149,32 @@ impl<S, C> TlsStream<S, C>
pub fn do_io(&mut self) -> io::Result<()> { pub fn do_io(&mut self) -> io::Result<()> {
loop { loop {
let read_would_block = match (!self.eof && self.session.wants_read(), self.io.poll_read()) { let read_would_block = if !self.eof && self.session.wants_read() {
(true, Async::Ready(())) => { match self.session.read_tls(&mut self.io) {
match self.session.read_tls(&mut self.io) { Ok(0) => {
Ok(0) => self.eof = true, self.eof = true;
Ok(_) => self.session.process_new_packets() continue
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?, },
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), Ok(_) => {
Err(e) => return Err(e) self.session.process_new_packets()
}; .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
continue continue
}, },
(true, Async::NotReady) => true, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
(false, _) => false, Err(e) => return Err(e)
}
} else {
false
}; };
let write_would_block = match (self.session.wants_write(), self.io.poll_write()) { let write_would_block = if self.session.wants_write() {
(true, Async::Ready(())) => match self.session.write_tls(&mut self.io) { match self.session.write_tls(&mut self.io) {
Ok(_) => continue, Ok(_) => continue,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => true,
Err(e) => return Err(e) Err(e) => return Err(e)
}, }
(true, Async::NotReady) => true, } else {
(false, _) => false false
}; };
if read_would_block || write_would_block { if read_would_block || write_would_block {
@ -184,7 +187,7 @@ impl<S, C> TlsStream<S, C>
} }
impl<S, C> io::Read for TlsStream<S, C> impl<S, C> io::Read for TlsStream<S, C>
where S: Io, C: Session where S: AsyncRead + AsyncWrite, C: Session
{ {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop { loop {
@ -203,12 +206,12 @@ impl<S, C> io::Read for TlsStream<S, C>
} }
impl<S, C> io::Write for TlsStream<S, C> impl<S, C> io::Write for TlsStream<S, C>
where S: Io, C: Session where S: AsyncRead + AsyncWrite, C: Session
{ {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let output = self.session.write(buf)?; let output = self.session.write(buf)?;
while self.session.wants_write() && self.io.poll_write().is_ready() { while self.session.wants_write() {
match self.session.write_tls(&mut self.io) { match self.session.write_tls(&mut self.io) {
Ok(_) => (), Ok(_) => (),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
@ -228,6 +231,19 @@ impl<S, C> io::Write for TlsStream<S, C>
} }
} }
impl<S, C> Io for TlsStream<S, C> where S: Io, C: Session { impl<S, C> AsyncRead for TlsStream<S, C>
// TODO impl poll_{read, write} where
S: AsyncRead + AsyncWrite,
C: Session
{}
impl<S, C> AsyncWrite for TlsStream<S, C>
where
S: AsyncRead + AsyncWrite,
C: Session
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.session.send_close_notify();
self.io.shutdown()
}
} }