1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

refactor http channels list; rename WorkerSettings

This commit is contained in:
Nikolay Kim 2018-10-02 15:25:32 -07:00
parent d7379bd10b
commit ae5c4dfb78
14 changed files with 157 additions and 150 deletions

View File

@ -293,6 +293,7 @@ impl Default for ClientConnector {
} }
}; };
#[cfg_attr(feature = "cargo-clippy", allow(clippy::let_unit_value))]
ClientConnector::with_connector_impl(connector) ClientConnector::with_connector_impl(connector)
} }
} }

View File

@ -9,9 +9,10 @@ use tokio_reactor::Handle;
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
use tokio_timer::{sleep, Delay}; use tokio_timer::{sleep, Delay};
use super::channel::HttpProtocol;
use super::error::AcceptorError; use super::error::AcceptorError;
use super::handler::HttpHandler; use super::handler::HttpHandler;
use super::settings::WorkerSettings; use super::settings::ServiceConfig;
use super::IoStream; use super::IoStream;
/// This trait indicates types that can create acceptor service for http server. /// This trait indicates types that can create acceptor service for http server.
@ -271,7 +272,7 @@ impl<T: Service> Future for AcceptorTimeoutResponse<T> {
pub(crate) struct ServerMessageAcceptor<T, H: HttpHandler> { pub(crate) struct ServerMessageAcceptor<T, H: HttpHandler> {
inner: T, inner: T,
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
} }
impl<T, H> ServerMessageAcceptor<T, H> impl<T, H> ServerMessageAcceptor<T, H>
@ -279,7 +280,7 @@ where
H: HttpHandler, H: HttpHandler,
T: NewService<Request = net::TcpStream>, T: NewService<Request = net::TcpStream>,
{ {
pub(crate) fn new(settings: WorkerSettings<H>, inner: T) -> Self { pub(crate) fn new(settings: ServiceConfig<H>, inner: T) -> Self {
ServerMessageAcceptor { inner, settings } ServerMessageAcceptor { inner, settings }
} }
} }
@ -310,7 +311,7 @@ where
T: NewService<Request = net::TcpStream>, T: NewService<Request = net::TcpStream>,
{ {
fut: T::Future, fut: T::Future,
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
} }
impl<T, H> Future for ServerMessageAcceptorResponse<T, H> impl<T, H> Future for ServerMessageAcceptorResponse<T, H>
@ -334,7 +335,7 @@ where
pub(crate) struct ServerMessageAcceptorService<T, H: HttpHandler> { pub(crate) struct ServerMessageAcceptorService<T, H: HttpHandler> {
inner: T, inner: T,
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
} }
impl<T, H> Service for ServerMessageAcceptorService<T, H> impl<T, H> Service for ServerMessageAcceptorService<T, H>
@ -359,9 +360,11 @@ where
fut: self.inner.call(stream), fut: self.inner.call(stream),
}) })
} }
ServerMessage::Shutdown(timeout) => Either::B(ok(())), ServerMessage::Shutdown(_) => Either::B(ok(())),
ServerMessage::ForceShutdown => { ServerMessage::ForceShutdown => {
// self.settings.head().traverse::<TcpStream, H>(); self.settings
.head()
.traverse(|proto: &mut HttpProtocol<TcpStream, H>| proto.shutdown());
Either::B(ok(())) Either::B(ok(()))
} }
} }

View File

