1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

cleanup http channel

This commit is contained in:
Nikolay Kim 2018-01-11 22:06:06 -08:00
parent e482b88741
commit e919ec485e
2 changed files with 105 additions and 125 deletions

View File

@ -5,7 +5,6 @@ use std::net::{SocketAddr, Shutdown};
use bytes::{Bytes, BytesMut, Buf, BufMut}; use bytes::{Bytes, BytesMut, Buf, BufMut};
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream;
use super::{h1, h2, utils, HttpHandler, IoStream}; use super::{h1, h2, utils, HttpHandler, IoStream};
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
@ -18,6 +17,7 @@ enum HttpProtocol<T: IoStream, H: 'static> {
H2(h2::Http2<T, H>), H2(h2::Http2<T, H>),
Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut), Unknown(Rc<WorkerSettings<H>>, Option<SocketAddr>, T, BytesMut),
} }
impl<T: IoStream, H: 'static> HttpProtocol<T, H> { impl<T: IoStream, H: 'static> HttpProtocol<T, H> {
fn is_unknown(&self) -> bool { fn is_unknown(&self) -> bool {
match *self { match *self {
@ -72,8 +72,7 @@ impl<T, H> HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static
} }
} }
impl<T, H> Future for HttpChannel<T, H> impl<T, H> Future for HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static
where T: IoStream, H: HttpHandler + 'static
{ {
type Item = (); type Item = ();
type Error = (); type Error = ();
@ -92,20 +91,15 @@ impl<T, H> Future for HttpChannel<T, H>
let kind = match self.proto { let kind = match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => { Some(HttpProtocol::H1(ref mut h1)) => {
match h1.poll() { let result = h1.poll();
Ok(Async::Ready(())) => { match result {
Ok(Async::Ready(())) | Err(_) => {
h1.settings().remove_channel(); h1.settings().remove_channel();
self.node.as_ref().unwrap().remove(); self.node.as_ref().unwrap().remove();
return Ok(Async::Ready(())) },
} _ => (),
Ok(Async::NotReady) =>
return Ok(Async::NotReady),
Err(_) => {
h1.settings().remove_channel();
self.node.as_ref().unwrap().remove();
return Err(())
}
} }
return result
}, },
Some(HttpProtocol::H2(ref mut h2)) => { Some(HttpProtocol::H2(ref mut h2)) => {
let result = h2.poll(); let result = h2.poll();
@ -113,7 +107,7 @@ impl<T, H> Future for HttpChannel<T, H>
Ok(Async::Ready(())) | Err(_) => { Ok(Async::Ready(())) | Err(_) => {
h2.settings().remove_channel(); h2.settings().remove_channel();
self.node.as_ref().unwrap().remove(); self.node.as_ref().unwrap().remove();
} },
_ => (), _ => (),
} }
return result return result
@ -144,25 +138,22 @@ impl<T, H> Future for HttpChannel<T, H>
None => unreachable!(), None => unreachable!(),
}; };
// upgrade to h2 // upgrade to specific http protocol
let proto = self.proto.take().unwrap(); if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
match proto { match kind {
HttpProtocol::Unknown(settings, addr, io, buf) => { ProtocolKind::Http1 => {
match kind { self.proto = Some(
ProtocolKind::Http1 => { HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf)));
self.proto = Some( return self.poll()
HttpProtocol::H1(h1::Http1::new(settings, io, addr, buf))); },
self.poll() ProtocolKind::Http2 => {
}, self.proto = Some(
ProtocolKind::Http2 => { HttpProtocol::H2(h2::Http2::new(settings, io, addr, buf.freeze())));
self.proto = Some( return self.poll()
HttpProtocol::H2(h2::Http2::new(settings, io, addr, buf.freeze()))); },
self.poll()
},
}
} }
_ => unreachable!()
} }
unreachable!()
} }
} }
@ -242,67 +233,40 @@ impl Node<()> {
} }
} }
impl IoStream for TcpStream {
#[inline]
fn shutdown(&mut self, how: Shutdown) -> io::Result<()> {
TcpStream::shutdown(self, how)
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
TcpStream::set_nodelay(self, nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
TcpStream::set_linger(self, dur)
}
}
/// Wrapper for `AsyncRead + AsyncWrite` types /// Wrapper for `AsyncRead + AsyncWrite` types
pub(crate) struct WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static { pub(crate) struct WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static {
io: T, io: T,
} }
impl<T> WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static impl<T> WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static {
{
pub fn new(io: T) -> Self { pub fn new(io: T) -> Self {
WrapperStream{io: io} WrapperStream{io: io}
} }
} }
impl<T> IoStream for WrapperStream<T> impl<T> IoStream for WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static {
where T: AsyncRead + AsyncWrite + 'static
{
#[inline] #[inline]
fn shutdown(&mut self, _: Shutdown) -> io::Result<()> { fn shutdown(&mut self, _: Shutdown) -> io::Result<()> {
Ok(()) Ok(())
} }
#[inline] #[inline]
fn set_nodelay(&mut self, _: bool) -> io::Result<()> { fn set_nodelay(&mut self, _: bool) -> io::Result<()> {
Ok(()) Ok(())
} }
#[inline] #[inline]
fn set_linger(&mut self, _: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, _: Option<time::Duration>) -> io::Result<()> {
Ok(()) Ok(())
} }
} }
impl<T> io::Read for WrapperStream<T> impl<T> io::Read for WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static {
where T: AsyncRead + AsyncWrite + 'static
{
#[inline] #[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf) self.io.read(buf)
} }
} }
impl<T> io::Write for WrapperStream<T> impl<T> io::Write for WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static {
where T: AsyncRead + AsyncWrite + 'static
{
#[inline] #[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf) self.io.write(buf)
@ -313,66 +277,20 @@ impl<T> io::Write for WrapperStream<T>
} }
} }
impl<T> AsyncRead for WrapperStream<T> impl<T> AsyncRead for WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static {
where T: AsyncRead + AsyncWrite + 'static #[inline]
{
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.io.read_buf(buf) self.io.read_buf(buf)
} }
} }
impl<T> AsyncWrite for WrapperStream<T> impl<T> AsyncWrite for WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static {
where T: AsyncRead + AsyncWrite + 'static #[inline]
{
fn shutdown(&mut self) -> Poll<(), io::Error> { fn shutdown(&mut self) -> Poll<(), io::Error> {
self.io.shutdown() self.io.shutdown()
} }
#[inline]
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.io.write_buf(buf) self.io.write_buf(buf)
} }
} }
#[cfg(feature="alpn")]
use tokio_openssl::SslStream;
#[cfg(feature="alpn")]
impl IoStream for SslStream<TcpStream> {
#[inline]
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
let _ = self.get_mut().shutdown();
Ok(())
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
self.get_mut().get_mut().set_nodelay(nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur)
}
}
#[cfg(feature="tls")]
use tokio_tls::TlsStream;
#[cfg(feature="tls")]
impl IoStream for TlsStream<TcpStream> {
#[inline]
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
let _ = self.get_mut().shutdown();
Ok(())
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
self.get_mut().get_mut().set_nodelay(nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur)
}
}

