update server example

This commit is contained in:
quininer 2019-05-22 00:54:10 +08:00
parent 7949f4377a
commit 3ffb736d5e
6 changed files with 98 additions and 88 deletions

View File

@ -16,7 +16,7 @@ travis-ci = { repository = "quininer/tokio-rustls" }
appveyor = { repository = "quininer/tokio-rustls" } appveyor = { repository = "quininer/tokio-rustls" }
[dependencies] [dependencies]
smallvec = "*" smallvec = "0.6"
futures = { package = "futures-preview", version = "0.3.0-alpha.16" } futures = { package = "futures-preview", version = "0.3.0-alpha.16" }
rustls = "0.15" rustls = "0.15"
webpki = "0.19" webpki = "0.19"
@ -26,6 +26,5 @@ early-data = []
[dev-dependencies] [dev-dependencies]
romio = "0.3.0-alpha.8" romio = "0.3.0-alpha.8"
tokio = "0.1.6"
lazy_static = "1" lazy_static = "1"
webpki-roots = "0.16" webpki-roots = "0.16"

View File

@ -2,8 +2,10 @@
name = "server" name = "server"
version = "0.1.0" version = "0.1.0"
authors = ["quininer <quininer@live.com>"] authors = ["quininer <quininer@live.com>"]
edition = "2018"
[dependencies] [dependencies]
futures = { package = "futures-preview", version = "0.3.0-alpha.16" }
romio = "0.3.0-alpha.8"
structopt = "*"
tokio-rustls = { path = "../.." } tokio-rustls = { path = "../.." }
tokio = { version = "0.1.6" }
clap = "2"

View File

@ -1,88 +1,100 @@
extern crate clap; #![feature(async_await)]
extern crate tokio;
extern crate tokio_rustls;
use std::fs::File;
use std::sync::Arc; use std::sync::Arc;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::io::BufReader; use std::path::{ PathBuf, Path };
use std::fs::File; use std::io::{ self, BufReader };
use tokio_rustls::{ use structopt::StructOpt;
TlsAcceptor, use futures::task::SpawnExt;
rustls::{ use futures::prelude::*;
Certificate, NoClientAuth, PrivateKey, ServerConfig, use futures::executor;
internal::pemfile::{ certs, rsa_private_keys } use romio::TcpListener;
}, use tokio_rustls::rustls::{ Certificate, NoClientAuth, PrivateKey, ServerConfig };
}; use tokio_rustls::rustls::internal::pemfile::{ certs, rsa_private_keys };
use tokio::prelude::{ Future, Stream }; use tokio_rustls::TlsAcceptor;
use tokio::io::{ self, AsyncRead };
use tokio::net::TcpListener;
use clap::{ App, Arg };
fn app() -> App<'static, 'static> {
App::new("server") #[derive(StructOpt)]
.about("tokio-rustls server example") struct Options {
.arg(Arg::with_name("addr").value_name("ADDR").required(true)) addr: String,
.arg(Arg::with_name("cert").short("c").long("cert").value_name("FILE").help("cert file.").required(true))
.arg(Arg::with_name("key").short("k").long("key").value_name("FILE").help("key file, rsa only.").required(true)) /// cert file
.arg(Arg::with_name("echo").short("e").long("echo-mode").help("echo mode.")) #[structopt(short="c", long="cert", parse(from_os_str))]
cert: PathBuf,
/// key file
#[structopt(short="k", long="key", parse(from_os_str))]
key: PathBuf,
/// echo mode
#[structopt(short="e", long="echo-mode")]
echo: bool
} }
fn load_certs(path: &str) -> Vec<Certificate> { fn load_certs(path: &Path) -> io::Result<Vec<Certificate>> {
certs(&mut BufReader::new(File::open(path).unwrap())).unwrap() certs(&mut BufReader::new(File::open(path)?))
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid cert"))
} }
fn load_keys(path: &str) -> Vec<PrivateKey> { fn load_keys(path: &Path) -> io::Result<Vec<PrivateKey>> {
rsa_private_keys(&mut BufReader::new(File::open(path).unwrap())).unwrap() rsa_private_keys(&mut BufReader::new(File::open(path)?))
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid key"))
} }
fn main() { fn main() -> io::Result<()> {
let matches = app().get_matches(); let options = Options::from_args();
let addr = matches.value_of("addr").unwrap() let addr = options.addr.to_socket_addrs()?
.to_socket_addrs().unwrap() .next()
.next().unwrap(); .ok_or_else(|| io::Error::from(io::ErrorKind::AddrNotAvailable))?;
let cert_file = matches.value_of("cert").unwrap(); let certs = load_certs(&options.cert)?;
let key_file = matches.value_of("key").unwrap(); let mut keys = load_keys(&options.key)?;
let flag_echo = matches.occurrences_of("echo") > 0; let flag_echo = options.echo;
let mut pool = executor::ThreadPool::new()?;
let mut config = ServerConfig::new(NoClientAuth::new()); let mut config = ServerConfig::new(NoClientAuth::new());
config.set_single_cert(load_certs(cert_file), load_keys(key_file).remove(0)) config.set_single_cert(certs, keys.remove(0))
.expect("invalid key or certificate"); .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
let config = TlsAcceptor::from(Arc::new(config)); let acceptor = TlsAcceptor::from(Arc::new(config));
let socket = TcpListener::bind(&addr).unwrap(); let fut = async {
let done = socket.incoming() let mut listener = TcpListener::bind(&addr)?;
.for_each(move |stream| if flag_echo { let mut incoming = listener.incoming();
let addr = stream.peer_addr().ok();
let done = config.accept(stream)
.and_then(|stream| {
let (reader, writer) = stream.split();
io::copy(reader, writer)
})
.map(move |(n, ..)| println!("Echo: {} - {:?}", n, addr))
.map_err(move |err| println!("Error: {:?} - {:?}", err, addr));
tokio::spawn(done);
Ok(()) while let Some(stream) = incoming.next().await {
let acceptor = acceptor.clone();
let fut = async move {
let stream = stream?;
let peer_addr = stream.peer_addr()?;
let mut stream = acceptor.accept(stream).await?;
if flag_echo {
let (mut reader, mut writer) = stream.split();
let n = reader.copy_into(&mut writer).await?;
println!("Echo: {} - {}", peer_addr, n);
} else { } else {
let addr = stream.peer_addr().ok(); stream.write_all(
let done = config.accept(stream)
.and_then(|stream| io::write_all(
stream,
&b"HTTP/1.0 200 ok\r\n\ &b"HTTP/1.0 200 ok\r\n\
Connection: close\r\n\ Connection: close\r\n\
Content-length: 12\r\n\ Content-length: 12\r\n\
\r\n\ \r\n\
Hello world!"[..] Hello world!"[..]
)) ).await?;
.and_then(|(stream, _)| io::flush(stream)) stream.flush().await?;
.map(move |_| println!("Accept: {:?}", addr)) println!("Hello: {}", peer_addr);
.map_err(move |err| println!("Error: {:?} - {:?}", err, addr)); }
tokio::spawn(done);
Ok(()) as io::Result<()>
};
pool.spawn(fut.unwrap_or_else(|err| eprintln!("{:?}", err))).unwrap();
}
Ok(()) Ok(())
}); };
tokio::run(done.map_err(drop)); executor::block_on(fut)
} }

