From c1e0b4f32275b212992c9f9991e3f4797e66c152 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 1 Oct 2018 14:43:06 -0700 Subject: [PATCH] expose internal http server types and allow to create custom http pipelines --- src/server/builder.rs | 132 +++++------------------------------------ src/server/channel.rs | 12 ++-- src/server/error.rs | 34 +++++++++++ src/server/h1.rs | 12 ++-- src/server/h2.rs | 14 ++--- src/server/http.rs | 34 ++++++----- src/server/incoming.rs | 8 ++- src/server/mod.rs | 8 +-- src/server/service.rs | 12 ++-- src/server/settings.rs | 24 ++++---- tests/test_server.rs | 37 ++++++++++++ 11 files changed, 148 insertions(+), 179 deletions(-) diff --git a/src/server/builder.rs b/src/server/builder.rs index c9a97af3e..8e7f82f80 100644 --- a/src/server/builder.rs +++ b/src/server/builder.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::{fmt, net}; use actix_net::either::Either; @@ -9,61 +8,39 @@ use super::acceptor::{ AcceptorServiceFactory, AcceptorTimeout, ServerMessageAcceptor, TcpAcceptor, }; use super::error::AcceptorError; -use super::handler::{HttpHandler, IntoHttpHandler}; +use super::handler::IntoHttpHandler; use super::service::HttpService; use super::settings::{ServerSettings, WorkerSettings}; -use super::{IoStream, KeepAlive}; +use super::KeepAlive; pub(crate) trait ServiceProvider { fn register( - &self, server: Server, lst: net::TcpListener, host: Option, + &self, server: Server, lst: net::TcpListener, host: String, addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize, ) -> Server; } /// Utility type that builds complete http pipeline -pub struct HttpServiceBuilder +pub struct HttpServiceBuilder where F: Fn() -> H + Send + Clone, { factory: F, acceptor: A, - pipeline: P, no_client_timer: bool, } -impl HttpServiceBuilder> -where - Io: IoStream + Send, - F: Fn() -> H + Send + Clone + 'static, - H: IntoHttpHandler, - A: AcceptorServiceFactory, - ::InitError: fmt::Debug, -{ - /// Create http service builder with default pipeline factory - pub fn with_default_pipeline(factory: F, acceptor: A) -> Self { - Self { - factory, - acceptor, - pipeline: DefaultPipelineFactory::new(), - no_client_timer: false, - } - } -} - -impl HttpServiceBuilder +impl HttpServiceBuilder where F: Fn() -> H + Send + Clone + 'static, H: IntoHttpHandler, A: AcceptorServiceFactory, ::InitError: fmt::Debug, - P: HttpPipelineFactory, { /// Create http service builder - pub fn new(factory: F, acceptor: A, pipeline: P) -> Self { + pub fn new(factory: F, acceptor: A) -> Self { Self { factory, - pipeline, acceptor, no_client_timer: false, } @@ -75,34 +52,20 @@ where } /// Use different acceptor factory - pub fn acceptor(self, acceptor: A1) -> HttpServiceBuilder + pub fn acceptor(self, acceptor: A1) -> HttpServiceBuilder where A1: AcceptorServiceFactory, ::InitError: fmt::Debug, { HttpServiceBuilder { acceptor, - pipeline: self.pipeline, - factory: self.factory.clone(), - no_client_timer: self.no_client_timer, - } - } - - /// Use different pipeline factory - pub fn pipeline(self, pipeline: P1) -> HttpServiceBuilder - where - P1: HttpPipelineFactory, - { - HttpServiceBuilder { - pipeline, - acceptor: self.acceptor, factory: self.factory.clone(), no_client_timer: self.no_client_timer, } } fn finish( - &self, host: Option, addr: net::SocketAddr, keep_alive: KeepAlive, + &self, host: String, addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize, ) -> impl ServiceFactory { let timeout = if self.no_client_timer { @@ -111,7 +74,6 @@ where client_timeout }; let factory = self.factory.clone(); - let pipeline = self.pipeline.clone(); let acceptor = self.acceptor.clone(); move || { let app = (factory)().into_handler(); @@ -119,7 +81,7 @@ where app, keep_alive, timeout as u64, - ServerSettings::new(Some(addr), &host, false), + ServerSettings::new(addr, &host, false), ); if timeout == 0 { @@ -129,8 +91,7 @@ where .map_err(|_| ()) .map_init_err(|_| ()) .and_then( - pipeline - .create(settings) + HttpService::new(settings) .map_init_err(|_| ()) .map_err(|_| ()), ), @@ -142,8 +103,7 @@ where .map_err(|_| ()) .map_init_err(|_| ()) .and_then( - pipeline - .create(settings) + HttpService::new(settings) .map_init_err(|_| ()) .map_err(|_| ()), ), @@ -153,33 +113,30 @@ where } } -impl Clone for HttpServiceBuilder +impl Clone for HttpServiceBuilder where F: Fn() -> H + Send + Clone, H: IntoHttpHandler, A: AcceptorServiceFactory, - P: HttpPipelineFactory, { fn clone(&self) -> Self { HttpServiceBuilder { factory: self.factory.clone(), acceptor: self.acceptor.clone(), - pipeline: self.pipeline.clone(), no_client_timer: self.no_client_timer, } } } -impl ServiceProvider for HttpServiceBuilder +impl ServiceProvider for HttpServiceBuilder where F: Fn() -> H + Send + Clone + 'static, A: AcceptorServiceFactory, ::InitError: fmt::Debug, - P: HttpPipelineFactory, H: IntoHttpHandler, { fn register( - &self, server: Server, lst: net::TcpListener, host: Option, + &self, server: Server, lst: net::TcpListener, host: String, addr: net::SocketAddr, keep_alive: KeepAlive, client_timeout: usize, ) -> Server { server.listen2( @@ -189,64 +146,3 @@ where ) } } - -pub trait HttpPipelineFactory: Send + Clone + 'static { - type Io: IoStream; - type NewService: NewService; - - fn create(&self, settings: WorkerSettings) -> Self::NewService; -} - -impl HttpPipelineFactory for F -where - F: Fn(WorkerSettings) -> T + Send + Clone + 'static, - T: NewService, - T::Request: IoStream, - H: HttpHandler, -{ - type Io = T::Request; - type NewService = T; - - fn create(&self, settings: WorkerSettings) -> T { - (self)(settings) - } -} - -pub(crate) struct DefaultPipelineFactory { - _t: PhantomData<(H, Io)>, -} - -unsafe impl Send for DefaultPipelineFactory {} - -impl DefaultPipelineFactory -where - Io: IoStream + Send, - H: HttpHandler + 'static, -{ - pub fn new() -> Self { - Self { _t: PhantomData } - } -} - -impl Clone for DefaultPipelineFactory -where - Io: IoStream, - H: HttpHandler, -{ - fn clone(&self) -> Self { - Self { _t: PhantomData } - } -} - -impl HttpPipelineFactory for DefaultPipelineFactory -where - Io: IoStream, - H: HttpHandler + 'static, -{ - type Io = Io; - type NewService = HttpService; - - fn create(&self, settings: WorkerSettings) -> Self::NewService { - HttpService::new(settings) - } -} diff --git a/src/server/channel.rs b/src/server/channel.rs index c1e6b6b24..3cea291fd 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -6,6 +6,7 @@ use futures::{Async, Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::Delay; +use super::error::HttpDispatchError; use super::settings::WorkerSettings; use super::{h1, h2, HttpHandler, IoStream}; @@ -86,7 +87,7 @@ where H: HttpHandler + 'static, { type Item = (); - type Error = (); + type Error = HttpDispatchError; fn poll(&mut self) -> Poll { // keep-alive timer @@ -127,6 +128,7 @@ where return h2.poll(); } Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => { + let mut err = None; let mut disconnect = false; match io.read_available(buf) { Ok(Async::Ready((read_some, stream_closed))) => { @@ -136,14 +138,16 @@ where disconnect = true; } } - Err(_) => { - disconnect = true; + Err(e) => { + err = Some(e.into()); } _ => (), } if disconnect { debug!("Ignored premature client disconnection"); - return Err(()); + return Ok(Async::Ready(())); + } else if let Some(e) = err { + return Err(e); } if buf.len() >= 14 { diff --git a/src/server/error.rs b/src/server/error.rs index ff8b831a7..b8b602266 100644 --- a/src/server/error.rs +++ b/src/server/error.rs @@ -1,6 +1,7 @@ use std::io; use futures::{Async, Poll}; +use http2; use super::{helpers, HttpHandlerTask, Writer}; use http::{StatusCode, Version}; @@ -19,6 +20,39 @@ pub enum AcceptorError { Timeout, } +#[derive(Fail, Debug)] +/// A set of errors that can occur during dispatching http requests +pub enum HttpDispatchError { + /// Application error + #[fail(display = "Application specific error")] + AppError, + + /// An `io::Error` that occurred while trying to read or write to a network + /// stream. + #[fail(display = "IO error: {}", _0)] + Io(io::Error), + + /// The first request did not complete within the specified timeout. + #[fail(display = "The first request did not complete within the specified timeout")] + SlowRequestTimeout, + + /// HTTP2 error + #[fail(display = "HTTP2 error: {}", _0)] + Http2(http2::Error), +} + +impl From for HttpDispatchError { + fn from(err: io::Error) -> Self { + HttpDispatchError::Io(err) + } +} + +impl From for HttpDispatchError { + fn from(err: http2::Error) -> Self { + HttpDispatchError::Http2(err) + } +} + pub(crate) struct ServerError(Version, StatusCode); impl ServerError { diff --git a/src/server/h1.rs b/src/server/h1.rs index 76c0d4b6e..b17981225 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -10,7 +10,7 @@ use error::{Error, PayloadError}; use http::{StatusCode, Version}; use payload::{Payload, PayloadStatus, PayloadWriter}; -use super::error::ServerError; +use super::error::{HttpDispatchError, ServerError}; use super::h1decoder::{DecoderError, H1Decoder, Message}; use super::h1writer::H1Writer; use super::input::PayloadType; @@ -172,7 +172,7 @@ where } #[inline] - pub fn poll(&mut self) -> Poll<(), ()> { + pub fn poll(&mut self) -> Poll<(), HttpDispatchError> { // check connection keep-alive if !self.poll_keep_alive() { return Ok(Async::Ready(())); @@ -190,7 +190,7 @@ where Ok(Async::Ready(_)) => return Ok(Async::Ready(())), Err(err) => { debug!("Error sending data: {}", err); - return Err(()); + return Err(err.into()); } } } @@ -303,7 +303,7 @@ where } } - pub fn poll_handler(&mut self) -> Poll { + pub fn poll_handler(&mut self) -> Poll { let retry = self.can_read(); // check in-flight messages @@ -321,7 +321,7 @@ where return Ok(Async::NotReady); } self.flags.insert(Flags::ERROR); - return Err(()); + return Err(HttpDispatchError::AppError); } match self.tasks[idx].pipe.poll_io(&mut self.stream) { @@ -404,7 +404,7 @@ where debug!("Error sending data: {}", err); self.read_disconnected(); self.write_disconnected(); - return Err(()); + return Err(err.into()); } Ok(Async::Ready(_)) => { // non consumed payload in that case close connection diff --git a/src/server/h2.rs b/src/server/h2.rs index d9ca2f64a..589e77c2d 100644 --- a/src/server/h2.rs +++ b/src/server/h2.rs @@ -19,7 +19,7 @@ use http::{StatusCode, Version}; use payload::{Payload, PayloadStatus, PayloadWriter}; use uri::Url; -use super::error::ServerError; +use super::error::{HttpDispatchError, ServerError}; use super::h2writer::H2Writer; use super::input::PayloadType; use super::settings::WorkerSettings; @@ -86,7 +86,7 @@ where &self.settings } - pub fn poll(&mut self) -> Poll<(), ()> { + pub fn poll(&mut self) -> Poll<(), HttpDispatchError> { // server if let State::Connection(ref mut conn) = self.state { // keep-alive timer @@ -244,9 +244,7 @@ where } } else { // keep-alive disable, drop connection - return conn.poll_close().map_err(|e| { - error!("Error during connection close: {}", e) - }); + return conn.poll_close().map_err(|e| e.into()); } } else { // keep-alive unset, rely on operating system @@ -267,9 +265,7 @@ where if not_ready { if self.tasks.is_empty() && self.flags.contains(Flags::DISCONNECTED) { - return conn - .poll_close() - .map_err(|e| error!("Error during connection close: {}", e)); + return conn.poll_close().map_err(|e| e.into()); } else { return Ok(Async::NotReady); } @@ -284,7 +280,7 @@ where Ok(Async::NotReady) => return Ok(Async::NotReady), Err(err) => { trace!("Error handling connection: {}", err); - return Err(()); + return Err(err.into()); } } } else { diff --git a/src/server/http.rs b/src/server/http.rs index 6344771b6..311c53cb2 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -18,7 +18,7 @@ use openssl::ssl::SslAcceptorBuilder; use rustls::ServerConfig; use super::acceptor::{AcceptorServiceFactory, DefaultAcceptor}; -use super::builder::{DefaultPipelineFactory, HttpServiceBuilder, ServiceProvider}; +use super::builder::{HttpServiceBuilder, ServiceProvider}; use super::{IntoHttpHandler, KeepAlive}; struct Socket { @@ -131,7 +131,7 @@ where self } - /// Set server client timneout in milliseconds for first request. + /// Set server client timeout in milliseconds for first request. /// /// Defines a timeout for reading client request header. If a client does not transmit /// the entire set headers within this time, the request is terminated with @@ -218,11 +218,8 @@ where addr, scheme: "http", handler: Box::new( - HttpServiceBuilder::new( - self.factory.clone(), - DefaultAcceptor, - DefaultPipelineFactory::new(), - ).no_client_timer(), + HttpServiceBuilder::new(self.factory.clone(), DefaultAcceptor) + .no_client_timer(), ), }); @@ -231,7 +228,7 @@ where #[doc(hidden)] /// Use listener for accepting incoming connection requests - pub(crate) fn listen_with(mut self, lst: net::TcpListener, acceptor: A) -> Self + pub fn listen_with(mut self, lst: net::TcpListener, acceptor: A) -> Self where A: AcceptorServiceFactory, ::InitError: fmt::Debug, @@ -241,11 +238,7 @@ where lst, addr, scheme: "https", - handler: Box::new(HttpServiceBuilder::new( - self.factory.clone(), - acceptor, - DefaultPipelineFactory::new(), - )), + handler: Box::new(HttpServiceBuilder::new(self.factory.clone(), acceptor)), }); self @@ -339,7 +332,6 @@ where handler: Box::new(HttpServiceBuilder::new( self.factory.clone(), acceptor.clone(), - DefaultPipelineFactory::new(), )), }); } @@ -483,10 +475,15 @@ impl H + Send + Clone> HttpServer { let sockets = mem::replace(&mut self.sockets, Vec::new()); for socket in sockets { + let host = self + .host + .as_ref() + .map(|h| h.to_owned()) + .unwrap_or_else(|| format!("{}", socket.addr)); srv = socket.handler.register( srv, socket.lst, - self.host.clone(), + host, socket.addr, self.keep_alive.clone(), self.client_timeout, @@ -524,10 +521,15 @@ impl H + Send + Clone> HttpServer { /// Register current http server as actix-net's server service pub fn register(self, mut srv: Server) -> Server { for socket in self.sockets { + let host = self + .host + .as_ref() + .map(|h| h.to_owned()) + .unwrap_or_else(|| format!("{}", socket.addr)); srv = socket.handler.register( srv, socket.lst, - self.host.clone(), + host, socket.addr, self.keep_alive.clone(), self.client_timeout, diff --git a/src/server/incoming.rs b/src/server/incoming.rs index 7ab289d04..c77280084 100644 --- a/src/server/incoming.rs +++ b/src/server/incoming.rs @@ -2,7 +2,7 @@ use std::{io, net}; use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message}; -use futures::Stream; +use futures::{Future, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use super::channel::{HttpChannel, WrapperStream}; @@ -36,7 +36,7 @@ where apps, self.keep_alive, self.client_timeout as u64, - ServerSettings::new(Some(addr), &self.host, secure), + ServerSettings::new(addr, "127.0.0.1:8080", secure), ); // start server @@ -65,6 +65,8 @@ where type Result = (); fn handle(&mut self, msg: WrapperStream, _: &mut Context) -> Self::Result { - Arbiter::spawn(HttpChannel::new(self.settings.clone(), msg, None)); + Arbiter::spawn( + HttpChannel::new(self.settings.clone(), msg, None).map_err(|_| ()), + ); } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 1e145571c..f9d2b585e 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -140,15 +140,15 @@ mod ssl; pub use self::handler::*; pub use self::http::HttpServer; pub use self::message::Request; -pub use self::settings::ServerSettings; pub use self::ssl::*; +pub use self::error::{AcceptorError, HttpDispatchError}; +pub use self::service::HttpService; +pub use self::settings::{ServerSettings, WorkerSettings}; + #[doc(hidden)] pub use self::helpers::write_content_length; -#[doc(hidden)] -pub use self::builder::HttpServiceBuilder; - use body::Binary; use extensions::Extensions; use header::ContentEncoding; diff --git a/src/server/service.rs b/src/server/service.rs index 042c86ed4..2988bc661 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -5,11 +5,12 @@ use futures::future::{ok, FutureResult}; use futures::{Async, Poll}; use super::channel::HttpChannel; +use super::error::HttpDispatchError; use super::handler::HttpHandler; use super::settings::WorkerSettings; use super::IoStream; -pub(crate) struct HttpService +pub struct HttpService where H: HttpHandler, Io: IoStream, @@ -23,6 +24,7 @@ where H: HttpHandler, Io: IoStream, { + /// Create new `HttpService` instance. pub fn new(settings: WorkerSettings) -> Self { HttpService { settings, @@ -38,17 +40,17 @@ where { type Request = Io; type Response = (); - type Error = (); + type Error = HttpDispatchError; type InitError = (); type Service = HttpServiceHandler; - type Future = FutureResult; + type Future = FutureResult; fn new_service(&self) -> Self::Future { ok(HttpServiceHandler::new(self.settings.clone())) } } -pub(crate) struct HttpServiceHandler +pub struct HttpServiceHandler where H: HttpHandler, Io: IoStream, @@ -84,7 +86,7 @@ where { type Request = Io; type Response = (); - type Error = (); + type Error = HttpDispatchError; type Future = HttpChannel; fn poll_ready(&mut self) -> Poll<(), Self::Error> { diff --git a/src/server/settings.rs b/src/server/settings.rs index 5ca777290..fbe515f99 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -43,7 +43,7 @@ lazy_static! { /// Various server settings pub struct ServerSettings { - addr: Option, + addr: net::SocketAddr, secure: bool, host: String, cpu_pool: LazyCell, @@ -65,7 +65,7 @@ impl Clone for ServerSettings { impl Default for ServerSettings { fn default() -> Self { ServerSettings { - addr: None, + addr: "127.0.0.1:8080".parse().unwrap(), secure: false, host: "localhost:8080".to_owned(), responses: HttpResponsePool::get_pool(), @@ -76,16 +76,8 @@ impl Default for ServerSettings { impl ServerSettings { /// Crate server settings instance - pub(crate) fn new( - addr: Option, host: &Option, secure: bool, - ) -> ServerSettings { - let host = if let Some(ref host) = *host { - host.clone() - } else if let Some(ref addr) = addr { - format!("{}", addr) - } else { - "localhost".to_owned() - }; + pub fn new(addr: net::SocketAddr, host: &str, secure: bool) -> ServerSettings { + let host = host.to_owned(); let cpu_pool = LazyCell::new(); let responses = HttpResponsePool::get_pool(); ServerSettings { @@ -98,7 +90,7 @@ impl ServerSettings { } /// Returns the socket address of the local half of this TCP connection - pub fn local_addr(&self) -> Option { + pub fn local_addr(&self) -> net::SocketAddr { self.addr } @@ -153,7 +145,7 @@ impl Clone for WorkerSettings { } impl WorkerSettings { - pub(crate) fn new( + pub fn new( handler: H, keep_alive: KeepAlive, client_timeout: u64, settings: ServerSettings, ) -> WorkerSettings { let (keep_alive, ka_enabled) = match keep_alive { @@ -188,11 +180,13 @@ impl WorkerSettings { } #[inline] + /// Keep alive duration if configured. pub fn keep_alive(&self) -> Option { self.0.keep_alive } #[inline] + /// Return state of connection keep-alive funcitonality pub fn keep_alive_enabled(&self) -> bool { self.0.ka_enabled } @@ -217,6 +211,7 @@ impl WorkerSettings { impl WorkerSettings { #[inline] + /// Client timeout for first request. pub fn client_timer(&self) -> Option { let delay = self.0.client_timeout; if delay != 0 { @@ -227,6 +222,7 @@ impl WorkerSettings { } #[inline] + /// Return keep-alive timer delay is configured. pub fn keep_alive_timer(&self) -> Option { if let Some(ka) = self.0.keep_alive { Some(Delay::new(self.now() + ka)) diff --git a/tests/test_server.rs b/tests/test_server.rs index c1dbf531d..66b96ecce 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -1,4 +1,5 @@ extern crate actix; +extern crate actix_net; extern crate actix_web; #[cfg(feature = "brotli")] extern crate brotli2; @@ -18,6 +19,7 @@ use std::io::{Read, Write}; use std::sync::Arc; use std::{thread, time}; +use actix_net::server::Server; #[cfg(feature = "brotli")] use brotli2::write::{BrotliDecoder, BrotliEncoder}; use bytes::{Bytes, BytesMut}; @@ -1010,3 +1012,38 @@ fn test_server_cookies() { assert_eq!(cookies[1], first_cookie); } } + +#[test] +fn test_custom_pipeline() { + use actix::System; + use actix_web::server::{HttpService, KeepAlive, ServerSettings, WorkerSettings}; + + let addr = test::TestServer::unused_addr(); + + thread::spawn(move || { + Server::new() + .bind("test", addr, move || { + let app = App::new() + .route("/", http::Method::GET, |_: HttpRequest| "OK") + .finish(); + let settings = WorkerSettings::new( + app, + KeepAlive::Disabled, + 10, + ServerSettings::new(addr, "localhost", false), + ); + + HttpService::new(settings) + }).unwrap() + .run(); + }); + + let mut sys = System::new("test"); + { + let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) + .finish() + .unwrap(); + let response = sys.block_on(req.send()).unwrap(); + assert!(response.status().is_success()); + } +}