From 61b2f5b3bc7b06a0f1b6985af86dd2160411b915 Mon Sep 17 00:00:00 2001 From: Gleb Pomykalov Date: Wed, 27 Nov 2019 01:37:00 +0300 Subject: [PATCH] Migrate to tokio 0.2 and futures 0.3 --- Cargo.toml | 8 ++++---- src/client.rs | 2 +- src/common/mod.rs | 2 +- src/common/test_stream.rs | 2 +- src/lib.rs | 2 +- src/server.rs | 2 +- tests/test.rs | 35 +++++++++++++++++++---------------- 7 files changed, 28 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index daec434..3bc10f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,8 @@ edition = "2018" github-actions = { repository = "quininer/tokio-rustls", workflow = "ci" } [dependencies] -tokio-io = "=0.2.0-alpha.6" -futures-core-preview = "=0.3.0-alpha.19" +tokio = "0.2.0" +futures-core = "0.3.1" rustls = "0.16" webpki = "0.21" @@ -25,7 +25,7 @@ early-data = [] dangerous_configuration = ["rustls/dangerous_configuration"] [dev-dependencies] -tokio = "=0.2.0-alpha.6" -futures-util-preview = "0.3.0-alpha.19" +tokio = { version = "0.2.0", features = ["macros", "net", "io-util", "rt-core", "time"] } +futures-util = "0.3.1" lazy_static = "1" webpki-roots = "0.18" diff --git a/src/client.rs b/src/client.rs index 8671776..470194a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -69,7 +69,7 @@ impl AsyncRead for TlsStream where IO: AsyncRead + AsyncWrite + Unpin, { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit]) -> bool { self.io.prepare_uninitialized_buffer(buf) } diff --git a/src/common/mod.rs b/src/common/mod.rs index 3083534..1deb21d 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -3,7 +3,7 @@ use std::task::{ Poll, Context }; use std::marker::Unpin; use std::io::{ self, Read, Write }; use rustls::Session; -use tokio_io::{ AsyncRead, AsyncWrite }; +use tokio::io::{ AsyncRead, AsyncWrite }; use futures_core as futures; diff --git a/src/common/test_stream.rs b/src/common/test_stream.rs index d706e37..0055014 100644 --- a/src/common/test_stream.rs +++ b/src/common/test_stream.rs @@ -4,7 +4,7 @@ use std::task::{ Poll, Context }; use futures_core::ready; use futures_util::future::poll_fn; use futures_util::task::noop_waker_ref; -use tokio_io::{ AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt }; +use tokio::io::{ AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt }; use std::io::{ self, Read, Write, BufReader, Cursor }; use webpki::DNSNameRef; use rustls::internal::pemfile::{ certs, rsa_private_keys }; diff --git a/src/lib.rs b/src/lib.rs index 8caf1e5..706e972 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use std::future::Future; use std::task::{ Context, Poll }; use futures_core as futures; -use tokio_io::{ AsyncRead, AsyncWrite }; +use tokio::io::{ AsyncRead, AsyncWrite }; use webpki::DNSNameRef; use rustls::{ ClientConfig, ClientSession, ServerConfig, ServerSession, Session }; use common::Stream; diff --git a/src/server.rs b/src/server.rs index 87ce3f8..9066c27 100644 --- a/src/server.rs +++ b/src/server.rs @@ -67,7 +67,7 @@ impl AsyncRead for TlsStream where IO: AsyncRead + AsyncWrite + Unpin, { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [std::mem::MaybeUninit]) -> bool { self.io.prepare_uninitialized_buffer(buf) } diff --git a/tests/test.rs b/tests/test.rs index 30f8e8a..d0d30f4 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -3,12 +3,12 @@ use std::io::{ BufReader, Cursor }; use std::sync::Arc; use std::sync::mpsc::channel; use std::net::SocketAddr; +use futures_util::future::TryFutureExt; use lazy_static::lazy_static; use tokio::prelude::*; -use tokio::runtime::current_thread; +use tokio::runtime; use tokio::net::{ TcpListener, TcpStream }; -use tokio::io::split; -use futures_util::try_future::TryFutureExt; +use tokio::io::{copy, split}; use rustls::{ ServerConfig, ClientConfig }; use rustls::internal::pemfile::{ certs, rsa_private_keys }; use tokio_rustls::{ TlsConnector, TlsAcceptor }; @@ -30,32 +30,36 @@ lazy_static!{ let (send, recv) = channel(); thread::spawn(move || { - let mut runtime = current_thread::Runtime::new().unwrap(); - let handle = runtime.handle(); + let mut runtime = runtime::Builder::new() + .basic_scheduler() + .enable_io() + .build() + .unwrap(); + + let handle = runtime.handle().clone(); let done = async move { let addr = SocketAddr::from(([127, 0, 0, 1], 0)); - let listener = TcpListener::bind(&addr).await?; + let mut listener = TcpListener::bind(&addr).await?; send.send(listener.local_addr()?).unwrap(); - let mut incoming = listener.incoming(); - while let Some(stream) = incoming.next().await { + loop { + let (stream, _) = listener.accept().await?; + let acceptor = acceptor.clone(); let fut = async move { - let stream = acceptor.accept(stream?).await?; + let stream = acceptor.accept(stream).await?; let (mut reader, mut writer) = split(stream); - reader.copy(&mut writer).await?; + copy(&mut reader, &mut writer).await?; Ok(()) as io::Result<()> }.unwrap_or_else(|err| eprintln!("server: {:?}", err)); - handle.spawn(fut).unwrap(); + handle.spawn(fut); } - - Ok(()) as io::Result<()> - }.unwrap_or_else(|err| eprintln!("server: {:?}", err)); + }.unwrap_or_else(|err: io::Error| eprintln!("server: {:?}", err)); runtime.block_on(done); }); @@ -95,8 +99,7 @@ async fn pass() -> io::Result<()> { // TcpStream::bind now returns a future it creates a race // condition until its ready sometimes. use std::time::*; - let deadline = Instant::now() + Duration::from_secs(1); - tokio::timer::delay(deadline); + tokio::time::delay_for(Duration::from_secs(1)).await; let mut config = ClientConfig::new(); let mut chain = BufReader::new(Cursor::new(chain));