diff --git a/src/server/channel.rs b/src/server/channel.rs index da4c613e2..92c5be65b 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -5,7 +5,6 @@ use std::net::{SocketAddr, Shutdown}; use bytes::{Bytes, BytesMut, Buf, BufMut}; use futures::{Future, Poll, Async}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_core::net::TcpStream; use super::{h1, h2, utils, HttpHandler, IoStream}; use super::settings::WorkerSettings; @@ -18,6 +17,7 @@ enum HttpProtocol { H2(h2::Http2), Unknown(Rc>, Option, T, BytesMut), } + impl HttpProtocol { fn is_unknown(&self) -> bool { match *self { @@ -72,8 +72,7 @@ impl HttpChannel where T: IoStream, H: HttpHandler + 'static } } -impl Future for HttpChannel - where T: IoStream, H: HttpHandler + 'static +impl Future for HttpChannel where T: IoStream, H: HttpHandler + 'static { type Item = (); type Error = (); @@ -92,20 +91,15 @@ impl Future for HttpChannel let kind = match self.proto { Some(HttpProtocol::H1(ref mut h1)) => { - match h1.poll() { - Ok(Async::Ready(())) => { + let result = h1.poll(); + match result { + Ok(Async::Ready(())) | Err(_) => { h1.settings().remove_channel(); self.node.as_ref().unwrap().remove(); - return Ok(Async::Ready(())) - } - Ok(Async::NotReady) => - return Ok(Async::NotReady), - Err(_) => { - h1.settings().remove_channel(); - self.node.as_ref().unwrap().remove(); - return Err(()) - } + }, + _ => (), } + return result }, Some(HttpProtocol::H2(ref mut h2)) => { let result = h2.poll(); @@ -113,7 +107,7 @@ impl Future for HttpChannel Ok(Async::Ready(())) | Err(_) => { h2.settings().remove_channel(); self.node.as_ref().unwrap().remove(); - } + }, _ => (), } return result @@ -144,25 +138,22 @@ impl Future for HttpChannel None => unreachable!(), }; - // upgrade to h2 - let proto = self.proto.take().unwrap(); - match proto { - HttpProtocol::Unknown(settings, addr, io, buf) => { - match kind { - ProtocolKind::Http1 => { - self.proto = Some( - HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf))); - self.poll() - }, - ProtocolKind::Http2 => { - self.proto = Some( - HttpProtocol::H2(h2::Http2::new(settings, io, addr, buf.freeze()))); - self.poll() - }, - } + // upgrade to specific http protocol + if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() { + match kind { + ProtocolKind::Http1 => { + self.proto = Some( + HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf))); + return self.poll() + }, + ProtocolKind::Http2 => { + self.proto = Some( + HttpProtocol::H2(h2::Http2::new(settings, io, addr, buf.freeze()))); + return self.poll() + }, } - _ => unreachable!() } + unreachable!() } } @@ -242,67 +233,40 @@ impl Node<()> { } } -impl IoStream for TcpStream { - #[inline] - fn shutdown(&mut self, how: Shutdown) -> io::Result<()> { - TcpStream::shutdown(self, how) - } - - #[inline] - fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { - TcpStream::set_nodelay(self, nodelay) - } - - #[inline] - fn set_linger(&mut self, dur: Option) -> io::Result<()> { - TcpStream::set_linger(self, dur) - } -} - - /// Wrapper for `AsyncRead + AsyncWrite` types pub(crate) struct WrapperStream where T: AsyncRead + AsyncWrite + 'static { io: T, } -impl WrapperStream where T: AsyncRead + AsyncWrite + 'static -{ +impl WrapperStream where T: AsyncRead + AsyncWrite + 'static { pub fn new(io: T) -> Self { WrapperStream{io: io} } } -impl IoStream for WrapperStream - where T: AsyncRead + AsyncWrite + 'static -{ +impl IoStream for WrapperStream where T: AsyncRead + AsyncWrite + 'static { #[inline] fn shutdown(&mut self, _: Shutdown) -> io::Result<()> { Ok(()) } - #[inline] fn set_nodelay(&mut self, _: bool) -> io::Result<()> { Ok(()) } - #[inline] fn set_linger(&mut self, _: Option) -> io::Result<()> { Ok(()) } } -impl io::Read for WrapperStream - where T: AsyncRead + AsyncWrite + 'static -{ +impl io::Read for WrapperStream where T: AsyncRead + AsyncWrite + 'static { #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { self.io.read(buf) } } -impl io::Write for WrapperStream - where T: AsyncRead + AsyncWrite + 'static -{ +impl io::Write for WrapperStream where T: AsyncRead + AsyncWrite + 'static { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { self.io.write(buf) @@ -313,66 +277,20 @@ impl io::Write for WrapperStream } } -impl AsyncRead for WrapperStream - where T: AsyncRead + AsyncWrite + 'static -{ +impl AsyncRead for WrapperStream where T: AsyncRead + AsyncWrite + 'static { + #[inline] fn read_buf(&mut self, buf: &mut B) -> Poll { self.io.read_buf(buf) } } -impl AsyncWrite for WrapperStream - where T: AsyncRead + AsyncWrite + 'static -{ +impl AsyncWrite for WrapperStream where T: AsyncRead + AsyncWrite + 'static { + #[inline] fn shutdown(&mut self) -> Poll<(), io::Error> { self.io.shutdown() } + #[inline] fn write_buf(&mut self, buf: &mut B) -> Poll { self.io.write_buf(buf) } } - - -#[cfg(feature="alpn")] -use tokio_openssl::SslStream; - -#[cfg(feature="alpn")] -impl IoStream for SslStream { - #[inline] - fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { - let _ = self.get_mut().shutdown(); - Ok(()) - } - - #[inline] - fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { - self.get_mut().get_mut().set_nodelay(nodelay) - } - - #[inline] - fn set_linger(&mut self, dur: Option) -> io::Result<()> { - self.get_mut().get_mut().set_linger(dur) - } -} - -#[cfg(feature="tls")] -use tokio_tls::TlsStream; - -#[cfg(feature="tls")] -impl IoStream for TlsStream { - #[inline] - fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { - let _ = self.get_mut().shutdown(); - Ok(()) - } - - #[inline] - fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { - self.get_mut().get_mut().set_nodelay(nodelay) - } - - #[inline] - fn set_linger(&mut self, dur: Option) -> io::Result<()> { - self.get_mut().get_mut().set_linger(dur) - } -} diff --git a/src/server/mod.rs b/src/server/mod.rs index a62a04ba7..c0a047534 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -4,6 +4,7 @@ use std::net::Shutdown; use futures::Poll; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_core::net::TcpStream; mod srv; mod worker; @@ -55,10 +56,10 @@ pub trait HttpHandler: 'static { pub trait HttpHandlerTask { - fn poll_io(&mut self, io: &mut Writer) -> Poll; - fn poll(&mut self) -> Poll<(), Error>; + fn poll_io(&mut self, io: &mut Writer) -> Poll; + fn disconnected(&mut self); } @@ -79,15 +80,6 @@ impl IntoHttpHandler for T { } } -/// Low-level io stream operations -pub trait IoStream: AsyncRead + AsyncWrite + 'static { - fn shutdown(&mut self, how: Shutdown) -> io::Result<()>; - - fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>; - - fn set_linger(&mut self, dur: Option) -> io::Result<()>; -} - #[derive(Debug)] pub enum WriterState { Done, @@ -109,3 +101,73 @@ pub trait Writer { fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>; } + +/// Low-level io stream operations +pub trait IoStream: AsyncRead + AsyncWrite + 'static { + fn shutdown(&mut self, how: Shutdown) -> io::Result<()>; + + fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>; + + fn set_linger(&mut self, dur: Option) -> io::Result<()>; +} + +impl IoStream for TcpStream { + #[inline] + fn shutdown(&mut self, how: Shutdown) -> io::Result<()> { + TcpStream::shutdown(self, how) + } + + #[inline] + fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { + TcpStream::set_nodelay(self, nodelay) + } + + #[inline] + fn set_linger(&mut self, dur: Option) -> io::Result<()> { + TcpStream::set_linger(self, dur) + } +} + +#[cfg(feature="alpn")] +use tokio_openssl::SslStream; + +#[cfg(feature="alpn")] +impl IoStream for SslStream { + #[inline] + fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { + let _ = self.get_mut().shutdown(); + Ok(()) + } + + #[inline] + fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { + self.get_mut().get_mut().set_nodelay(nodelay) + } + + #[inline] + fn set_linger(&mut self, dur: Option) -> io::Result<()> { + self.get_mut().get_mut().set_linger(dur) + } +} + +#[cfg(feature="tls")] +use tokio_tls::TlsStream; + +#[cfg(feature="tls")] +impl IoStream for TlsStream { + #[inline] + fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> { + let _ = self.get_mut().shutdown(); + Ok(()) + } + + #[inline] + fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { + self.get_mut().get_mut().set_nodelay(nodelay) + } + + #[inline] + fn set_linger(&mut self, dur: Option) -> io::Result<()> { + self.get_mut().get_mut().set_linger(dur) + } +}