@ -10,7 +10,7 @@ use super::acceptor::{
use super::error::AcceptorError; use super::error::AcceptorError;
use super::handler::IntoHttpHandler; use super::handler::IntoHttpHandler;
use super::service::HttpService; use super::service::HttpService;
use super::settings::{ServerSettings, WorkerSettings}; use super::settings::{ServerSettings, ServiceConfig};
use super::KeepAlive; use super::KeepAlive;
pub(crate) trait ServiceProvider { pub(crate) trait ServiceProvider {
@ -50,7 +50,7 @@ where
let acceptor = self.acceptor.clone(); let acceptor = self.acceptor.clone();
move || { move || {
let app = (factory)().into_handler(); let app = (factory)().into_handler();
let settings = WorkerSettings::new( let settings = ServiceConfig::new(
app, app,
keep_alive, keep_alive,
client_timeout, client_timeout,

View File

@ -1,5 +1,5 @@
use std::net::{Shutdown, SocketAddr}; use std::net::{Shutdown, SocketAddr};
use std::{io, ptr, time}; use std::{io, mem, time};
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
@ -7,16 +7,35 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay; use tokio_timer::Delay;
use super::error::HttpDispatchError; use super::error::HttpDispatchError;
use super::settings::WorkerSettings; use super::settings::ServiceConfig;
use super::{h1, h2, HttpHandler, IoStream}; use super::{h1, h2, HttpHandler, IoStream};
use http::StatusCode; use http::StatusCode;
const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
enum HttpProtocol<T: IoStream, H: HttpHandler + 'static> { pub(crate) enum HttpProtocol<T: IoStream, H: HttpHandler + 'static> {
H1(h1::Http1Dispatcher<T, H>), H1(h1::Http1Dispatcher<T, H>),
H2(h2::Http2<T, H>), H2(h2::Http2<T, H>),
Unknown(WorkerSettings<H>, Option<SocketAddr>, T, BytesMut), Unknown(ServiceConfig<H>, Option<SocketAddr>, T, BytesMut),
None,
}
impl<T: IoStream, H: HttpHandler + 'static> HttpProtocol<T, H> {
pub(crate) fn shutdown(&mut self) {
match self {
HttpProtocol::H1(ref mut h1) => {
let io = h1.io();
let _ = IoStream::set_linger(io, Some(time::Duration::new(0, 0)));
let _ = IoStream::shutdown(io, Shutdown::Both);
}
HttpProtocol::H2(ref mut h2) => h2.shutdown(),
HttpProtocol::Unknown(_, _, io, _) => {
let _ = IoStream::set_linger(io, Some(time::Duration::new(0, 0)));
let _ = IoStream::shutdown(io, Shutdown::Both);
}
HttpProtocol::None => (),
}
}
} }
enum ProtocolKind { enum ProtocolKind {
@ -30,8 +49,8 @@ where
T: IoStream, T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
proto: Option<HttpProtocol<T, H>>, node: Node<HttpProtocol<T, H>>,
node: Option<Node<HttpChannel<T, H>>>, node_reg: bool,
ka_timeout: Option<Delay>, ka_timeout: Option<Delay>,
} }
@ -41,12 +60,14 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
pub(crate) fn new( pub(crate) fn new(
settings: WorkerSettings<H>, io: T, peer: Option<SocketAddr>, settings: ServiceConfig<H>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> { ) -> HttpChannel<T, H> {
let ka_timeout = settings.client_timer();
HttpChannel { HttpChannel {
node: None, ka_timeout,
ka_timeout: settings.client_timer(), node_reg: false,
proto: Some(HttpProtocol::Unknown( node: Node::new(HttpProtocol::Unknown(
settings, settings,
peer, peer,
io, io,
@ -54,18 +75,6 @@ where
)), )),
} }
} }
pub(crate) fn shutdown(&mut self) {
match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
let io = h1.io();
let _ = IoStream::set_linger(io, Some(time::Duration::new(0, 0)));
let _ = IoStream::shutdown(io, Shutdown::Both);
}
Some(HttpProtocol::H2(ref mut h2)) => h2.shutdown(),
_ => (),
}
}
} }
impl<T, H> Drop for HttpChannel<T, H> impl<T, H> Drop for HttpChannel<T, H>
@ -74,9 +83,7 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(mut node) = self.node.take() { self.node.remove();
node.remove()
}
} }
} }
@ -94,17 +101,16 @@ where
match self.ka_timeout.as_mut().unwrap().poll() { match self.ka_timeout.as_mut().unwrap().poll() {
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
trace!("Slow request timed out, close connection"); trace!("Slow request timed out, close connection");
if let Some(HttpProtocol::Unknown(settings, _, io, buf)) = let proto = mem::replace(self.node.get_mut(), HttpProtocol::None);
self.proto.take() if let HttpProtocol::Unknown(settings, _, io, buf) = proto {
{ *self.node.get_mut() =
self.proto = HttpProtocol::H1(h1::Http1Dispatcher::for_error(
Some(HttpProtocol::H1(h1::Http1Dispatcher::for_error(
settings, settings,
io, io,
StatusCode::REQUEST_TIMEOUT, StatusCode::REQUEST_TIMEOUT,
self.ka_timeout.take(), self.ka_timeout.take(),
buf, buf,
))); ));
return self.poll(); return self.poll();
} }
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
@ -114,28 +120,22 @@ where
} }
} }
if self.node.is_none() { if !self.node_reg {
let el = self as *mut _; self.node_reg = true;
self.node = Some(Node::new(el)); let settings = match self.node.get_mut() {
let _ = match self.proto { HttpProtocol::H1(ref mut h1) => h1.settings().clone(),
Some(HttpProtocol::H1(ref mut h1)) => { HttpProtocol::H2(ref mut h2) => h2.settings().clone(),
self.node.as_mut().map(|n| h1.settings().head().insert(n)) HttpProtocol::Unknown(ref mut settings, _, _, _) => settings.clone(),
} HttpProtocol::None => unreachable!(),
Some(HttpProtocol::H2(ref mut h2)) => {
self.node.as_mut().map(|n| h2.settings().head().insert(n))
}
Some(HttpProtocol::Unknown(ref mut settings, _, _, _)) => {
self.node.as_mut().map(|n| settings.head().insert(n))
}
None => unreachable!(),
}; };
settings.head().insert(&mut self.node);
} }
let mut is_eof = false; let mut is_eof = false;
let kind = match self.proto { let kind = match self.node.get_mut() {
Some(HttpProtocol::H1(ref mut h1)) => return h1.poll(), HttpProtocol::H1(ref mut h1) => return h1.poll(),
Some(HttpProtocol::H2(ref mut h2)) => return h2.poll(), HttpProtocol::H2(ref mut h2) => return h2.poll(),
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => { HttpProtocol::Unknown(_, _, ref mut io, ref mut buf) => {
let mut err = None; let mut err = None;
let mut disconnect = false; let mut disconnect = false;
match io.read_available(buf) { match io.read_available(buf) {
@ -168,31 +168,32 @@ where
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
} }
None => unreachable!(), HttpProtocol::None => unreachable!(),
}; };
// upgrade to specific http protocol // upgrade to specific http protocol
if let Some(HttpProtocol::Unknown(settings, addr, io, buf)) = self.proto.take() { let proto = mem::replace(self.node.get_mut(), HttpProtocol::None);
if let HttpProtocol::Unknown(settings, addr, io, buf) = proto {
match kind { match kind {
ProtocolKind::Http1 => { ProtocolKind::Http1 => {
self.proto = Some(HttpProtocol::H1(h1::Http1Dispatcher::new( *self.node.get_mut() = HttpProtocol::H1(h1::Http1Dispatcher::new(
settings, settings,
io, io,
addr, addr,
buf, buf,
is_eof, is_eof,
self.ka_timeout.take(), self.ka_timeout.take(),
))); ));
return self.poll(); return self.poll();
} }
ProtocolKind::Http2 => { ProtocolKind::Http2 => {
self.proto = Some(HttpProtocol::H2(h2::Http2::new( *self.node.get_mut() = HttpProtocol::H2(h2::Http2::new(
settings, settings,
io, io,
addr, addr,
buf.freeze(), buf.freeze(),
self.ka_timeout.take(), self.ka_timeout.take(),
))); ));
return self.poll(); return self.poll();
} }
} }
@ -204,18 +205,22 @@ where
pub(crate) struct Node<T> { pub(crate) struct Node<T> {
next: Option<*mut Node<T>>, next: Option<*mut Node<T>>,
prev: Option<*mut Node<T>>, prev: Option<*mut Node<T>>,
element: *mut T, element: T,
} }
impl<T> Node<T> { impl<T> Node<T> {
fn new(el: *mut T) -> Self { fn new(element: T) -> Self {
Node { Node {
element,
next: None, next: None,
prev: None, prev: None,
element: el,
} }
} }
fn get_mut(&mut self) -> &mut T {
&mut self.element
}
fn insert<I>(&mut self, next_el: &mut Node<I>) { fn insert<I>(&mut self, next_el: &mut Node<I>) {
let next: *mut Node<T> = next_el as *const _ as *mut _; let next: *mut Node<T> = next_el as *const _ as *mut _;
@ -235,7 +240,6 @@ impl<T> Node<T> {
} }
fn remove(&mut self) { fn remove(&mut self) {
self.element = ptr::null_mut();
let next = self.next.take(); let next = self.next.take();
let prev = self.prev.take(); let prev = self.prev.take();
@ -257,30 +261,28 @@ impl Node<()> {
Node { Node {
next: None, next: None,
prev: None, prev: None,
element: ptr::null_mut(), element: (),
} }
} }
pub(crate) fn traverse<T, H, F: Fn(&mut HttpChannel<T, H>)>(&self, f: F) pub(crate) fn traverse<T, H, F: Fn(&mut HttpProtocol<T, H>)>(&self, f: F)
where where
T: IoStream, T: IoStream,
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
let mut next = self.next.as_ref(); if let Some(n) = self.next.as_ref() {
loop { unsafe {
if let Some(n) = next { let mut next: &mut Node<HttpProtocol<T, H>> =
unsafe { &mut *(n.as_ref().unwrap() as *const _ as *mut _);
let n: &Node<()> = &*(n.as_ref().unwrap() as *const _); loop {
next = n.next.as_ref(); f(&mut next.element);
if !n.element.is_null() { next = if let Some(n) = next.next.as_ref() {
let ch: &mut HttpChannel<T, H> = &mut **n
&mut *(&mut *(n.element as *mut _) as *mut () as *mut _); } else {
f(ch); return;
} }
} }
} else {
return;
} }
} }
} }

View File

@ -16,7 +16,7 @@ use super::h1decoder::{DecoderError, H1Decoder, Message};
use super::h1writer::H1Writer; use super::h1writer::H1Writer;
use super::handler::{HttpHandler, HttpHandlerTask, HttpHandlerTaskFut}; use super::handler::{HttpHandler, HttpHandlerTask, HttpHandlerTaskFut};
use super::input::PayloadType; use super::input::PayloadType;
use super::settings::WorkerSettings; use super::settings::ServiceConfig;
use super::{IoStream, Writer}; use super::{IoStream, Writer};
const MAX_PIPELINED_MESSAGES: usize = 16; const MAX_PIPELINED_MESSAGES: usize = 16;
@ -37,7 +37,7 @@ bitflags! {
/// Dispatcher for HTTP/1.1 protocol /// Dispatcher for HTTP/1.1 protocol
pub struct Http1Dispatcher<T: IoStream, H: HttpHandler + 'static> { pub struct Http1Dispatcher<T: IoStream, H: HttpHandler + 'static> {
flags: Flags, flags: Flags,
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
addr: Option<SocketAddr>, addr: Option<SocketAddr>,
stream: H1Writer<T, H>, stream: H1Writer<T, H>,
decoder: H1Decoder, decoder: H1Decoder,
@ -87,7 +87,7 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
pub fn new( pub fn new(
settings: WorkerSettings<H>, stream: T, addr: Option<SocketAddr>, buf: BytesMut, settings: ServiceConfig<H>, stream: T, addr: Option<SocketAddr>, buf: BytesMut,
is_eof: bool, keepalive_timer: Option<Delay>, is_eof: bool, keepalive_timer: Option<Delay>,
) -> Self { ) -> Self {
let (ka_expire, ka_timer) = if let Some(delay) = keepalive_timer { let (ka_expire, ka_timer) = if let Some(delay) = keepalive_timer {
@ -122,7 +122,7 @@ where
} }
pub(crate) fn for_error( pub(crate) fn for_error(
settings: WorkerSettings<H>, stream: T, status: StatusCode, settings: ServiceConfig<H>, stream: T, status: StatusCode,
mut keepalive_timer: Option<Delay>, buf: BytesMut, mut keepalive_timer: Option<Delay>, buf: BytesMut,
) -> Self { ) -> Self {
if let Some(deadline) = settings.client_timer_expire() { if let Some(deadline) = settings.client_timer_expire() {
@ -147,7 +147,7 @@ where
} }
#[inline] #[inline]
pub fn settings(&self) -> &WorkerSettings<H> { pub fn settings(&self) -> &ServiceConfig<H> {
&self.settings &self.settings
} }
@ -259,7 +259,7 @@ where
Err(err) => { Err(err) => {
debug!("Error sending data: {}", err); debug!("Error sending data: {}", err);
self.client_disconnected(false); self.client_disconnected(false);
return Err(err.into()); Err(err.into())
} }
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
// if payload is not consumed we can not use connection // if payload is not consumed we can not use connection
@ -347,10 +347,8 @@ where
if self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES { if self.can_read() && self.tasks.len() < MAX_PIPELINED_MESSAGES {
match self.stream.get_mut().read_available(&mut self.buf) { match self.stream.get_mut().read_available(&mut self.buf) {
Ok(Async::Ready((read_some, disconnected))) => { Ok(Async::Ready((read_some, disconnected))) => {
if read_some { if read_some && self.parse()? {
if self.parse()? { updated = true;
updated = true;
}
} }
if disconnected { if disconnected {
self.client_disconnected(true); self.client_disconnected(true);
@ -397,11 +395,9 @@ where
// if read-backpressure is enabled and we consumed some data. // if read-backpressure is enabled and we consumed some data.
// we may read more dataand retry // we may read more dataand retry
if !retry && self.can_read() { if !retry && self.can_read() && self.poll_io()? {
if self.poll_io()? { retry = self.can_read();
retry = self.can_read(); continue;
continue;
}
} }
break; break;
} }
@ -597,11 +593,11 @@ mod tests {
use httpmessage::HttpMessage; use httpmessage::HttpMessage;
use server::h1decoder::Message; use server::h1decoder::Message;
use server::handler::IntoHttpHandler; use server::handler::IntoHttpHandler;
use server::settings::{ServerSettings, WorkerSettings}; use server::settings::{ServerSettings, ServiceConfig};
use server::{KeepAlive, Request}; use server::{KeepAlive, Request};
fn wrk_settings() -> WorkerSettings<HttpApplication> { fn wrk_settings() -> ServiceConfig<HttpApplication> {
WorkerSettings::<HttpApplication>::new( ServiceConfig::<HttpApplication>::new(
App::new().into_handler(), App::new().into_handler(),
KeepAlive::Os, KeepAlive::Os,
5000, 5000,

View File

@ -5,7 +5,7 @@ use futures::{Async, Poll};
use httparse; use httparse;
use super::message::{MessageFlags, Request}; use super::message::{MessageFlags, Request};
use super::settings::WorkerSettings; use super::settings::ServiceConfig;
use error::ParseError; use error::ParseError;
use http::header::{HeaderName, HeaderValue}; use http::header::{HeaderName, HeaderValue};
use http::{header, HttpTryFrom, Method, Uri, Version}; use http::{header, HttpTryFrom, Method, Uri, Version};
@ -43,7 +43,7 @@ impl H1Decoder {
} }
pub fn decode<H>( pub fn decode<H>(
&mut self, src: &mut BytesMut, settings: &WorkerSettings<H>, &mut self, src: &mut BytesMut, settings: &ServiceConfig<H>,
) -> Result<Option<Message>, DecoderError> { ) -> Result<Option<Message>, DecoderError> {
// read payload // read payload
if self.decoder.is_some() { if self.decoder.is_some() {
@ -80,7 +80,7 @@ impl H1Decoder {
} }
fn parse_message<H>( fn parse_message<H>(
&self, buf: &mut BytesMut, settings: &WorkerSettings<H>, &self, buf: &mut BytesMut, settings: &ServiceConfig<H>,
) -> Poll<(Request, Option<EncodingDecoder>), ParseError> { ) -> Poll<(Request, Option<EncodingDecoder>), ParseError> {
// Parse http message // Parse http message
let mut has_upgrade = false; let mut has_upgrade = false;

View File

@ -8,7 +8,7 @@ use tokio_io::AsyncWrite;
use super::helpers; use super::helpers;
use super::output::{Output, ResponseInfo, ResponseLength}; use super::output::{Output, ResponseInfo, ResponseLength};
use super::settings::WorkerSettings; use super::settings::ServiceConfig;
use super::Request; use super::Request;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body}; use body::{Binary, Body};
@ -37,11 +37,11 @@ pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
headers_size: u32, headers_size: u32,
buffer: Output, buffer: Output,
buffer_capacity: usize, buffer_capacity: usize,
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
} }
impl<T: AsyncWrite, H: 'static> H1Writer<T, H> { impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
pub fn new(stream: T, settings: WorkerSettings<H>) -> H1Writer<T, H> { pub fn new(stream: T, settings: ServiceConfig<H>) -> H1Writer<T, H> {
H1Writer { H1Writer {
flags: Flags::KEEPALIVE, flags: Flags::KEEPALIVE,
written: 0, written: 0,

View File

@ -22,7 +22,7 @@ use uri::Url;
use super::error::{HttpDispatchError, ServerError}; use super::error::{HttpDispatchError, ServerError};
use super::h2writer::H2Writer; use super::h2writer::H2Writer;
use super::input::PayloadType; use super::input::PayloadType;
use super::settings::WorkerSettings; use super::settings::ServiceConfig;
use super::{HttpHandler, HttpHandlerTask, IoStream, Writer}; use super::{HttpHandler, HttpHandlerTask, IoStream, Writer};
bitflags! { bitflags! {
@ -38,7 +38,7 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
flags: Flags, flags: Flags,
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
addr: Option<SocketAddr>, addr: Option<SocketAddr>,
state: State<IoWrapper<T>>, state: State<IoWrapper<T>>,
tasks: VecDeque<Entry<H>>, tasks: VecDeque<Entry<H>>,
@ -58,7 +58,7 @@ where
H: HttpHandler + 'static, H: HttpHandler + 'static,
{ {
pub fn new( pub fn new(
settings: WorkerSettings<H>, io: T, addr: Option<SocketAddr>, buf: Bytes, settings: ServiceConfig<H>, io: T, addr: Option<SocketAddr>, buf: Bytes,
keepalive_timer: Option<Delay>, keepalive_timer: Option<Delay>,
) -> Self { ) -> Self {
let extensions = io.extensions(); let extensions = io.extensions();
@ -82,7 +82,7 @@ where
self.keepalive_timer.take(); self.keepalive_timer.take();
} }
pub fn settings(&self) -> &WorkerSettings<H> { pub fn settings(&self) -> &ServiceConfig<H> {
&self.settings &self.settings
} }
@ -338,7 +338,7 @@ struct Entry<H: HttpHandler + 'static> {
impl<H: HttpHandler + 'static> Entry<H> { impl<H: HttpHandler + 'static> Entry<H> {
fn new( fn new(
parts: Parts, recv: RecvStream, resp: SendResponse<Bytes>, parts: Parts, recv: RecvStream, resp: SendResponse<Bytes>,
addr: Option<SocketAddr>, settings: WorkerSettings<H>, addr: Option<SocketAddr>, settings: ServiceConfig<H>,
extensions: Option<Rc<Extensions>>, extensions: Option<Rc<Extensions>>,
) -> Entry<H> ) -> Entry<H>
where where

View File

@ -14,7 +14,7 @@ use modhttp::Response;
use super::helpers; use super::helpers;
use super::message::Request; use super::message::Request;
use super::output::{Output, ResponseInfo, ResponseLength}; use super::output::{Output, ResponseInfo, ResponseLength};
use super::settings::WorkerSettings; use super::settings::ServiceConfig;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body}; use body::{Binary, Body};
use header::ContentEncoding; use header::ContentEncoding;
@ -42,13 +42,11 @@ pub(crate) struct H2Writer<H: 'static> {
written: u64, written: u64,
buffer: Output, buffer: Output,
buffer_capacity: usize, buffer_capacity: usize,
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
} }
impl<H: 'static> H2Writer<H> { impl<H: 'static> H2Writer<H> {
pub fn new( pub fn new(respond: SendResponse<Bytes>, settings: ServiceConfig<H>) -> H2Writer<H> {
respond: SendResponse<Bytes>, settings: WorkerSettings<H>,
) -> H2Writer<H> {
H2Writer { H2Writer {
stream: None, stream: None,
flags: Flags::empty(), flags: Flags::empty(),

View File

@ -8,7 +8,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use super::channel::{HttpChannel, WrapperStream}; use super::channel::{HttpChannel, WrapperStream};
use super::handler::{HttpHandler, IntoHttpHandler}; use super::handler::{HttpHandler, IntoHttpHandler};
use super::http::HttpServer; use super::http::HttpServer;
use super::settings::{ServerSettings, WorkerSettings}; use super::settings::{ServerSettings, ServiceConfig};
impl<T: AsyncRead + AsyncWrite + 'static> Message for WrapperStream<T> { impl<T: AsyncRead + AsyncWrite + 'static> Message for WrapperStream<T> {
type Result = (); type Result = ();
@ -32,7 +32,7 @@ where
// set server settings // set server settings
let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
let apps = (self.factory)().into_handler(); let apps = (self.factory)().into_handler();
let settings = WorkerSettings::new( let settings = ServiceConfig::new(
apps, apps,
self.keep_alive, self.keep_alive,
self.client_timeout, self.client_timeout,
@ -49,7 +49,7 @@ where
} }
struct HttpIncoming<H: HttpHandler> { struct HttpIncoming<H: HttpHandler> {
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
} }
impl<H: HttpHandler> Actor for HttpIncoming<H> { impl<H: HttpHandler> Actor for HttpIncoming<H> {

View File

@ -143,7 +143,7 @@ pub use self::message::Request;
pub use self::ssl::*; pub use self::ssl::*;
pub use self::error::{AcceptorError, HttpDispatchError}; pub use self::error::{AcceptorError, HttpDispatchError};
pub use self::settings::{ServerSettings, WorkerSettings, WorkerSettingsBuilder}; pub use self::settings::{ServerSettings, ServiceConfig, ServiceConfigBuilder};
#[doc(hidden)] #[doc(hidden)]
pub use self::service::{HttpService, StreamConfiguration}; pub use self::service::{HttpService, StreamConfiguration};

View File

@ -8,7 +8,7 @@ use futures::{Async, Poll};
use super::channel::HttpChannel; use super::channel::HttpChannel;
use super::error::HttpDispatchError; use super::error::HttpDispatchError;
use super::handler::HttpHandler; use super::handler::HttpHandler;
use super::settings::WorkerSettings; use super::settings::ServiceConfig;
use super::IoStream; use super::IoStream;
/// `NewService` implementation for HTTP1/HTTP2 transports /// `NewService` implementation for HTTP1/HTTP2 transports
@ -17,7 +17,7 @@ where
H: HttpHandler, H: HttpHandler,
Io: IoStream, Io: IoStream,
{ {
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
_t: PhantomData<Io>, _t: PhantomData<Io>,
} }
@ -27,7 +27,7 @@ where
Io: IoStream, Io: IoStream,
{ {
/// Create new `HttpService` instance. /// Create new `HttpService` instance.
pub fn new(settings: WorkerSettings<H>) -> Self { pub fn new(settings: ServiceConfig<H>) -> Self {
HttpService { HttpService {
settings, settings,
_t: PhantomData, _t: PhantomData,
@ -57,7 +57,7 @@ where
H: HttpHandler, H: HttpHandler,
Io: IoStream, Io: IoStream,
{ {
settings: WorkerSettings<H>, settings: ServiceConfig<H>,
_t: PhantomData<Io>, _t: PhantomData<Io>,
} }
@ -66,7 +66,7 @@ where
H: HttpHandler, H: HttpHandler,
Io: IoStream, Io: IoStream,
{ {
fn new(settings: WorkerSettings<H>) -> HttpServiceHandler<H, Io> { fn new(settings: ServiceConfig<H>) -> HttpServiceHandler<H, Io> {
HttpServiceHandler { HttpServiceHandler {
settings, settings,
_t: PhantomData, _t: PhantomData,
@ -103,6 +103,12 @@ pub struct StreamConfiguration<T, E> {
_t: PhantomData<(T, E)>, _t: PhantomData<(T, E)>,
} }
impl<T, E> Default for StreamConfiguration<T, E> {
fn default() -> Self {
Self::new()
}
}
impl<T, E> StreamConfiguration<T, E> { impl<T, E> StreamConfiguration<T, E> {
/// Create new `StreamConfigurationService` instance. /// Create new `StreamConfigurationService` instance.
pub fn new() -> Self { pub fn new() -> Self {
@ -136,8 +142,8 @@ impl<T: IoStream, E> NewService for StreamConfiguration<T, E> {
fn new_service(&self) -> Self::Future { fn new_service(&self) -> Self::Future {
ok(StreamConfigurationService { ok(StreamConfigurationService {
no_delay: self.no_delay.clone(), no_delay: self.no_delay,
tcp_ka: self.tcp_ka.clone(), tcp_ka: self.tcp_ka,
_t: PhantomData, _t: PhantomData,
}) })
} }

View File

@ -127,7 +127,8 @@ impl ServerSettings {
// "Sun, 06 Nov 1994 08:49:37 GMT".len() // "Sun, 06 Nov 1994 08:49:37 GMT".len()
const DATE_VALUE_LENGTH: usize = 29; const DATE_VALUE_LENGTH: usize = 29;
pub struct WorkerSettings<H>(Rc<Inner<H>>); /// Http service configuration
pub struct ServiceConfig<H>(Rc<Inner<H>>);
struct Inner<H> { struct Inner<H> {
handler: H, handler: H,
@ -141,18 +142,18 @@ struct Inner<H> {
date: UnsafeCell<(bool, Date)>, date: UnsafeCell<(bool, Date)>,
} }
impl<H> Clone for WorkerSettings<H> { impl<H> Clone for ServiceConfig<H> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
WorkerSettings(self.0.clone()) ServiceConfig(self.0.clone())
} }
} }
impl<H> WorkerSettings<H> { impl<H> ServiceConfig<H> {
/// Create instance of `WorkerSettings` /// Create instance of `ServiceConfig`
pub(crate) fn new( pub(crate) fn new(
handler: H, keep_alive: KeepAlive, client_timeout: u64, client_shutdown: u64, handler: H, keep_alive: KeepAlive, client_timeout: u64, client_shutdown: u64,
settings: ServerSettings, settings: ServerSettings,
) -> WorkerSettings<H> { ) -> ServiceConfig<H> {
let (keep_alive, ka_enabled) = match keep_alive { let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true), KeepAlive::Timeout(val) => (val as u64, true),
KeepAlive::Os | KeepAlive::Tcp(_) => (0, true), KeepAlive::Os | KeepAlive::Tcp(_) => (0, true),
@ -164,7 +165,7 @@ impl<H> WorkerSettings<H> {
None None
}; };
WorkerSettings(Rc::new(Inner { ServiceConfig(Rc::new(Inner {
handler, handler,
keep_alive, keep_alive,
ka_enabled, ka_enabled,
@ -178,8 +179,8 @@ impl<H> WorkerSettings<H> {
} }
/// Create worker settings builder. /// Create worker settings builder.
pub fn build(handler: H) -> WorkerSettingsBuilder<H> { pub fn build(handler: H) -> ServiceConfigBuilder<H> {
WorkerSettingsBuilder::new(handler) ServiceConfigBuilder::new(handler)
} }
pub(crate) fn head(&self) -> RefMut<Node<()>> { pub(crate) fn head(&self) -> RefMut<Node<()>> {
@ -220,7 +221,7 @@ impl<H> WorkerSettings<H> {
} }
} }
impl<H: 'static> WorkerSettings<H> { impl<H: 'static> ServiceConfig<H> {
#[inline] #[inline]
/// Client timeout for first request. /// Client timeout for first request.
pub fn client_timer(&self) -> Option<Delay> { pub fn client_timer(&self) -> Option<Delay> {
@ -319,11 +320,11 @@ impl<H: 'static> WorkerSettings<H> {
} }
} }
/// An worker settings builder /// A service config builder
/// ///
/// This type can be used to construct an instance of `WorkerSettings` through a /// This type can be used to construct an instance of `ServiceConfig` through a
/// builder-like pattern. /// builder-like pattern.
pub struct WorkerSettingsBuilder<H> { pub struct ServiceConfigBuilder<H> {
handler: H, handler: H,
keep_alive: KeepAlive, keep_alive: KeepAlive,
client_timeout: u64, client_timeout: u64,
@ -333,10 +334,10 @@ pub struct WorkerSettingsBuilder<H> {
secure: bool, secure: bool,
} }
impl<H> WorkerSettingsBuilder<H> { impl<H> ServiceConfigBuilder<H> {
/// Create instance of `WorkerSettingsBuilder` /// Create instance of `ServiceConfigBuilder`
pub fn new(handler: H) -> WorkerSettingsBuilder<H> { pub fn new(handler: H) -> ServiceConfigBuilder<H> {
WorkerSettingsBuilder { ServiceConfigBuilder {
handler, handler,
keep_alive: KeepAlive::Timeout(5), keep_alive: KeepAlive::Timeout(5),
client_timeout: 5000, client_timeout: 5000,
@ -419,12 +420,12 @@ impl<H> WorkerSettingsBuilder<H> {
self self
} }
/// Finish worker settings configuration and create `WorkerSettings` object. /// Finish service configuration and create `ServiceConfig` object.
pub fn finish(self) -> WorkerSettings<H> { pub fn finish(self) -> ServiceConfig<H> {
let settings = ServerSettings::new(self.addr, &self.host, self.secure); let settings = ServerSettings::new(self.addr, &self.host, self.secure);
let client_shutdown = if self.secure { self.client_shutdown } else { 0 }; let client_shutdown = if self.secure { self.client_shutdown } else { 0 };
WorkerSettings::new( ServiceConfig::new(
self.handler, self.handler,
self.keep_alive, self.keep_alive,
self.client_timeout, self.client_timeout,
@ -507,7 +508,7 @@ mod tests {
let mut rt = current_thread::Runtime::new().unwrap(); let mut rt = current_thread::Runtime::new().unwrap();
let _ = rt.block_on(future::lazy(|| { let _ = rt.block_on(future::lazy(|| {
let settings = WorkerSettings::<()>::new( let settings = ServiceConfig::<()>::new(
(), (),
KeepAlive::Os, KeepAlive::Os,
0, 0,

View File

@ -1228,7 +1228,7 @@ fn test_custom_pipeline() {
use actix::System; use actix::System;
use actix_net::service::NewServiceExt; use actix_net::service::NewServiceExt;
use actix_web::server::{ use actix_web::server::{
HttpService, KeepAlive, StreamConfiguration, WorkerSettings, HttpService, KeepAlive, ServiceConfig, StreamConfiguration,
}; };
let addr = test::TestServer::unused_addr(); let addr = test::TestServer::unused_addr();
@ -1239,7 +1239,7 @@ fn test_custom_pipeline() {
let app = App::new() let app = App::new()
.route("/", http::Method::GET, |_: HttpRequest| "OK") .route("/", http::Method::GET, |_: HttpRequest| "OK")
.finish(); .finish();
let settings = WorkerSettings::build(app) let settings = ServiceConfig::build(app)
.keep_alive(KeepAlive::Disabled) .keep_alive(KeepAlive::Disabled)
.client_timeout(1000) .client_timeout(1000)
.client_shutdown(1000) .client_shutdown(1000)