1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

introduce IoStream trait for low level stream operations

This commit is contained in:
Nikolay Kim 2018-01-03 23:41:55 -08:00
parent 1f7aee23df
commit 9559f6a175
4 changed files with 201 additions and 56 deletions

View File

@ -1,8 +1,8 @@
use std::{ptr, mem, time}; use std::{ptr, mem, time, io};
use std::rc::Rc; use std::rc::Rc;
use std::net::{SocketAddr, Shutdown}; use std::net::{SocketAddr, Shutdown};
use bytes::Bytes; use bytes::{Bytes, Buf, BufMut};
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
@ -48,8 +48,7 @@ impl<T: HttpHandler> IntoHttpHandler for T {
} }
} }
enum HttpProtocol<T, H> enum HttpProtocol<T: IoStream, H: 'static>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{ {
H1(h1::Http1<T, H>), H1(h1::Http1<T, H>),
H2(h2::Http2<T, H>), H2(h2::Http2<T, H>),
@ -57,22 +56,14 @@ enum HttpProtocol<T, H>
#[doc(hidden)] #[doc(hidden)]
pub struct HttpChannel<T, H> pub struct HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static where T: IoStream, H: HttpHandler + 'static
{ {
proto: Option<HttpProtocol<T, H>>, proto: Option<HttpProtocol<T, H>>,
node: Option<Node<HttpChannel<T, H>>>, node: Option<Node<HttpChannel<T, H>>>,
} }
impl<T, H> Drop for HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
fn drop(&mut self) {
self.shutdown()
}
}
impl<T, H> HttpChannel<T, H> impl<T, H> HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static where T: IoStream, H: HttpHandler + 'static
{ {
pub(crate) fn new(h: Rc<WorkerSettings<H>>, pub(crate) fn new(h: Rc<WorkerSettings<H>>,
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H> io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
@ -91,19 +82,12 @@ impl<T, H> HttpChannel<T, H>
} }
} }
fn io(&mut self) -> Option<&mut T> {
match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
Some(h1.io())
}
_ => None,
}
}
fn shutdown(&mut self) { fn shutdown(&mut self) {
match self.proto { match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => { Some(HttpProtocol::H1(ref mut h1)) => {
let _ = h1.io().shutdown(); let io = h1.io();
let _ = IoStream::set_linger(io, Some(time::Duration::new(0, 0)));
let _ = IoStream::shutdown(io, Shutdown::Both);
} }
Some(HttpProtocol::H2(ref mut h2)) => { Some(HttpProtocol::H2(ref mut h2)) => {
h2.shutdown() h2.shutdown()
@ -122,7 +106,7 @@ impl<T, H> HttpChannel<T, H>
}*/ }*/
impl<T, H> Future for HttpChannel<T, H> impl<T, H> Future for HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static where T: IoStream, H: HttpHandler + 'static
{ {
type Item = (); type Item = ();
type Error = (); type Error = ();
@ -242,7 +226,7 @@ impl Node<()> {
} }
} }
pub(crate) fn traverse<H>(&self) where H: HttpHandler + 'static { pub(crate) fn traverse<T, H>(&self) where T: IoStream, H: HttpHandler + 'static {
let mut next = self.next.as_ref(); let mut next = self.next.as_ref();
loop { loop {
if let Some(n) = next { if let Some(n) = next {
@ -251,13 +235,8 @@ impl Node<()> {
next = n.next.as_ref(); next = n.next.as_ref();
if !n.element.is_null() { if !n.element.is_null() {
let ch: &mut HttpChannel<TcpStream, H> = mem::transmute( let ch: &mut HttpChannel<T, H> = mem::transmute(
&mut *(n.element as *mut _)); &mut *(n.element as *mut _));
if let Some(io) = ch.io() {
let _ = TcpStream::set_linger(io, Some(time::Duration::new(0, 0)));
let _ = TcpStream::shutdown(io, Shutdown::Both);
continue;
}
ch.shutdown(); ch.shutdown();
} }
} }
@ -267,3 +246,146 @@ impl Node<()> {
} }
} }
} }
pub trait IoStream: AsyncRead + AsyncWrite + 'static {
fn shutdown(&mut self, how: Shutdown) -> io::Result<()>;
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()>;
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
}
impl IoStream for TcpStream {
#[inline]
fn shutdown(&mut self, how: Shutdown) -> io::Result<()> {
TcpStream::shutdown(self, how)
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
TcpStream::set_nodelay(self, nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
TcpStream::set_linger(self, dur)
}
}
pub(crate) struct WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static {
io: T,
}
impl<T> WrapperStream<T> where T: AsyncRead + AsyncWrite + 'static
{
pub fn new(io: T) -> Self {
WrapperStream{io: io}
}
}
impl<T> IoStream for WrapperStream<T>
where T: AsyncRead + AsyncWrite + 'static
{
#[inline]
fn shutdown(&mut self, _: Shutdown) -> io::Result<()> {
Ok(())
}
#[inline]
fn set_nodelay(&mut self, _: bool) -> io::Result<()> {
Ok(())
}
#[inline]
fn set_linger(&mut self, _: Option<time::Duration>) -> io::Result<()> {
Ok(())
}
}
impl<T> io::Read for WrapperStream<T>
where T: AsyncRead + AsyncWrite + 'static
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
}
}
impl<T> io::Write for WrapperStream<T>
where T: AsyncRead + AsyncWrite + 'static
{
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
#[inline]
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}
impl<T> AsyncRead for WrapperStream<T>
where T: AsyncRead + AsyncWrite + 'static
{
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.io.read_buf(buf)
}
}
impl<T> AsyncWrite for WrapperStream<T>
where T: AsyncRead + AsyncWrite + 'static
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.io.shutdown()
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.io.write_buf(buf)
}
}
#[cfg(feature="alpn")]
use tokio_openssl::SslStream;
#[cfg(feature="alpn")]
impl IoStream for SslStream<TcpStream> {
#[inline]
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
let _ = self.get_mut().shutdown();
Ok(())
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
self.get_mut().get_mut().set_nodelay(nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur)
}
}
#[cfg(feature="tls")]
use tokio_tls::TlsStream;
#[cfg(feature="tls")]
impl IoStream for TlsStream<TcpStream> {
#[inline]
fn shutdown(&mut self, _how: Shutdown) -> io::Result<()> {
let _ = self.get_mut().shutdown();
Ok(())
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
self.get_mut().get_mut().set_nodelay(nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur)
}
}

