impl complete_io
This commit is contained in:
parent
41a6a3b501
commit
518ad51376
@ -12,16 +12,98 @@ pub struct Stream<'a, S: 'a, IO: 'a> {
|
|||||||
io: &'a mut IO
|
io: &'a mut IO
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
pub trait CompleteIo<'a, S: Session, IO: Read + Write>: Read + Write {
|
||||||
impl<'a, S: Session, IO: Write> Stream<'a, S, IO> {
|
fn write_tls(&mut self) -> io::Result<usize>;
|
||||||
pub default fn write_tls(&mut self) -> io::Result<usize> {
|
fn complete_io(&mut self) -> io::Result<(usize, usize)>;
|
||||||
self.session.write_tls(self.io)
|
}
|
||||||
|
|
||||||
|
impl<'a, S: Session, IO: Read + Write> Stream<'a, S, IO> {
|
||||||
|
pub fn new(session: &'a mut S, io: &'a mut IO) -> Self {
|
||||||
|
Stream { session, io }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
impl<'a, S: Session, IO: AsyncWrite> Stream<'a, S, IO> {
|
impl<'a, S: Session, IO: Read + Write> CompleteIo<'a, S, IO> for Stream<'a, S, IO> {
|
||||||
pub fn write_tls(&mut self) -> io::Result<usize> {
|
default fn write_tls(&mut self) -> io::Result<usize> {
|
||||||
|
self.session.write_tls(self.io)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn complete_io(&mut self) -> io::Result<(usize, usize)> {
|
||||||
|
// fork from https://github.com/ctz/rustls/blob/master/src/session.rs#L161
|
||||||
|
|
||||||
|
let until_handshaked = self.session.is_handshaking();
|
||||||
|
let mut eof = false;
|
||||||
|
let mut wrlen = 0;
|
||||||
|
let mut rdlen = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
while self.session.wants_write() {
|
||||||
|
wrlen += self.write_tls()?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !until_handshaked && wrlen > 0 {
|
||||||
|
return Ok((rdlen, wrlen));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !eof && self.session.wants_read() {
|
||||||
|
match self.session.read_tls(self.io)? {
|
||||||
|
0 => eof = true,
|
||||||
|
n => rdlen += n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.session.process_new_packets() {
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(e) => {
|
||||||
|
// In case we have an alert to send describing this error,
|
||||||
|
// try a last-gasp write -- but don't predate the primary
|
||||||
|
// error.
|
||||||
|
let _ignored = self.write_tls();
|
||||||
|
|
||||||
|
return Err(io::Error::new(io::ErrorKind::InvalidData, e));
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
match (eof, until_handshaked, self.session.is_handshaking()) {
|
||||||
|
(_, true, false) => return Ok((rdlen, wrlen)),
|
||||||
|
(_, false, _) => return Ok((rdlen, wrlen)),
|
||||||
|
(true, true, true) => return Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
|
||||||
|
(..) => ()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, S: Session, IO: Read + Write> Read for Stream<'a, S, IO> {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
|
while self.session.wants_read() {
|
||||||
|
if let (0, 0) = self.complete_io()? {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.session.read(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, S: Session, IO: Read + Write> io::Write for Stream<'a, S, IO> {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
|
let len = self.session.write(buf)?;
|
||||||
|
self.complete_io()?;
|
||||||
|
Ok(len)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
|
self.session.flush()?;
|
||||||
|
if self.session.wants_write() {
|
||||||
|
self.complete_io()?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, S: Session, IO: Read + AsyncWrite> CompleteIo<'a, S, IO> for Stream<'a, S, IO> {
|
||||||
|
fn write_tls(&mut self) -> io::Result<usize> {
|
||||||
struct V<'a, IO: 'a>(&'a mut IO);
|
struct V<'a, IO: 'a>(&'a mut IO);
|
||||||
|
|
||||||
impl<'a, IO: AsyncWrite> WriteV for V<'a, IO> {
|
impl<'a, IO: AsyncWrite> WriteV for V<'a, IO> {
|
||||||
@ -41,6 +123,7 @@ impl<'a, S: Session, IO: AsyncWrite> Stream<'a, S, IO> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// TODO test
|
||||||
struct VecBuf<'a, 'b: 'a> {
|
struct VecBuf<'a, 'b: 'a> {
|
||||||
pos: usize,
|
pos: usize,
|
||||||
cur: usize,
|
cur: usize,
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
//! Asynchronous TLS/SSL streams for Tokio using [Rustls](https://github.com/ctz/rustls).
|
//! Asynchronous TLS/SSL streams for Tokio using [Rustls](https://github.com/ctz/rustls).
|
||||||
|
|
||||||
|
#![feature(specialization)]
|
||||||
|
|
||||||
pub extern crate rustls;
|
pub extern crate rustls;
|
||||||
pub extern crate webpki;
|
pub extern crate webpki;
|
||||||
|
|
||||||
@ -18,8 +20,8 @@ use webpki::DNSNameRef;
|
|||||||
use rustls::{
|
use rustls::{
|
||||||
Session, ClientSession, ServerSession,
|
Session, ClientSession, ServerSession,
|
||||||
ClientConfig, ServerConfig,
|
ClientConfig, ServerConfig,
|
||||||
Stream
|
|
||||||
};
|
};
|
||||||
|
use common::Stream;
|
||||||
|
|
||||||
|
|
||||||
/// Extension trait for the `Arc<ClientConfig>` type in the `rustls` crate.
|
/// Extension trait for the `Arc<ClientConfig>` type in the `rustls` crate.
|
||||||
|
@ -2,6 +2,7 @@ use super::*;
|
|||||||
use tokio::prelude::*;
|
use tokio::prelude::*;
|
||||||
use tokio::io::{ AsyncRead, AsyncWrite };
|
use tokio::io::{ AsyncRead, AsyncWrite };
|
||||||
use tokio::prelude::Poll;
|
use tokio::prelude::Poll;
|
||||||
|
use common::{ Stream, CompleteIo };
|
||||||
|
|
||||||
|
|
||||||
impl<S: AsyncRead + AsyncWrite> Future for ConnectAsync<S> {
|
impl<S: AsyncRead + AsyncWrite> Future for ConnectAsync<S> {
|
||||||
@ -29,18 +30,19 @@ impl<S, C> Future for MidHandshake<S, C>
|
|||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
loop {
|
{
|
||||||
let stream = self.inner.as_mut().unwrap();
|
let stream = self.inner.as_mut().unwrap();
|
||||||
if !stream.session.is_handshaking() { break };
|
if stream.session.is_handshaking() {
|
||||||
|
|
||||||
let (io, session) = stream.get_mut();
|
let (io, session) = stream.get_mut();
|
||||||
|
let mut stream = Stream::new(session, io);
|
||||||
|
|
||||||
match session.complete_io(io) {
|
match stream.complete_io() {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
|
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
|
||||||
Err(e) => return Err(e)
|
Err(e) => return Err(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Async::Ready(self.inner.take().unwrap()))
|
Ok(Async::Ready(self.inner.take().unwrap()))
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user