View File

@ -4,6 +4,7 @@ use std::net::Shutdown;
use futures::Poll; use futures::Poll;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream;
mod srv; mod srv;
mod worker; mod worker;
@ -55,10 +56,10 @@ pub trait HttpHandler: 'static {
pub trait HttpHandlerTask { pub trait HttpHandlerTask {
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error>;
fn poll(&mut self) -> Poll<(), Error>; fn poll(&mut self) -> Poll<(), Error>;
fn poll_io(&mut self, io: &mut Writer) -> Poll<bool, Error>;
fn disconnected(&mut self); fn disconnected(&mut self);
} }
@ -79,15 +80,6 @@ impl<T: HttpHandler> IntoHttpHandler for T {
} }
} }
/// Low-level io stream operations
pub trait IoStream: AsyncRead + AsyncWrite + 'static {
fn shutdown(&mut self, how: Shutdown) -> io::Result<()>;
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>;
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
}
#[derive(Debug)] #[derive(Debug)]
pub enum WriterState { pub enum WriterState {
Done, Done,
@ -109,3 +101,73 @@ pub trait Writer {
fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>; fn poll_completed(&mut self, shutdown: bool) -> Poll<(), io::Error>;
} }
/// Low-level io stream operations
pub trait IoStream: AsyncRead + AsyncWrite + 'static {
fn shutdown(&mut self, how: Shutdown) -> io::Result<()>;
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>;
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
}
impl IoStream for TcpStream {
#[inline]
fn shutdown(&mut self, how: Shutdown) -> io::Result<()> {
TcpStream::shutdown(self, how)
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
TcpStream::set_nodelay(self, nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
TcpStream::set_linger(self, dur)
}
}
#[cfg(feature="alpn")]
use tokio_openssl::SslStream;
#[cfg(feature="alpn")]
impl IoStream for SslStream<TcpStream> {
#[inline]
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
let _ = self.get_mut().shutdown();
Ok(())
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
self.get_mut().get_mut().set_nodelay(nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur)
}
}
#[cfg(feature="tls")]
use tokio_tls::TlsStream;
#[cfg(feature="tls")]
impl IoStream for TlsStream<TcpStream> {
#[inline]
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
let _ = self.get_mut().shutdown();
Ok(())
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
self.get_mut().get_mut().set_nodelay(nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur)
}
}