View File

@ -10,12 +10,11 @@ use http::{Uri, Method, Version, HttpTryFrom, HeaderMap};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
use bytes::{Bytes, BytesMut, BufMut}; use bytes::{Bytes, BytesMut, BufMut};
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::Timeout; use tokio_core::reactor::Timeout;
use pipeline::Pipeline; use pipeline::Pipeline;
use encoding::PayloadType; use encoding::PayloadType;
use channel::{HttpHandler, HttpHandlerTask}; use channel::{HttpHandler, HttpHandlerTask, IoStream};
use h1writer::{Writer, H1Writer}; use h1writer::{Writer, H1Writer};
use worker::WorkerSettings; use worker::WorkerSettings;
use httpcodes::HTTPNotFound; use httpcodes::HTTPNotFound;
@ -57,7 +56,7 @@ enum Item {
Http2, Http2,
} }
pub(crate) struct Http1<T: AsyncWrite + 'static, H: 'static> { pub(crate) struct Http1<T: IoStream, H: 'static> {
flags: Flags, flags: Flags,
settings: Rc<WorkerSettings<H>>, settings: Rc<WorkerSettings<H>>,
addr: Option<SocketAddr>, addr: Option<SocketAddr>,
@ -74,8 +73,7 @@ struct Entry {
} }
impl<T, H> Http1<T, H> impl<T, H> Http1<T, H>
where T: AsyncRead + AsyncWrite + 'static, where T: IoStream, H: HttpHandler + 'static
H: HttpHandler + 'static
{ {
pub fn new(h: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>) -> Self { pub fn new(h: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>) -> Self {
let bytes = h.get_shared_bytes(); let bytes = h.get_shared_bytes();
@ -417,7 +415,7 @@ impl Reader {
pub fn parse<T, H>(&mut self, io: &mut T, pub fn parse<T, H>(&mut self, io: &mut T,
buf: &mut BytesMut, buf: &mut BytesMut,
settings: &WorkerSettings<H>) -> Poll<Item, ReaderError> settings: &WorkerSettings<H>) -> Poll<Item, ReaderError>
where T: AsyncRead where T: IoStream
{ {
// read payload // read payload
if self.payload.is_some() { if self.payload.is_some() {
@ -507,8 +505,8 @@ impl Reader {
} }
} }
fn read_from_io<T: AsyncRead>(&mut self, io: &mut T, buf: &mut BytesMut) fn read_from_io<T: IoStream>(&mut self, io: &mut T, buf: &mut BytesMut)
-> Poll<usize, io::Error> -> Poll<usize, io::Error>
{ {
unsafe { unsafe {
if buf.remaining_mut() < LW_BUFFER_SIZE { if buf.remaining_mut() < LW_BUFFER_SIZE {
@ -894,14 +892,17 @@ impl ChunkedState {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{io, cmp}; use std::{io, cmp, time};
use bytes::{Bytes, BytesMut}; use std::net::Shutdown;
use futures::{Async}; use bytes::{Bytes, BytesMut, Buf};
use tokio_io::AsyncRead; use futures::Async;
use tokio_io::{AsyncRead, AsyncWrite};
use http::{Version, Method}; use http::{Version, Method};
use super::*; use super::*;
use application::HttpApplication; use application::HttpApplication;
use worker::WorkerSettings; use worker::WorkerSettings;
use channel::IoStream;
struct Buffer { struct Buffer {
buf: Bytes, buf: Bytes,
@ -940,6 +941,28 @@ mod tests {
} }
} }
impl IoStream for Buffer {
fn shutdown(&self, _: Shutdown) -> io::Result<()> {
Ok(())
}
fn set_nodelay(&self, _: bool) -> io::Result<()> {
Ok(())
}
fn set_linger(&self, _: Option<time::Duration>) -> io::Result<()> {
Ok(())
}
}
impl io::Write for Buffer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {Ok(buf.len())}
fn flush(&mut self) -> io::Result<()> {Ok(())}
}
impl AsyncWrite for Buffer {
fn shutdown(&mut self) -> Poll<(), io::Error> { Ok(Async::Ready(())) }
fn write_buf<B: Buf>(&mut self, _: &mut B) -> Poll<usize, io::Error> {
Ok(Async::NotReady)
}
}
macro_rules! not_ready { macro_rules! not_ready {
($e:expr) => (match $e { ($e:expr) => (match $e {
Ok(Async::NotReady) => (), Ok(Async::NotReady) => (),

View File

@ -31,7 +31,7 @@ use tokio_openssl::SslStream;
use actix::actors::signal; use actix::actors::signal;
use helpers; use helpers;
use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; use channel::{HttpChannel, HttpHandler, IntoHttpHandler, IoStream, WrapperStream};
use worker::{Conn, Worker, WorkerSettings, StreamHandlerType, StopWorker}; use worker::{Conn, Worker, WorkerSettings, StreamHandlerType, StopWorker};
/// Various server settings /// Various server settings
@ -131,7 +131,7 @@ impl<T: 'static, A: 'static, H: HttpHandler + 'static, U: 'static> HttpServer<T
impl<T, A, H, U, V> HttpServer<T, A, H, U> impl<T, A, H, U, V> HttpServer<T, A, H, U>
where A: 'static, where A: 'static,
T: AsyncRead + AsyncWrite + 'static, T: IoStream,
H: HttpHandler, H: HttpHandler,
U: IntoIterator<Item=V> + 'static, U: IntoIterator<Item=V> + 'static,
V: IntoHttpHandler<Handler=H>, V: IntoHttpHandler<Handler=H>,
@ -450,7 +450,7 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
} }
} }
impl<T, A, H, U, V> HttpServer<T, A, H, U> impl<T, A, H, U, V> HttpServer<WrapperStream<T>, A, H, U>
where A: 'static, where A: 'static,
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
H: HttpHandler, H: HttpHandler,
@ -488,7 +488,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
// start server // start server
HttpServer::create(move |ctx| { HttpServer::create(move |ctx| {
ctx.add_stream(stream.map( ctx.add_stream(stream.map(
move |(t, _)| Conn{io: t, peer: None, http2: false})); move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false}));
self self
}) })
} }
@ -499,7 +499,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and send `SystemExit(0)` /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and send `SystemExit(0)`
/// message to `System` actor. /// message to `System` actor.
impl<T, A, H, U> Handler<signal::Signal> for HttpServer<T, A, H, U> impl<T, A, H, U> Handler<signal::Signal> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
A: 'static, A: 'static,
@ -530,13 +530,13 @@ impl<T, A, H, U> Handler<signal::Signal> for HttpServer<T, A, H, U>
} }
impl<T, A, H, U> StreamHandler<Conn<T>, io::Error> for HttpServer<T, A, H, U> impl<T, A, H, U> StreamHandler<Conn<T>, io::Error> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
A: 'static {} A: 'static {}
impl<T, A, H, U> Handler<Conn<T>, io::Error> for HttpServer<T, A, H, U> impl<T, A, H, U> Handler<Conn<T>, io::Error> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
A: 'static, A: 'static,
@ -573,7 +573,7 @@ pub struct StopServer {
} }
impl<T, A, H, U> Handler<PauseServer> for HttpServer<T, A, H, U> impl<T, A, H, U> Handler<PauseServer> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
A: 'static, A: 'static,
@ -589,7 +589,7 @@ impl<T, A, H, U> Handler<PauseServer> for HttpServer<T, A, H, U>
} }
impl<T, A, H, U> Handler<ResumeServer> for HttpServer<T, A, H, U> impl<T, A, H, U> Handler<ResumeServer> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
A: 'static, A: 'static,
@ -605,7 +605,7 @@ impl<T, A, H, U> Handler<ResumeServer> for HttpServer<T, A, H, U>
} }
impl<T, A, H, U> Handler<StopServer> for HttpServer<T, A, H, U> impl<T, A, H, U> Handler<StopServer> for HttpServer<T, A, H, U>
where T: AsyncRead + AsyncWrite + 'static, where T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
U: 'static, U: 'static,
A: 'static, A: 'static,

View File

@ -135,7 +135,7 @@ impl<H: HttpHandler + 'static> Worker<H> {
slf.shutdown_timeout(ctx, tx, d); slf.shutdown_timeout(ctx, tx, d);
} else { } else {
info!("Force shutdown http worker, {} connections", num); info!("Force shutdown http worker, {} connections", num);
slf.settings.head().traverse::<H>(); slf.settings.head().traverse::<TcpStream, H>();
let _ = tx.send(false); let _ = tx.send(false);
Arbiter::arbiter().send(StopArbiter(0)); Arbiter::arbiter().send(StopArbiter(0));
} }
@ -187,7 +187,7 @@ impl<H> Handler<StopWorker> for Worker<H>
Self::async_reply(rx.map_err(|_| ()).actfuture()) Self::async_reply(rx.map_err(|_| ()).actfuture())
} else { } else {
info!("Force shutdown http worker, {} connections", num); info!("Force shutdown http worker, {} connections", num);
self.settings.head().traverse::<H>(); self.settings.head().traverse::<TcpStream, H>();
Self::reply(false) Self::reply(false)
} }
} }