1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-30 18:34:36 +01:00

add StreamConfiguration service

This commit is contained in:
Nikolay Kim 2018-10-01 22:23:02 -07:00
parent 7c78797d9b
commit c674ea9126
10 changed files with 160 additions and 30 deletions

View File

@ -1283,6 +1283,11 @@ impl IoStream for Connection {
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
IoStream::set_linger(&mut *self.stream, dur) IoStream::set_linger(&mut *self.stream, dur)
} }
#[inline]
fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
IoStream::set_keepalive(&mut *self.stream, dur)
}
} }
impl io::Read for Connection { impl io::Read for Connection {

View File

@ -13,7 +13,7 @@ use super::{h1, h2, HttpHandler, IoStream};
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
enum HttpProtocol<T: IoStream, H: HttpHandler + 'static> { enum HttpProtocol<T: IoStream, H: HttpHandler + 'static> {
H1(h1::Http1<T, H>), H1(h1::Http1Dispatcher<T, H>),
H2(h2::Http2<T, H>), H2(h2::Http2<T, H>),
Unknown(WorkerSettings<H>, Option<SocketAddr>, T, BytesMut), Unknown(WorkerSettings<H>, Option<SocketAddr>, T, BytesMut),
} }
@ -167,7 +167,7 @@ where
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() { if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() {
match kind { match kind {
ProtocolKind::Http1 => { ProtocolKind::Http1 => {
self.proto = Some(HttpProtocol::H1(h1::Http1::new( self.proto = Some(HttpProtocol::H1(h1::Http1Dispatcher::new(
settings, settings,
io, io,
addr, addr,
@ -311,6 +311,10 @@ where
fn set_linger(&mut self, _: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, _: Option<time::Duration>) -> io::Result<()> {
Ok(()) Ok(())
} }
#[inline]
fn set_keepalive(&mut self, _: Option<time::Duration>) -> io::Result<()> {
Ok(())
}
} }
impl<T> io::Read for WrapperStream<T> impl<T> io::Read for WrapperStream<T>

View File

@ -24,15 +24,18 @@ const MAX_PIPELINED_MESSAGES: usize = 16;
bitflags! { bitflags! {
pub struct Flags: u8 { pub struct Flags: u8 {
const STARTED = 0b0000_0001; const STARTED = 0b0000_0001;
const KEEPALIVE_ENABLED = 0b0000_0010;
const KEEPALIVE = 0b0000_0100; const KEEPALIVE = 0b0000_0100;
const SHUTDOWN = 0b0000_1000; const SHUTDOWN = 0b0000_1000;
const READ_DISCONNECTED = 0b0001_0000; const READ_DISCONNECTED = 0b0001_0000;
const WRITE_DISCONNECTED = 0b0010_0000; const WRITE_DISCONNECTED = 0b0010_0000;
const POLLED = 0b0100_0000; const POLLED = 0b0100_0000;
} }
} }
pub(crate) struct Http1<T: IoStream, H: HttpHandler + 'static> { /// Dispatcher for HTTP/1.1 protocol
pub struct Http1Dispatcher<T: IoStream, H: HttpHandler + 'static> {
flags: Flags, flags: Flags,
settings: WorkerSettings<H>, settings: WorkerSettings<H>,
addr: Option<SocketAddr>, addr: Option<SocketAddr>,
@ -42,7 +45,6 @@ pub(crate) struct Http1<T: IoStream, H: HttpHandler + 'static> {
buf: BytesMut, buf: BytesMut,
tasks: VecDeque<Entry<H>>, tasks: VecDeque<Entry<H>>,
error: Option<HttpDispatchError>, error: Option<HttpDispatchError>,
ka_enabled: bool,
ka_expire: Instant, ka_expire: Instant,
ka_timer: Option<Delay>, ka_timer: Option<Delay>,
} }
@ -79,7 +81,7 @@ impl<H: HttpHandler> Entry<H> {
} }
} }
impl<T, H> Http1<T, H> impl<T, H> Http1Dispatcher<T, H>
where where
T: IoStream, T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
@ -88,7 +90,6 @@ where
settings: WorkerSettings<H>, stream: T, addr: Option<SocketAddr>, buf: BytesMut, settings: WorkerSettings<H>, stream: T, addr: Option<SocketAddr>, buf: BytesMut,
is_eof: bool, keepalive_timer: Option<Delay>, is_eof: bool, keepalive_timer: Option<Delay>,
) -> Self { ) -> Self {
let ka_enabled = settings.keep_alive_enabled();
let (ka_expire, ka_timer) = if let Some(delay) = keepalive_timer { let (ka_expire, ka_timer) = if let Some(delay) = keepalive_timer {
(delay.deadline(), Some(delay)) (delay.deadline(), Some(delay))
} else if let Some(delay) = settings.keep_alive_timer() { } else if let Some(delay) = settings.keep_alive_timer() {
@ -97,12 +98,16 @@ where
(settings.now(), None) (settings.now(), None)
}; };
Http1 { let mut flags = if is_eof {
flags: if is_eof {
Flags::READ_DISCONNECTED Flags::READ_DISCONNECTED
} else if settings.keep_alive_enabled() {
Flags::KEEPALIVE | Flags::KEEPALIVE_ENABLED
} else { } else {
Flags::KEEPALIVE Flags::empty()
}, };
Http1Dispatcher {
flags,
stream: H1Writer::new(stream, settings.clone()), stream: H1Writer::new(stream, settings.clone()),
decoder: H1Decoder::new(), decoder: H1Decoder::new(),
payload: None, payload: None,
@ -113,7 +118,6 @@ where
settings, settings,
ka_timer, ka_timer,
ka_expire, ka_expire,
ka_enabled,
} }
} }
@ -212,7 +216,7 @@ where
} }
// no keep-alive // no keep-alive
if self.flags.contains(Flags::STARTED) if self.flags.contains(Flags::STARTED)
&& (!self.ka_enabled && (!self.flags.contains(Flags::KEEPALIVE_ENABLED)
|| !self.flags.contains(Flags::KEEPALIVE)) || !self.flags.contains(Flags::KEEPALIVE))
{ {
self.flags.insert(Flags::SHUTDOWN); self.flags.insert(Flags::SHUTDOWN);
@ -280,7 +284,7 @@ where
#[inline] #[inline]
/// read data from stream /// read data from stream
pub fn poll_io(&mut self) -> Result<(), HttpDispatchError> { pub(self) fn poll_io(&mut self) -> Result<(), HttpDispatchError> {
if !self.flags.contains(Flags::POLLED) { if !self.flags.contains(Flags::POLLED) {
self.parse()?; self.parse()?;
self.flags.insert(Flags::POLLED); self.flags.insert(Flags::POLLED);
@ -308,7 +312,7 @@ where
Ok(()) Ok(())
} }
pub fn poll_handler(&mut self) -> Poll<bool, HttpDispatchError> { pub(self) fn poll_handler(&mut self) -> Poll<bool, HttpDispatchError> {
let retry = self.can_read(); let retry = self.can_read();
// process first pipelined response, only one task can do io operation in http/1 // process first pipelined response, only one task can do io operation in http/1
@ -419,7 +423,7 @@ where
.push_back(Entry::Error(ServerError::err(Version::HTTP_11, status))); .push_back(Entry::Error(ServerError::err(Version::HTTP_11, status)));
} }
pub fn parse(&mut self) -> Result<(), HttpDispatchError> { pub(self) fn parse(&mut self) -> Result<(), HttpDispatchError> {
let mut updated = false; let mut updated = false;
'outer: loop { 'outer: loop {
@ -686,7 +690,8 @@ mod tests {
let readbuf = BytesMut::new(); let readbuf = BytesMut::new();
let settings = wrk_settings(); let settings = wrk_settings();
let mut h1 = Http1::new(settings.clone(), buf, None, readbuf, false, None); let mut h1 =
Http1Dispatcher::new(settings.clone(), buf, None, readbuf, false, None);
assert!(h1.poll_io().is_ok()); assert!(h1.poll_io().is_ok());
assert!(h1.poll_io().is_ok()); assert!(h1.poll_io().is_ok());
assert!(h1.flags.contains(Flags::READ_DISCONNECTED)); assert!(h1.flags.contains(Flags::READ_DISCONNECTED));

View File

@ -143,9 +143,11 @@ pub use self::message::Request;
pub use self::ssl::*; pub use self::ssl::*;
pub use self::error::{AcceptorError, HttpDispatchError}; pub use self::error::{AcceptorError, HttpDispatchError};
pub use self::service::HttpService;
pub use self::settings::{ServerSettings, WorkerSettings, WorkerSettingsBuilder}; pub use self::settings::{ServerSettings, WorkerSettings, WorkerSettingsBuilder};
#[doc(hidden)]
pub use self::service::{HttpService, StreamConfiguration};
#[doc(hidden)] #[doc(hidden)]
pub use self::helpers::write_content_length; pub use self::helpers::write_content_length;
@ -268,6 +270,8 @@ pub trait IoStream: AsyncRead + AsyncWrite + 'static {
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>; fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()>;
fn read_available(&mut self, buf: &mut BytesMut) -> Poll<(bool, bool), io::Error> { fn read_available(&mut self, buf: &mut BytesMut) -> Poll<(bool, bool), io::Error> {
let mut read_some = false; let mut read_some = false;
loop { loop {
@ -324,6 +328,11 @@ impl IoStream for ::tokio_uds::UnixStream {
fn set_linger(&mut self, _dur: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, _dur: Option<time::Duration>) -> io::Result<()> {
Ok(()) Ok(())
} }
#[inline]
fn set_keepalive(&mut self, _nodelay: bool) -> io::Result<()> {
Ok(())
}
} }
impl IoStream for TcpStream { impl IoStream for TcpStream {
@ -341,4 +350,9 @@ impl IoStream for TcpStream {
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
TcpStream::set_linger(self, dur) TcpStream::set_linger(self, dur)
} }
#[inline]
fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
TcpStream::set_keepalive(self, dur)
}
} }

View File

@ -1,4 +1,5 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::time::Duration;
use actix_net::service::{NewService, Service}; use actix_net::service::{NewService, Service};
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
@ -10,6 +11,7 @@ use super::handler::HttpHandler;
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::IoStream; use super::IoStream;
/// `NewService` implementation for HTTP1/HTTP2 transports
pub struct HttpService<H, Io> pub struct HttpService<H, Io>
where where
H: HttpHandler, H: HttpHandler,
@ -56,7 +58,6 @@ where
Io: IoStream, Io: IoStream,
{ {
settings: WorkerSettings<H>, settings: WorkerSettings<H>,
// tcp_ka: Option<Duration>,
_t: PhantomData<Io>, _t: PhantomData<Io>,
} }
@ -66,12 +67,6 @@ where
Io: IoStream, Io: IoStream,
{ {
fn new(settings: WorkerSettings<H>) -> HttpServiceHandler<H, Io> { fn new(settings: WorkerSettings<H>) -> HttpServiceHandler<H, Io> {
// let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
// Some(Duration::new(val as u64, 0))
// } else {
// None
// };
HttpServiceHandler { HttpServiceHandler {
settings, settings,
_t: PhantomData, _t: PhantomData,
@ -94,7 +89,89 @@ where
} }
fn call(&mut self, mut req: Self::Request) -> Self::Future { fn call(&mut self, mut req: Self::Request) -> Self::Future {
let _ = req.set_nodelay(true);
HttpChannel::new(self.settings.clone(), req, None) HttpChannel::new(self.settings.clone(), req, None)
} }
} }
/// `NewService` implementation for stream configuration service
pub struct StreamConfiguration<T, E> {
no_delay: Option<bool>,
tcp_ka: Option<Option<Duration>>,
_t: PhantomData<(T, E)>,
}
impl<T, E> StreamConfiguration<T, E> {
/// Create new `StreamConfigurationService` instance.
pub fn new() -> Self {
Self {
no_delay: None,
tcp_ka: None,
_t: PhantomData,
}
}
/// Sets the value of the `TCP_NODELAY` option on this socket.
pub fn nodelay(mut self, nodelay: bool) -> Self {
self.no_delay = Some(nodelay);
self
}
/// Sets whether keepalive messages are enabled to be sent on this socket.
pub fn tcp_keepalive(mut self, keepalive: Option<Duration>) -> Self {
self.tcp_ka = Some(keepalive);
self
}
}
impl<T: IoStream, E> NewService for StreamConfiguration<T, E> {
type Request = T;
type Response = T;
type Error = E;
type InitError = ();
type Service = StreamConfigurationService<T, E>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
ok(StreamConfigurationService {
no_delay: self.no_delay.clone(),
tcp_ka: self.tcp_ka.clone(),
_t: PhantomData,
})
}
}
/// Stream configuration service
pub struct StreamConfigurationService<T, E> {
no_delay: Option<bool>,
tcp_ka: Option<Option<Duration>>,
_t: PhantomData<(T, E)>,
}
impl<T, E> Service for StreamConfigurationService<T, E>
where
T: IoStream,
{
type Request = T;
type Response = T;
type Error = E;
type Future = FutureResult<T, E>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, mut req: Self::Request) -> Self::Future {
if let Some(no_delay) = self.no_delay {
if req.set_nodelay(no_delay).is_err() {
error!("Can not set socket no-delay option");
}
}
if let Some(keepalive) = self.tcp_ka {
if req.set_keepalive(keepalive).is_err() {
error!("Can not set socket keep-alive option");
}
}
ok(req)
}
}

View File

@ -21,4 +21,9 @@ impl<Io: IoStream> IoStream for TlsStream<Io> {
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur) self.get_mut().get_mut().set_linger(dur)
} }
#[inline]
fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_keepalive(dur)
}
} }

View File

@ -74,4 +74,9 @@ impl<T: IoStream> IoStream for SslStream<T> {
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_linger(dur) self.get_mut().get_mut().set_linger(dur)
} }
#[inline]
fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().get_mut().set_keepalive(dur)
}
} }

