tokio-rustls: Add to README and clean code (#15)

* tokio-rustls: Add to README and clean code

* cargo fmt
This commit is contained in:
quininer 2020-05-20 13:09:24 +08:00 committed by GitHub
parent 3be701cefb
commit fc90b3f378
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 44 deletions

View File

@ -5,6 +5,7 @@
This crate contains a collection of Tokio based TLS libraries. This crate contains a collection of Tokio based TLS libraries.
- [`tokio-native-tls`](tokio-native-tls) - [`tokio-native-tls`](tokio-native-tls)
- [`tokio-rustls`](tokio-rustls)
## Getting Help ## Getting Help

View File

@ -47,38 +47,39 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();
if let MidHandshake::Handshaking(mut stream) = mem::replace(this, MidHandshake::End) { let mut stream =
if !stream.skip_handshake() { if let MidHandshake::Handshaking(stream) = mem::replace(this, MidHandshake::End) {
let (state, io, session) = stream.get_mut(); stream
let mut tls_stream = Stream::new(io, session).set_eof(!state.readable()); } else {
panic!("unexpected polling after handshake")
};
macro_rules! try_poll { if !stream.skip_handshake() {
( $e:expr ) => { let (state, io, session) = stream.get_mut();
match $e { let mut tls_stream = Stream::new(io, session).set_eof(!state.readable());
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => { macro_rules! try_poll {
return Poll::Ready(Err((err, stream.into_io()))) ( $e:expr ) => {
} match $e {
Poll::Pending => { Poll::Ready(Ok(_)) => (),
*this = MidHandshake::Handshaking(stream); Poll::Ready(Err(err)) => return Poll::Ready(Err((err, stream.into_io()))),
return Poll::Pending; Poll::Pending => {
} *this = MidHandshake::Handshaking(stream);
return Poll::Pending;
} }
}; }
} };
while tls_stream.session.is_handshaking() {
try_poll!(tls_stream.handshake(cx));
}
while tls_stream.session.wants_write() {
try_poll!(tls_stream.write_io(cx));
}
} }
Poll::Ready(Ok(stream)) while tls_stream.session.is_handshaking() {
} else { try_poll!(tls_stream.handshake(cx));
panic!("unexpected polling after handshake") }
while tls_stream.session.wants_write() {
try_poll!(tls_stream.write_io(cx));
}
} }
Poll::Ready(Ok(stream))
} }
} }

View File

@ -96,17 +96,6 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> Stream<'a, IO, S> {
Pin::new(self) Pin::new(self)
} }
pub fn process_new_packets(&mut self, cx: &mut Context) -> io::Result<()> {
self.session.process_new_packets().map_err(|err| {
// In case we have an alert to send describing this error,
// try a last-gasp write -- but don't predate the primary
// error.
let _ = self.write_io(cx);
io::Error::new(io::ErrorKind::InvalidData, err)
})
}
pub fn read_io(&mut self, cx: &mut Context) -> Poll<io::Result<usize>> { pub fn read_io(&mut self, cx: &mut Context) -> Poll<io::Result<usize>> {
struct Reader<'a, 'b, T> { struct Reader<'a, 'b, T> {
io: &'a mut T, io: &'a mut T,
@ -130,6 +119,15 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> Stream<'a, IO, S> {
Err(err) => return Poll::Ready(Err(err)), Err(err) => return Poll::Ready(Err(err)),
}; };
self.session.process_new_packets().map_err(|err| {
// In case we have an alert to send describing this error,
// try a last-gasp write -- but don't predate the primary
// error.
let _ = self.write_io(cx);
io::Error::new(io::ErrorKind::InvalidData, err)
})?;
Poll::Ready(Ok(n)) Poll::Ready(Ok(n))
} }
@ -218,10 +216,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> Stream<'a, IO, S> {
while !self.eof && self.session.wants_read() { while !self.eof && self.session.wants_read() {
match self.read_io(cx) { match self.read_io(cx) {
Poll::Ready(Ok(0)) => self.eof = true, Poll::Ready(Ok(0)) => self.eof = true,
Poll::Ready(Ok(n)) => { Poll::Ready(Ok(n)) => rdlen += n,
rdlen += n;
self.process_new_packets(cx)?;
}
Poll::Pending => { Poll::Pending => {
read_would_block = true; read_would_block = true;
break; break;
@ -267,7 +262,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin, S: Session> AsyncRead for Stream<'a
self.eof = true; self.eof = true;
break; break;
} }
Poll::Ready(Ok(_)) => self.process_new_packets(cx)?, Poll::Ready(Ok(_)) => (),
Poll::Pending => { Poll::Pending => {
would_block = true; would_block = true;
break; break;