From 2f4419b285142730f4af7a13132cccf88356f189 Mon Sep 17 00:00:00 2001 From: quininer Date: Sat, 13 Jul 2019 17:34:52 +0800 Subject: [PATCH] Switch to tokio-io 0.2 --- Cargo.toml | 5 +++-- src/client.rs | 8 ++++---- src/common/mod.rs | 41 ++++++++++++++++++++++------------------- src/lib.rs | 3 ++- src/server.rs | 8 ++++---- 5 files changed, 35 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 84d6d63..f92dbef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,8 @@ appveyor = { repository = "quininer/tokio-rustls" } [dependencies] smallvec = "0.6" -futures = { package = "futures-preview", version = "0.3.0-alpha.16" } +tokio-io = { git = "https://github.com/tokio-rs/tokio" } +tokio-futures = { git = "https://github.com/tokio-rs/tokio" } rustls = "0.15" webpki = "0.19" @@ -25,6 +26,6 @@ webpki = "0.19" early-data = [] [dev-dependencies] -romio = "0.3.0-alpha.8" +tokio = { git = "https://github.com/tokio-rs/tokio" } lazy_static = "1" webpki-roots = "0.16" diff --git a/src/client.rs b/src/client.rs index 11a0331..ac961b2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -74,8 +74,8 @@ impl AsyncRead for TlsStream where IO: AsyncRead + AsyncWrite + Unpin, { - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.io.prepare_uninitialized_buffer(buf) } fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { @@ -192,7 +192,7 @@ where stream.as_mut_pin().poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.state.writeable() { self.session.send_close_notify(); self.state.shutdown_write(); @@ -201,6 +201,6 @@ where let this = self.get_mut(); let mut stream = Stream::new(&mut this.io, &mut this.session) .set_eof(!this.state.readable()); - stream.as_mut_pin().poll_close(cx) + stream.as_mut_pin().poll_shutdown(cx) } } diff --git a/src/common/mod.rs b/src/common/mod.rs index 2e648ae..228b1a3 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,11 +1,10 @@ use std::pin::Pin; -use std::task::Poll; +use std::task::{ Poll, Context }; use std::marker::Unpin; -use std::io::{ self, Read }; -use rustls::{ Session, WriteV }; -use futures::task::Context; -use futures::io::{ AsyncRead, AsyncWrite, IoSlice }; -use smallvec::SmallVec; +use std::io::{ self, Read, Write }; +use rustls::Session; +use tokio_io::{ AsyncRead, AsyncWrite }; +use tokio_futures as futures; pub struct Stream<'a, IO, S> { @@ -154,27 +153,31 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> Stream<'a, IO, S> { impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> WriteTls for Stream<'a, IO, S> { fn write_tls(&mut self, cx: &mut Context) -> io::Result { - struct Writer<'a, 'b, IO> { - io: &'a mut IO, + // TODO writev + + struct Writer<'a, 'b, T> { + io: &'a mut T, cx: &'a mut Context<'b> } - impl<'a, 'b, IO: AsyncWrite + Unpin> WriteV for Writer<'a, 'b, IO> { - fn writev(&mut self, vbytes: &[&[u8]]) -> io::Result { - let vbytes = vbytes - .into_iter() - .map(|v| IoSlice::new(v)) - .collect::; 64]>>(); + impl<'a, 'b, T: AsyncWrite + Unpin> Write for Writer<'a, 'b, T> { + fn write(&mut self, buf: &[u8]) -> io::Result { + match Pin::new(&mut self.io).poll_write(self.cx, buf) { + Poll::Ready(result) => result, + Poll::Pending => Err(io::ErrorKind::WouldBlock.into()) + } + } - match Pin::new(&mut self.io).poll_write_vectored(self.cx, &vbytes) { + fn flush(&mut self) -> io::Result<()> { + match Pin::new(&mut self.io).poll_flush(self.cx) { Poll::Ready(result) => result, Poll::Pending => Err(io::ErrorKind::WouldBlock.into()) } } } - let mut vecio = Writer { io: self.io, cx }; - self.session.writev_tls(&mut vecio) + let mut writer = Writer { io: self.io, cx }; + self.session.write_tls(&mut writer) } } @@ -240,13 +243,13 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> AsyncWrite for Stream<' Pin::new(&mut this.io).poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); while this.session.wants_write() { futures::ready!(this.complete_inner_io(cx, Focus::Writable))?; } - Pin::new(&mut this.io).poll_close(cx) + Pin::new(&mut this.io).poll_shutdown(cx) } } diff --git a/src/lib.rs b/src/lib.rs index 65c11f3..59cfe44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,8 @@ use std::sync::Arc; use std::pin::Pin; use std::future::Future; use std::task::{ Poll, Context }; -use futures::io::{ AsyncRead, AsyncWrite, Initializer }; +use tokio_io::{ AsyncRead, AsyncWrite }; +use tokio_futures as futures; use rustls::{ ClientConfig, ClientSession, ServerConfig, ServerSession }; use webpki::DNSNameRef; use common::Stream; diff --git a/src/server.rs b/src/server.rs index 1e25145..6a94347 100644 --- a/src/server.rs +++ b/src/server.rs @@ -67,8 +67,8 @@ impl AsyncRead for TlsStream where IO: AsyncRead + AsyncWrite + Unpin, { - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.io.prepare_uninitialized_buffer(buf) } fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { @@ -119,7 +119,7 @@ where stream.as_mut_pin().poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.state.writeable() { self.session.send_close_notify(); self.state.shutdown_write(); @@ -128,6 +128,6 @@ where let this = self.get_mut(); let mut stream = Stream::new(&mut this.io, &mut this.session) .set_eof(!this.state.readable()); - stream.as_mut_pin().poll_close(cx) + stream.as_mut_pin().poll_shutdown(cx) } }