View File

@ -51,6 +51,11 @@ impl<Io: IoStream> IoStream for TlsStream<Io, ClientSession> {
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().0.set_linger(dur) self.get_mut().0.set_linger(dur)
} }
#[inline]
fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().0.set_keepalive(dur)
}
} }
impl<Io: IoStream> IoStream for TlsStream<Io, ServerSession> { impl<Io: IoStream> IoStream for TlsStream<Io, ServerSession> {
@ -69,4 +74,9 @@ impl<Io: IoStream> IoStream for TlsStream<Io, ServerSession> {
fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> { fn set_linger(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().0.set_linger(dur) self.get_mut().0.set_linger(dur)
} }
#[inline]
fn set_keepalive(&mut self, dur: Option<time::Duration>) -> io::Result<()> {
self.get_mut().0.set_keepalive(dur)
}
} }

View File

@ -1016,7 +1016,10 @@ fn test_server_cookies() {
#[test] #[test]
fn test_custom_pipeline() { fn test_custom_pipeline() {
use actix::System; use actix::System;
use actix_web::server::{HttpService, KeepAlive, WorkerSettings}; use actix_net::service::NewServiceExt;
use actix_web::server::{
HttpService, KeepAlive, StreamConfiguration, WorkerSettings,
};
let addr = test::TestServer::unused_addr(); let addr = test::TestServer::unused_addr();
@ -1034,7 +1037,9 @@ fn test_custom_pipeline() {
.server_address(addr) .server_address(addr)
.finish(); .finish();
HttpService::new(settings) StreamConfiguration::new()
.nodelay(true)
.and_then(HttpService::new(settings))
}).unwrap() }).unwrap()
.run(); .run();
}); });

View File

@ -7,7 +7,7 @@ extern crate rand;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{thread, time}; use std::thread;
use bytes::Bytes; use bytes::Bytes;
use futures::Stream; use futures::Stream;