1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

expose internal http server types and allow to create custom http pipelines

This commit is contained in:
Nikolay Kim 2018-10-01 14:43:06 -07:00
parent 5966ee6192
commit c1e0b4f322
11 changed files with 148 additions and 179 deletions

View File

@ -1,4 +1,3 @@
use std::marker::PhantomData;
use std::{fmt, net};
use actix_net::either::Either;
@ -9,61 +8,39 @@ use super::acceptor::{
AcceptorServiceFactory, AcceptorTimeout, ServerMessageAcceptor, TcpAcceptor,
};
use super::error::AcceptorError;
use super::handler::{HttpHandler, IntoHttpHandler};
use super::handler::IntoHttpHandler;
use super::service::HttpService;
use super::settings::{ServerSettings, WorkerSettings};
use super::{IoStream, KeepAlive};
use super::KeepAlive;
pub(crate) trait ServiceProvider {
fn register(
&self, server: Server, lst: net::TcpListener, host: Option<String>,
&self, server: Server, lst: net::TcpListener, host: String,
addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize,
) -> Server;
}
/// Utility type that builds complete http pipeline
pub struct HttpServiceBuilder<F, H, A, P>
pub struct HttpServiceBuilder<F, H, A>
where
F: Fn() -> H + Send + Clone,
{
factory: F,
acceptor: A,
pipeline: P,
no_client_timer: bool,
}
impl<F, H, A, Io> HttpServiceBuilder<F, H, A, DefaultPipelineFactory<H::Handler, Io>>
where
Io: IoStream + Send,
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler,
A: AcceptorServiceFactory,
<A::NewService as NewService>::InitError: fmt::Debug,
{
/// Create http service builder with default pipeline factory
pub fn with_default_pipeline(factory: F, acceptor: A) -> Self {
Self {
factory,
acceptor,
pipeline: DefaultPipelineFactory::new(),
no_client_timer: false,
}
}
}
impl<F, H, A, P> HttpServiceBuilder<F, H, A, P>
impl<F, H, A> HttpServiceBuilder<F, H, A>
where
F: Fn() -> H + Send + Clone + 'static,
H: IntoHttpHandler,
A: AcceptorServiceFactory,
<A::NewService as NewService>::InitError: fmt::Debug,
P: HttpPipelineFactory<H::Handler, Io = A::Io>,
{
/// Create http service builder
pub fn new(factory: F, acceptor: A, pipeline: P) -> Self {
pub fn new(factory: F, acceptor: A) -> Self {
Self {
factory,
pipeline,
acceptor,
no_client_timer: false,
}
@ -75,34 +52,20 @@ where
}
/// Use different acceptor factory
pub fn acceptor<A1>(self, acceptor: A1) -> HttpServiceBuilder<F, H, A1, P>
pub fn acceptor<A1>(self, acceptor: A1) -> HttpServiceBuilder<F, H, A1>
where
A1: AcceptorServiceFactory,
<A1::NewService as NewService>::InitError: fmt::Debug,
{
HttpServiceBuilder {
acceptor,
pipeline: self.pipeline,
factory: self.factory.clone(),
no_client_timer: self.no_client_timer,
}
}
/// Use different pipeline factory
pub fn pipeline<P1>(self, pipeline: P1) -> HttpServiceBuilder<F, H, A, P1>
where
P1: HttpPipelineFactory<H::Handler>,
{
HttpServiceBuilder {
pipeline,
acceptor: self.acceptor,
factory: self.factory.clone(),
no_client_timer: self.no_client_timer,
}
}
fn finish(
&self, host: Option<String>, addr: net::SocketAddr, keep_alive: KeepAlive,
&self, host: String, addr: net::SocketAddr, keep_alive: KeepAlive,
client_timeout: usize,
) -> impl ServiceFactory {
let timeout = if self.no_client_timer {
@ -111,7 +74,6 @@ where
client_timeout
};
let factory = self.factory.clone();
let pipeline = self.pipeline.clone();
let acceptor = self.acceptor.clone();
move || {
let app = (factory)().into_handler();
@ -119,7 +81,7 @@ where
app,
keep_alive,
timeout as u64,
ServerSettings::new(Some(addr), &host, false),
ServerSettings::new(addr, &host, false),
);
if timeout == 0 {
@ -129,8 +91,7 @@ where
.map_err(|_| ())
.map_init_err(|_| ())
.and_then(
pipeline
.create(settings)
HttpService::new(settings)
.map_init_err(|_| ())
.map_err(|_| ()),
),
@ -142,8 +103,7 @@ where
.map_err(|_| ())
.map_init_err(|_| ())
.and_then(
pipeline
.create(settings)
HttpService::new(settings)
.map_init_err(|_| ())
.map_err(|_| ()),
),
@ -153,33 +113,30 @@ where
}
}
impl<F, H, A, P> Clone for HttpServiceBuilder<F, H, A, P>
impl<F, H, A> Clone for HttpServiceBuilder<F, H, A>
where
F: Fn() -> H + Send + Clone,
H: IntoHttpHandler,
A: AcceptorServiceFactory,
P: HttpPipelineFactory<H::Handler, Io = A::Io>,
{
fn clone(&self) -> Self {
HttpServiceBuilder {
factory: self.factory.clone(),
acceptor: self.acceptor.clone(),
pipeline: self.pipeline.clone(),
no_client_timer: self.no_client_timer,
}
}
}
impl<F, H, A, P> ServiceProvider for HttpServiceBuilder<F, H, A, P>
impl<F, H, A> ServiceProvider for HttpServiceBuilder<F, H, A>
where
F: Fn() -> H + Send + Clone + 'static,
A: AcceptorServiceFactory,
<A::NewService as NewService>::InitError: fmt::Debug,
P: HttpPipelineFactory<H::Handler, Io = A::Io>,
H: IntoHttpHandler,
{
fn register(
&self, server: Server, lst: net::TcpListener, host: Option<String>,
&self, server: Server, lst: net::TcpListener, host: String,
addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize,
) -> Server {
server.listen2(
@ -189,64 +146,3 @@ where
)
}
}
pub trait HttpPipelineFactory<H: HttpHandler>: Send + Clone + 'static {
type Io: IoStream;
type NewService: NewService<Request = Self::Io, Response = ()>;
fn create(&self, settings: WorkerSettings<H>) -> Self::NewService;
}
impl<F, T, H> HttpPipelineFactory<H> for F
where
F: Fn(WorkerSettings<H>) -> T + Send + Clone + 'static,
T: NewService<Response = ()>,
T::Request: IoStream,
H: HttpHandler,
{
type Io = T::Request;
type NewService = T;
fn create(&self, settings: WorkerSettings<H>) -> T {
(self)(settings)
}
}
pub(crate) struct DefaultPipelineFactory<H, Io> {
_t: PhantomData<(H, Io)>,
}
unsafe impl<H, Io> Send for DefaultPipelineFactory<H, Io> {}
impl<H, Io> DefaultPipelineFactory<H, Io>
where
Io: IoStream + Send,
H: HttpHandler + 'static,
{
pub fn new() -> Self {
Self { _t: PhantomData }
}
}
impl<H, Io> Clone for DefaultPipelineFactory<H, Io>
where
Io: IoStream,
H: HttpHandler,
{
fn clone(&self) -> Self {
Self { _t: PhantomData }
}
}
impl<H, Io> HttpPipelineFactory<H> for DefaultPipelineFactory<H, Io>
where
Io: IoStream,
H: HttpHandler + 'static,
{
type Io = Io;
type NewService = HttpService<H, Io>;
fn create(&self, settings: WorkerSettings<H>) -> Self::NewService {
HttpService::new(settings)
}
}

View File

@ -6,6 +6,7 @@ use futures::{Async, Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use super::error::HttpDispatchError;
use super::settings::WorkerSettings;
use super::{h1, h2, HttpHandler, IoStream};
@ -86,7 +87,7 @@ where
H: HttpHandler + 'static,
{
type Item = ();
type Error = ();
type Error = HttpDispatchError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// keep-alive timer
@ -127,6 +128,7 @@ where
return h2.poll();
}
Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => {
let mut err = None;
let mut disconnect = false;
match io.read_available(buf) {
Ok(Async::Ready((read_some, stream_closed))) => {
@ -136,14 +138,16 @@ where
disconnect = true;
}
}
Err(_) => {
disconnect = true;
Err(e) => {
err = Some(e.into());
}
_ => (),
}
if disconnect {
debug!("Ignored premature client disconnection");
return Err(());
return Ok(Async::Ready(()));
} else if let Some(e) = err {
return Err(e);
}
if buf.len() >= 14 {

View File

@ -1,6 +1,7 @@
use std::io;
use futures::{Async, Poll};
use http2;
use super::{helpers, HttpHandlerTask, Writer};
use http::{StatusCode, Version};
@ -19,6 +20,39 @@ pub enum AcceptorError<T> {
Timeout,
}
#[derive(Fail, Debug)]
/// A set of errors that can occur during dispatching http requests
pub enum HttpDispatchError {
/// Application error
#[fail(display = "Application specific error")]
AppError,
/// An `io::Error` that occurred while trying to read or write to a network
/// stream.
#[fail(display = "IO error: {}", _0)]
Io(io::Error),
/// The first request did not complete within the specified timeout.
#[fail(display = "The first request did not complete within the specified timeout")]
SlowRequestTimeout,
/// HTTP2 error
#[fail(display = "HTTP2 error: {}", _0)]
Http2(http2::Error),
}
impl From<io::Error> for HttpDispatchError {
fn from(err: io::Error) -> Self {
HttpDispatchError::Io(err)
}
}
impl From<http2::Error> for HttpDispatchError {
fn from(err: http2::Error) -> Self {
HttpDispatchError::Http2(err)
}
}
pub(crate) struct ServerError(Version, StatusCode);
impl ServerError {

View File

@ -10,7 +10,7 @@ use error::{Error, PayloadError};
use http::{StatusCode, Version};
use payload::{Payload, PayloadStatus, PayloadWriter};
use super::error::ServerError;
use super::error::{HttpDispatchError, ServerError};
use super::h1decoder::{DecoderError, H1Decoder, Message};
use super::h1writer::H1Writer;
use super::input::PayloadType;
@ -172,7 +172,7 @@ where
}
#[inline]
pub fn poll(&mut self) -> Poll<(), ()> {
pub fn poll(&mut self) -> Poll<(), HttpDispatchError> {
// check connection keep-alive
if !self.poll_keep_alive() {
return Ok(Async::Ready(()));
@ -190,7 +190,7 @@ where
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
Err(err) => {
debug!("Error sending data: {}", err);
return Err(());
return Err(err.into());
}
}
}
@ -303,7 +303,7 @@ where
}
}
pub fn poll_handler(&mut self) -> Poll<bool, ()> {
pub fn poll_handler(&mut self) -> Poll<bool, HttpDispatchError> {
let retry = self.can_read();
// check in-flight messages
@ -321,7 +321,7 @@ where
return Ok(Async::NotReady);
}
self.flags.insert(Flags::ERROR);
return Err(());
return Err(HttpDispatchError::AppError);
}
match self.tasks[idx].pipe.poll_io(&mut self.stream) {
@ -404,7 +404,7 @@ where
debug!("Error sending data: {}", err);
self.read_disconnected();
self.write_disconnected();
return Err(());
return Err(err.into());
}
Ok(Async::Ready(_)) => {
// non consumed payload in that case close connection

View File

@ -19,7 +19,7 @@ use http::{StatusCode, Version};
use payload::{Payload, PayloadStatus, PayloadWriter};
use uri::Url;
use super::error::ServerError;
use super::error::{HttpDispatchError, ServerError};
use super::h2writer::H2Writer;
use super::input::PayloadType;
use super::settings::WorkerSettings;
@ -86,7 +86,7 @@ where
&self.settings
}
pub fn poll(&mut self) -> Poll<(), ()> {
pub fn poll(&mut self) -> Poll<(), HttpDispatchError> {
// server
if let State::Connection(ref mut conn) = self.state {
// keep-alive timer
@ -244,9 +244,7 @@ where
}
} else {
// keep-alive disable, drop connection
return conn.poll_close().map_err(|e| {
error!("Error during connection close: {}", e)
});
return conn.poll_close().map_err(|e| e.into());
}
} else {
// keep-alive unset, rely on operating system
@ -267,9 +265,7 @@ where
if not_ready {
if self.tasks.is_empty() && self.flags.contains(Flags::DISCONNECTED)
{
return conn
.poll_close()
.map_err(|e| error!("Error during connection close: {}", e));
return conn.poll_close().map_err(|e| e.into());
} else {
return Ok(Async::NotReady);
}
@ -284,7 +280,7 @@ where
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
trace!("Error handling connection: {}", err);
return Err(());
return Err(err.into());
}
}
} else {

View File

@ -18,7 +18,7 @@ use openssl::ssl::SslAcceptorBuilder;
use rustls::ServerConfig;
use super::acceptor::{AcceptorServiceFactory, DefaultAcceptor};
use super::builder::{DefaultPipelineFactory, HttpServiceBuilder, ServiceProvider};
use super::builder::{HttpServiceBuilder, ServiceProvider};
use super::{IntoHttpHandler, KeepAlive};
struct Socket {
@ -131,7 +131,7 @@ where
self
}
/// Set server client timneout in milliseconds for first request.
/// Set server client timeout in milliseconds for first request.
///
/// Defines a timeout for reading client request header. If a client does not transmit
/// the entire set headers within this time, the request is terminated with
@ -218,11 +218,8 @@ where
addr,
scheme: "http",
handler: Box::new(
HttpServiceBuilder::new(
self.factory.clone(),
DefaultAcceptor,
DefaultPipelineFactory::new(),
).no_client_timer(),
HttpServiceBuilder::new(self.factory.clone(), DefaultAcceptor)
.no_client_timer(),
),
});
@ -231,7 +228,7 @@ where
#[doc(hidden)]
/// Use listener for accepting incoming connection requests
pub(crate) fn listen_with<A>(mut self, lst: net::TcpListener, acceptor: A) -> Self
pub fn listen_with<A>(mut self, lst: net::TcpListener, acceptor: A) -> Self
where
A: AcceptorServiceFactory,
<A::NewService as NewService>::InitError: fmt::Debug,
@ -241,11 +238,7 @@ where
lst,
addr,
scheme: "https",
handler: Box::new(HttpServiceBuilder::new(
self.factory.clone(),
acceptor,
DefaultPipelineFactory::new(),
)),
handler: Box::new(HttpServiceBuilder::new(self.factory.clone(), acceptor)),
});
self
@ -339,7 +332,6 @@ where
handler: Box::new(HttpServiceBuilder::new(
self.factory.clone(),
acceptor.clone(),
DefaultPipelineFactory::new(),
)),
});
}
@ -483,10 +475,15 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
let sockets = mem::replace(&mut self.sockets, Vec::new());
for socket in sockets {
let host = self
.host
.as_ref()
.map(|h| h.to_owned())
.unwrap_or_else(|| format!("{}", socket.addr));
srv = socket.handler.register(
srv,
socket.lst,
self.host.clone(),
host,
socket.addr,
self.keep_alive.clone(),
self.client_timeout,
@ -524,10 +521,15 @@ impl<H: IntoHttpHandler, F: Fn() -> H + Send + Clone> HttpServer<H, F> {
/// Register current http server as actix-net's server service
pub fn register(self, mut srv: Server) -> Server {
for socket in self.sockets {
let host = self
.host
.as_ref()
.map(|h| h.to_owned())
.unwrap_or_else(|| format!("{}", socket.addr));
srv = socket.handler.register(
srv,
socket.lst,
self.host.clone(),
host,
socket.addr,
self.keep_alive.clone(),
self.client_timeout,

View File

@ -2,7 +2,7 @@
use std::{io, net};
use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message};
use futures::Stream;
use futures::{Future, Stream};
use tokio_io::{AsyncRead, AsyncWrite};
use super::channel::{HttpChannel, WrapperStream};
@ -36,7 +36,7 @@ where
apps,
self.keep_alive,
self.client_timeout as u64,
ServerSettings::new(Some(addr), &self.host, secure),
ServerSettings::new(addr, "127.0.0.1:8080", secure),
);
// start server
@ -65,6 +65,8 @@ where
type Result = ();
fn handle(&mut self, msg: WrapperStream<T>, _: &mut Context<Self>) -> Self::Result {
Arbiter::spawn(HttpChannel::new(self.settings.clone(), msg, None));
Arbiter::spawn(
HttpChannel::new(self.settings.clone(), msg, None).map_err(|_| ()),
);
}
}

View File

@ -140,15 +140,15 @@ mod ssl;
pub use self::handler::*;
pub use self::http::HttpServer;
pub use self::message::Request;
pub use self::settings::ServerSettings;
pub use self::ssl::*;
pub use self::error::{AcceptorError, HttpDispatchError};
pub use self::service::HttpService;
pub use self::settings::{ServerSettings, WorkerSettings};
#[doc(hidden)]
pub use self::helpers::write_content_length;
#[doc(hidden)]
pub use self::builder::HttpServiceBuilder;
use body::Binary;
use extensions::Extensions;
use header::ContentEncoding;

View File

@ -5,11 +5,12 @@ use futures::future::{ok, FutureResult};
use futures::{Async, Poll};
use super::channel::HttpChannel;
use super::error::HttpDispatchError;
use super::handler::HttpHandler;
use super::settings::WorkerSettings;
use super::IoStream;
pub(crate) struct HttpService<H, Io>
pub struct HttpService<H, Io>
where
H: HttpHandler,
Io: IoStream,
@ -23,6 +24,7 @@ where
H: HttpHandler,
Io: IoStream,
{
/// Create new `HttpService` instance.
pub fn new(settings: WorkerSettings<H>) -> Self {
HttpService {
settings,
@ -38,17 +40,17 @@ where
{
type Request = Io;
type Response = ();
type Error = ();
type Error = HttpDispatchError;
type InitError = ();
type Service = HttpServiceHandler<H, Io>;
type Future = FutureResult<Self::Service, Self::Error>;
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self) -> Self::Future {
ok(HttpServiceHandler::new(self.settings.clone()))
}
}
pub(crate) struct HttpServiceHandler<H, Io>
pub struct HttpServiceHandler<H, Io>
where
H: HttpHandler,
Io: IoStream,
@ -84,7 +86,7 @@ where
{
type Request = Io;
type Response = ();
type Error = ();
type Error = HttpDispatchError;
type Future = HttpChannel<Io, H>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {

View File

@ -43,7 +43,7 @@ lazy_static! {
/// Various server settings
pub struct ServerSettings {
addr: Option<net::SocketAddr>,
addr: net::SocketAddr,
secure: bool,
host: String,
cpu_pool: LazyCell<CpuPool>,
@ -65,7 +65,7 @@ impl Clone for ServerSettings {
impl Default for ServerSettings {
fn default() -> Self {
ServerSettings {
addr: None,
addr: "127.0.0.1:8080".parse().unwrap(),
secure: false,
host: "localhost:8080".to_owned(),
responses: HttpResponsePool::get_pool(),
@ -76,16 +76,8 @@ impl Default for ServerSettings {
impl ServerSettings {
/// Crate server settings instance
pub(crate) fn new(
addr: Option<net::SocketAddr>, host: &Option<String>, secure: bool,
) -> ServerSettings {
let host = if let Some(ref host) = *host {
host.clone()
} else if let Some(ref addr) = addr {
format!("{}", addr)
} else {
"localhost".to_owned()
};
pub fn new(addr: net::SocketAddr, host: &str, secure: bool) -> ServerSettings {
let host = host.to_owned();
let cpu_pool = LazyCell::new();
let responses = HttpResponsePool::get_pool();
ServerSettings {
@ -98,7 +90,7 @@ impl ServerSettings {
}
/// Returns the socket address of the local half of this TCP connection
pub fn local_addr(&self) -> Option<net::SocketAddr> {
pub fn local_addr(&self) -> net::SocketAddr {
self.addr
}
@ -153,7 +145,7 @@ impl<H> Clone for WorkerSettings<H> {
}
impl<H> WorkerSettings<H> {
pub(crate) fn new(
pub fn new(
handler: H, keep_alive: KeepAlive, client_timeout: u64, settings: ServerSettings,
) -> WorkerSettings<H> {
let (keep_alive, ka_enabled) = match keep_alive {
@ -188,11 +180,13 @@ impl<H> WorkerSettings<H> {
}
#[inline]
/// Keep alive duration if configured.
pub fn keep_alive(&self) -> Option<Duration> {
self.0.keep_alive
}
#[inline]
/// Return state of connection keep-alive funcitonality
pub fn keep_alive_enabled(&self) -> bool {
self.0.ka_enabled
}
@ -217,6 +211,7 @@ impl<H> WorkerSettings<H> {
impl<H: 'static> WorkerSettings<H> {
#[inline]
/// Client timeout for first request.
pub fn client_timer(&self) -> Option<Delay> {
let delay = self.0.client_timeout;
if delay != 0 {
@ -227,6 +222,7 @@ impl<H: 'static> WorkerSettings<H> {
}
#[inline]
/// Return keep-alive timer delay is configured.
pub fn keep_alive_timer(&self) -> Option<Delay> {
if let Some(ka) = self.0.keep_alive {
Some(Delay::new(self.now() + ka))

View File

@ -1,4 +1,5 @@
extern crate actix;
extern crate actix_net;
extern crate actix_web;
#[cfg(feature = "brotli")]
extern crate brotli2;
@ -18,6 +19,7 @@ use std::io::{Read, Write};
use std::sync::Arc;
use std::{thread, time};
use actix_net::server::Server;
#[cfg(feature = "brotli")]
use brotli2::write::{BrotliDecoder, BrotliEncoder};
use bytes::{Bytes, BytesMut};
@ -1010,3 +1012,38 @@ fn test_server_cookies() {
assert_eq!(cookies[1], first_cookie);
}
}
#[test]
fn test_custom_pipeline() {
use actix::System;
use actix_web::server::{HttpService, KeepAlive, ServerSettings, WorkerSettings};
let addr = test::TestServer::unused_addr();
thread::spawn(move || {
Server::new()
.bind("test", addr, move || {
let app = App::new()
.route("/", http::Method::GET, |_: HttpRequest| "OK")
.finish();
let settings = WorkerSettings::new(
app,
KeepAlive::Disabled,
10,
ServerSettings::new(addr, "localhost", false),
);
HttpService::new(settings)
}).unwrap()
.run();
});
let mut sys = System::new("test");
{
let req = client::ClientRequest::get(format!("http://{}/", addr).as_str())
.finish()
.unwrap();
let response = sys.block_on(req.send()).unwrap();
assert!(response.status().is_success());
}
}