View File

@ -127,7 +127,10 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> Stream<'a, IO, S> {
}; };
match (self.eof, self.session.is_handshaking(), would_block) { match (self.eof, self.session.is_handshaking(), would_block) {
(true, true, _) => return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())), (true, true, _) => {
let err = io::Error::new(io::ErrorKind::UnexpectedEof, "tls handshake eof");
return Poll::Ready(Err(err));
},
(_, false, true) => { (_, false, true) => {
let would_block = match focus { let would_block = match focus {
Focus::Empty => rdlen == 0 && wrlen == 0, Focus::Empty => rdlen == 0 && wrlen == 0,
@ -224,11 +227,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> AsyncWrite for Stream<'
this.session.flush()?; this.session.flush()?;
while this.session.wants_write() { while this.session.wants_write() {
match this.complete_inner_io(cx, Focus::Writable) { try_ready!(this.complete_inner_io(cx, Focus::Writable));
Poll::Ready(Ok(_)) => (),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err))
}
} }
Pin::new(&mut this.io).poll_flush(cx) Pin::new(&mut this.io).poll_flush(cx)
} }
@ -237,11 +236,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> AsyncWrite for Stream<'
let this = self.get_mut(); let this = self.get_mut();
while this.session.wants_write() { while this.session.wants_write() {
match this.complete_inner_io(cx, Focus::Writable) { try_ready!(this.complete_inner_io(cx, Focus::Writable));
Poll::Ready(Ok(_)) => (),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err))
}
} }
Pin::new(&mut this.io).poll_close(cx) Pin::new(&mut this.io).poll_close(cx)
} }

View File

@ -26,6 +26,9 @@ use rustls::{ ClientConfig, ClientSession, ServerConfig, ServerSession };
use webpki::DNSNameRef; use webpki::DNSNameRef;
use common::Stream; use common::Stream;
pub use rustls;
pub use webpki;
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
enum TlsState { enum TlsState {
#[cfg(feature = "early-data")] #[cfg(feature = "early-data")]

View File

@ -26,7 +26,7 @@ lazy_static!{
let mut config = ServerConfig::new(rustls::NoClientAuth::new()); let mut config = ServerConfig::new(rustls::NoClientAuth::new());
config.set_single_cert(cert, keys.pop().unwrap()) config.set_single_cert(cert, keys.pop().unwrap())
.expect("invalid key or certificate"); .expect("invalid key or certificate");
let config = TlsAcceptor::from(Arc::new(config)); let acceptor = TlsAcceptor::from(Arc::new(config));
let (send, recv) = channel(); let (send, recv) = channel();
@ -40,11 +40,10 @@ lazy_static!{
let mut incoming = listener.incoming(); let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await { while let Some(stream) = incoming.next().await {
let config = config.clone(); let acceptor = acceptor.clone();
pool.spawn( pool.spawn(
async move { async move {
let stream = stream?; let stream = acceptor.accept(stream?).await?;
let stream = config.accept(stream).await?;
let (mut reader, mut write) = stream.split(); let (mut reader, mut write) = stream.split();
reader.copy_into(&mut write).await?; reader.copy_into(&mut write).await?;
Ok(()) as io::Result<()> Ok(()) as io::Result<()>