start migrate to futures 0.3 (again)

This commit is contained in:
quininer 2019-05-04 22:44:40 +08:00
parent 00f1022f88
commit 017b1b64d1
3 changed files with 114 additions and 78 deletions

View File

@ -9,14 +9,15 @@ documentation = "https://docs.rs/tokio-rustls"
readme = "README.md" readme = "README.md"
description = "Asynchronous TLS/SSL streams for Tokio using Rustls." description = "Asynchronous TLS/SSL streams for Tokio using Rustls."
categories = ["asynchronous", "cryptography", "network-programming"] categories = ["asynchronous", "cryptography", "network-programming"]
edition = "2018"
[badges] [badges]
travis-ci = { repository = "quininer/tokio-rustls" } travis-ci = { repository = "quininer/tokio-rustls" }
appveyor = { repository = "quininer/tokio-rustls" } appveyor = { repository = "quininer/tokio-rustls" }
[dependencies] [dependencies]
futures = "0.1" smallvec = "*"
tokio-io = "0.1.6" futures = { package = "futures-preview", version = "0.3.0-alpha.15" }
bytes = "0.4" bytes = "0.4"
iovec = "0.1" iovec = "0.1"
rustls = "0.15" rustls = "0.15"

View File

@ -1,18 +1,23 @@
mod vecbuf; // mod vecbuf;
use std::pin::Pin;
use std::task::Poll;
use std::marker::Unpin;
use std::io::{ self, Read, Write }; use std::io::{ self, Read, Write };
use rustls::Session; use rustls::Session;
use rustls::WriteV; use rustls::WriteV;
use tokio_io::{ AsyncRead, AsyncWrite }; use futures::task::Context;
use futures::io::{ AsyncRead, AsyncWrite, IoVec };
use smallvec::SmallVec;
pub struct Stream<'a, IO: 'a, S: 'a> { pub struct Stream<'a, IO, S> {
pub io: &'a mut IO, pub io: &'a mut IO,
pub session: &'a mut S pub session: &'a mut S,
} }
pub trait WriteTls<'a, IO: AsyncRead + AsyncWrite, S: Session>: Read + Write { pub trait WriteTls<IO: AsyncWrite, S: Session> {
fn write_tls(&mut self) -> io::Result<usize>; fn write_tls(&mut self, cx: &mut Context) -> io::Result<usize>;
} }
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@ -22,36 +27,59 @@ enum Focus {
Writable Writable
} }
impl<'a, IO: AsyncRead + AsyncWrite, S: Session> Stream<'a, IO, S> { impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> Stream<'a, IO, S> {
pub fn new(io: &'a mut IO, session: &'a mut S) -> Self { pub fn new(io: &'a mut IO, session: &'a mut S) -> Self {
Stream { io, session } Stream { io, session }
} }
pub fn complete_io(&mut self) -> io::Result<(usize, usize)> { pub fn complete_io(&mut self, cx: &mut Context) -> Poll<io::Result<(usize, usize)>> {
self.complete_inner_io(Focus::Empty) self.complete_inner_io(cx, Focus::Empty)
} }
fn complete_read_io(&mut self) -> io::Result<usize> { fn complete_read_io(&mut self, cx: &mut Context) -> Poll<io::Result<usize>> {
let n = self.session.read_tls(self.io)?; struct Reader<'a, 'b, T> {
io: &'a mut T,
cx: &'a mut Context<'b>
}
impl<'a, 'b, T: AsyncRead + Unpin> Read for Reader<'a, 'b, T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match Pin::new(&mut self.io).poll_read(self.cx, buf) {
Poll::Ready(result) => result,
Poll::Pending => Err(io::ErrorKind::WouldBlock.into())
}
}
}
let mut reader = Reader { io: self.io, cx };
let n = match self.session.read_tls(&mut reader) {
Ok(n) => n,
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Poll::Pending,
Err(err) => return Poll::Ready(Err(err))
};
self.session.process_new_packets() self.session.process_new_packets()
.map_err(|err| { .map_err(|err| {
// In case we have an alert to send describing this error, // In case we have an alert to send describing this error,
// try a last-gasp write -- but don't predate the primary // try a last-gasp write -- but don't predate the primary
// error. // error.
let _ = self.write_tls(); let _ = self.write_tls(cx);
io::Error::new(io::ErrorKind::InvalidData, err) io::Error::new(io::ErrorKind::InvalidData, err)
})?; })?;
Ok(n) Poll::Ready(Ok(n))
} }
fn complete_write_io(&mut self) -> io::Result<usize> { fn complete_write_io(&mut self, cx: &mut Context) -> Poll<io::Result<usize>> {
self.write_tls() match self.write_tls(cx) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
result => Poll::Ready(result)
}
} }
fn complete_inner_io(&mut self, focus: Focus) -> io::Result<(usize, usize)> { fn complete_inner_io(&mut self, cx: &mut Context, focus: Focus) -> Poll<io::Result<(usize, usize)>> {
let mut wrlen = 0; let mut wrlen = 0;
let mut rdlen = 0; let mut rdlen = 0;
let mut eof = false; let mut eof = false;
@ -61,22 +89,22 @@ impl<'a, IO: AsyncRead + AsyncWrite, S: Session> Stream<'a, IO, S> {
let mut read_would_block = false; let mut read_would_block = false;
while self.session.wants_write() { while self.session.wants_write() {
match self.complete_write_io() { match self.complete_write_io(cx) {
Ok(n) => wrlen += n, Poll::Ready(Ok(n)) => wrlen += n,
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Poll::Pending => {
write_would_block = true; write_would_block = true;
break break
}, },
Err(err) => return Err(err) Poll::Ready(Err(err)) => return Poll::Ready(Err(err))
} }
} }
if !eof && self.session.wants_read() { if !eof && self.session.wants_read() {
match self.complete_read_io() { match self.complete_read_io(cx) {
Ok(0) => eof = true, Poll::Ready(Ok(0)) => eof = true,
Ok(n) => rdlen += n, Poll::Ready(Ok(n)) => rdlen += n,
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => read_would_block = true, Poll::Pending => read_would_block = true,
Err(err) => return Err(err) Poll::Ready(Err(err)) => return Poll::Ready(Err(err))
} }
} }
@ -87,7 +115,7 @@ impl<'a, IO: AsyncRead + AsyncWrite, S: Session> Stream<'a, IO, S> {
}; };
match (eof, self.session.is_handshaking(), would_block) { match (eof, self.session.is_handshaking(), would_block) {
(true, true, _) => return Err(io::ErrorKind::UnexpectedEof.into()), (true, true, _) => return Poll::Pending,
(_, false, true) => { (_, false, true) => {
let would_block = match focus { let would_block = match focus {
Focus::Empty => rdlen == 0 && wrlen == 0, Focus::Empty => rdlen == 0 && wrlen == 0,
@ -96,83 +124,96 @@ impl<'a, IO: AsyncRead + AsyncWrite, S: Session> Stream<'a, IO, S> {
}; };
return if would_block { return if would_block {
Err(io::ErrorKind::WouldBlock.into()) Poll::Pending
} else { } else {
Ok((rdlen, wrlen)) Poll::Ready(Ok((rdlen, wrlen)))
}; };
}, },
(_, false, _) => return Ok((rdlen, wrlen)), (_, false, _) => return Poll::Ready(Ok((rdlen, wrlen))),
(_, true, true) => return Err(io::ErrorKind::WouldBlock.into()), (_, true, true) => return Poll::Pending,
(..) => () (..) => ()
} }
} }
} }
} }
impl<'a, IO: AsyncRead + AsyncWrite, S: Session> WriteTls<'a, IO, S> for Stream<'a, IO, S> { impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> WriteTls<IO, S> for Stream<'a, IO, S> {
fn write_tls(&mut self) -> io::Result<usize> { fn write_tls(&mut self, cx: &mut Context) -> io::Result<usize> {
use futures::Async; struct Writer<'a, 'b, IO> {
use self::vecbuf::VecBuf; io: &'a mut IO,
cx: &'a mut Context<'b>
}
struct V<'a, IO: 'a>(&'a mut IO); impl<'a, 'b, IO: AsyncWrite + Unpin> WriteV for Writer<'a, 'b, IO> {
impl<'a, IO: AsyncWrite> WriteV for V<'a, IO> {
fn writev(&mut self, vbytes: &[&[u8]]) -> io::Result<usize> { fn writev(&mut self, vbytes: &[&[u8]]) -> io::Result<usize> {
let mut vbytes = VecBuf::new(vbytes); let vbytes = vbytes
match self.0.write_buf(&mut vbytes) { .into_iter()
Ok(Async::Ready(n)) => Ok(n), .try_fold(SmallVec::<[&'_ IoVec; 16]>::new(), |mut sum, next| {
Ok(Async::NotReady) => Err(io::ErrorKind::WouldBlock.into()), sum.push(IoVec::from_bytes(next)?);
Err(err) => Err(err) Some(sum)
})
.unwrap_or_default();
match Pin::new(&mut self.io).poll_vectored_write(self.cx, &vbytes) {
Poll::Ready(result) => result,
Poll::Pending => Err(io::ErrorKind::WouldBlock.into())
} }
} }
} }
let mut vecio = V(self.io); let mut vecio = Writer { io: self.io, cx };
self.session.writev_tls(&mut vecio) self.session.writev_tls(&mut vecio)
} }
} }
impl<'a, IO: AsyncRead + AsyncWrite, S: Session> Read for Stream<'a, IO, S> { impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> Stream<'a, IO, S> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn poll_read(&mut self, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
while self.session.wants_read() { while self.session.wants_read() {
if let (0, _) = self.complete_inner_io(Focus::Readable)? { match self.complete_inner_io(cx, Focus::Readable) {
break Poll::Ready(Ok((0, _))) => break,
Poll::Ready(Ok(_)) => (),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err))
} }
} }
self.session.read(buf)
}
}
impl<'a, IO: AsyncRead + AsyncWrite, S: Session> Write for Stream<'a, IO, S> { // FIXME rustls always ready ?
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { Poll::Ready(self.session.read(buf))
}
fn poll_write(&mut self, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let len = self.session.write(buf)?; let len = self.session.write(buf)?;
while self.session.wants_write() { while self.session.wants_write() {
match self.complete_inner_io(Focus::Writable) { match self.complete_inner_io(cx, Focus::Writable) {
Ok(_) => (), Poll::Ready(Ok(_)) => (),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock && len != 0 => break, Poll::Pending if len != 0 => break,
Err(err) => return Err(err) Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err))
} }
} }
if len != 0 || buf.is_empty() { if len != 0 || buf.is_empty() {
Ok(len) Poll::Ready(Ok(len))
} else { } else {
// not write zero // not write zero
self.session.write(buf) match self.session.write(buf) {
.and_then(|len| if len != 0 { Ok(0) => Poll::Pending,
Ok(len) Ok(n) => Poll::Ready(Ok(n)),
} else { Err(err) => Poll::Ready(Err(err))
Err(io::ErrorKind::WouldBlock.into()) }
})
} }
} }
fn flush(&mut self) -> io::Result<()> { fn poll_flush(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
self.session.flush()?; self.session.flush()?;
while self.session.wants_write() { while self.session.wants_write() {
self.complete_inner_io(Focus::Writable)?; match self.complete_inner_io(cx, Focus::Writable) {
Poll::Ready(Ok(_)) => (),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err))
} }
Ok(()) }
Poll::Ready(Ok(()))
} }
} }

View File

@ -1,17 +1,10 @@
//! Asynchronous TLS/SSL streams for Tokio using [Rustls](https://github.com/ctz/rustls). //! Asynchronous TLS/SSL streams for Tokio using [Rustls](https://github.com/ctz/rustls).
pub extern crate rustls; // pub mod client;
pub extern crate webpki;
extern crate bytes;
extern crate futures;
extern crate iovec;
extern crate tokio_io;
pub mod client;
mod common; mod common;
pub mod server; // pub mod server;
/*
use common::Stream; use common::Stream;
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use rustls::{ClientConfig, ClientSession, ServerConfig, ServerSession}; use rustls::{ClientConfig, ClientSession, ServerConfig, ServerSession};
@ -194,3 +187,4 @@ impl<IO: AsyncRead + AsyncWrite> Future for Accept<IO> {
#[cfg(feature = "early-data")] #[cfg(feature = "early-data")]
#[cfg(test)] #[cfg(test)]
mod test_0rtt; mod test_0rtt;
*/