From f2d42e5e7719383fccdf97315437da27a4991dfb Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 28 Sep 2018 11:50:47 -0700 Subject: [PATCH] refactor acceptor error handling --- Cargo.toml | 4 +- src/client/connector.rs | 4 +- src/server/acceptor.rs | 275 ++++++++++++++++++++++++---------------- src/server/builder.rs | 38 ++++-- src/server/channel.rs | 5 - src/server/error.rs | 15 +++ src/server/http.rs | 70 +--------- src/server/incoming.rs | 70 ++++++++++ src/server/mod.rs | 1 + 9 files changed, 288 insertions(+), 194 deletions(-) create mode 100644 src/server/incoming.rs diff --git a/Cargo.toml b/Cargo.toml index 205e178b9..0e95c327c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,8 +60,8 @@ flate2-rust = ["flate2/rust_backend"] [dependencies] actix = "0.7.0" -actix-net = { git="https://github.com/actix/actix-net.git" } -#actix-net = { path = "../actix-net" } +#actix-net = { git="https://github.com/actix/actix-net.git" } +actix-net = { path = "../actix-net" } base64 = "0.9" bitflags = "1.0" diff --git a/src/client/connector.rs b/src/client/connector.rs index 8d71913fe..6e82e3fd8 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -51,7 +51,7 @@ type SslConnector = Arc; feature = "ssl", feature = "tls", feature = "rust-tls", -),))] +)))] type SslConnector = (); use server::IoStream; @@ -290,7 +290,7 @@ impl Default for ClientConnector { feature = "ssl", feature = "tls", feature = "rust-tls", - ),))] + )))] { () } diff --git a/src/server/acceptor.rs b/src/server/acceptor.rs index d78474160..caad0e2e3 100644 --- a/src/server/acceptor.rs +++ b/src/server/acceptor.rs @@ -1,3 +1,4 @@ +use std::net; use std::time::Duration; use actix_net::server::ServerMessage; @@ -8,6 +9,7 @@ use tokio_reactor::Handle; use tokio_tcp::TcpStream; use tokio_timer::{sleep, Delay}; +use super::error::AcceptorError; use super::handler::HttpHandler; use super::settings::WorkerSettings; use super::IoStream; @@ -15,12 +17,7 @@ use super::IoStream; /// This trait indicates types that can create acceptor service for http server. pub trait AcceptorServiceFactory: Send + Clone + 'static { type Io: IoStream + Send; - type NewService: NewService< - Request = TcpStream, - Response = Self::Io, - Error = (), - InitError = (), - >; + type NewService: NewService; fn create(&self) -> Self::NewService; } @@ -29,7 +26,7 @@ impl AcceptorServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T::Response: IoStream + Send, - T: NewService, + T: NewService, { type Io = T::Response; type NewService = T; @@ -80,144 +77,91 @@ impl Service for DefaultAcceptor { } } -pub(crate) struct TcpAcceptor { +pub(crate) struct TcpAcceptor { inner: T, - settings: WorkerSettings, } -impl TcpAcceptor +impl TcpAcceptor where - H: HttpHandler, - T: NewService, + T: NewService>, { - pub(crate) fn new(settings: WorkerSettings, inner: T) -> Self { - TcpAcceptor { inner, settings } + pub(crate) fn new(inner: T) -> Self { + TcpAcceptor { inner } } } -impl NewService for TcpAcceptor +impl NewService for TcpAcceptor where - H: HttpHandler, - T: NewService, + T: NewService>, { - type Request = ServerMessage; - type Response = (); - type Error = (); - type InitError = (); - type Service = TcpAcceptorService; - type Future = TcpAcceptorResponse; + type Request = net::TcpStream; + type Response = T::Response; + type Error = AcceptorError; + type InitError = T::InitError; + type Service = TcpAcceptorService; + type Future = TcpAcceptorResponse; fn new_service(&self) -> Self::Future { TcpAcceptorResponse { fut: self.inner.new_service(), - settings: self.settings.clone(), } } } -pub(crate) struct TcpAcceptorResponse +pub(crate) struct TcpAcceptorResponse where - H: HttpHandler, T: NewService, { fut: T::Future, - settings: WorkerSettings, } -impl Future for TcpAcceptorResponse +impl Future for TcpAcceptorResponse where - H: HttpHandler, T: NewService, { - type Item = TcpAcceptorService; - type Error = (); + type Item = TcpAcceptorService; + type Error = T::InitError; fn poll(&mut self) -> Poll { - match self.fut.poll() { - Err(_) => Err(()), - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(service)) => Ok(Async::Ready(TcpAcceptorService { - inner: service, - settings: self.settings.clone(), - })), + match self.fut.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(service) => { + Ok(Async::Ready(TcpAcceptorService { inner: service })) + } } } } -pub(crate) struct TcpAcceptorService { +pub(crate) struct TcpAcceptorService { inner: T, - settings: WorkerSettings, } -impl Service for TcpAcceptorService +impl Service for TcpAcceptorService where - H: HttpHandler, - T: Service, + T: Service>, { - type Request = ServerMessage; - type Response = (); - type Error = (); - type Future = Either, FutureResult<(), ()>>; + type Request = net::TcpStream; + type Response = T::Response; + type Error = AcceptorError; + type Future = Either>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready().map_err(|_| ()) + self.inner.poll_ready() } fn call(&mut self, req: Self::Request) -> Self::Future { - match req { - ServerMessage::Connect(stream) => { - let stream = - TcpStream::from_std(stream, &Handle::default()).map_err(|e| { - error!("Can not convert to an async tcp stream: {}", e); - }); + let stream = TcpStream::from_std(req, &Handle::default()).map_err(|e| { + error!("Can not convert to an async tcp stream: {}", e); + AcceptorError::Io(e) + }); - if let Ok(stream) = stream { - Either::A(TcpAcceptorServiceFut { - fut: self.inner.call(stream), - }) - } else { - Either::B(err(())) - } - } - ServerMessage::Shutdown(timeout) => Either::B(ok(())), - ServerMessage::ForceShutdown => { - // self.settings.head().traverse::(); - Either::B(ok(())) - } + match stream { + Ok(stream) => Either::A(self.inner.call(stream)), + Err(e) => Either::B(err(e)), } } } -pub(crate) struct TcpAcceptorServiceFut { - fut: T, -} - -impl Future for TcpAcceptorServiceFut -where - T: Future, -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - match self.fut.poll() { - Err(_) => Err(()), - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => Ok(Async::Ready(())), - } - } -} - -/// Errors produced by `AcceptorTimeout` service. -#[derive(Debug)] -pub enum TimeoutError { - /// The inner service error - Service(T), - - /// The request did not complete within the specified timeout. - Timeout, -} - /// Acceptor timeout middleware /// /// Applies timeout to request prcoessing. @@ -235,7 +179,7 @@ impl AcceptorTimeout { impl NewService for AcceptorTimeout { type Request = T::Request; type Response = T::Response; - type Error = TimeoutError; + type Error = AcceptorError; type InitError = T::InitError; type Service = AcceptorTimeoutService; type Future = AcceptorTimeoutFut; @@ -278,11 +222,11 @@ pub(crate) struct AcceptorTimeoutService { impl Service for AcceptorTimeoutService { type Request = T::Request; type Response = T::Response; - type Error = TimeoutError; + type Error = AcceptorError; type Future = AcceptorTimeoutResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready().map_err(TimeoutError::Service) + self.inner.poll_ready().map_err(AcceptorError::Service) } fn call(&mut self, req: Self::Request) -> Self::Future { @@ -299,17 +243,134 @@ pub(crate) struct AcceptorTimeoutResponse { } impl Future for AcceptorTimeoutResponse { type Item = T::Response; - type Error = TimeoutError; + type Error = AcceptorError; fn poll(&mut self) -> Poll { - match self.fut.poll() { - Ok(Async::NotReady) => match self.sleep.poll() { - Err(_) => Err(TimeoutError::Timeout), - Ok(Async::Ready(_)) => Err(TimeoutError::Timeout), + match self.fut.poll().map_err(AcceptorError::Service)? { + Async::NotReady => match self.sleep.poll() { + Err(_) => Err(AcceptorError::Timeout), + Ok(Async::Ready(_)) => Err(AcceptorError::Timeout), Ok(Async::NotReady) => Ok(Async::NotReady), }, - Ok(Async::Ready(resp)) => Ok(Async::Ready(resp)), - Err(err) => Err(TimeoutError::Service(err)), + Async::Ready(resp) => Ok(Async::Ready(resp)), + } + } +} + +pub(crate) struct ServerMessageAcceptor { + inner: T, + settings: WorkerSettings, +} + +impl ServerMessageAcceptor +where + H: HttpHandler, + T: NewService, +{ + pub(crate) fn new(settings: WorkerSettings, inner: T) -> Self { + ServerMessageAcceptor { inner, settings } + } +} + +impl NewService for ServerMessageAcceptor +where + H: HttpHandler, + T: NewService, +{ + type Request = ServerMessage; + type Response = (); + type Error = T::Error; + type InitError = T::InitError; + type Service = ServerMessageAcceptorService; + type Future = ServerMessageAcceptorResponse; + + fn new_service(&self) -> Self::Future { + ServerMessageAcceptorResponse { + fut: self.inner.new_service(), + settings: self.settings.clone(), + } + } +} + +pub(crate) struct ServerMessageAcceptorResponse +where + H: HttpHandler, + T: NewService, +{ + fut: T::Future, + settings: WorkerSettings, +} + +impl Future for ServerMessageAcceptorResponse +where + H: HttpHandler, + T: NewService, +{ + type Item = ServerMessageAcceptorService; + type Error = T::InitError; + + fn poll(&mut self) -> Poll { + match self.fut.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(service) => Ok(Async::Ready(ServerMessageAcceptorService { + inner: service, + settings: self.settings.clone(), + })), + } + } +} + +pub(crate) struct ServerMessageAcceptorService { + inner: T, + settings: WorkerSettings, +} + +impl Service for ServerMessageAcceptorService +where + H: HttpHandler, + T: Service, +{ + type Request = ServerMessage; + type Response = (); + type Error = T::Error; + type Future = + Either, FutureResult<(), Self::Error>>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + match req { + ServerMessage::Connect(stream) => { + Either::A(ServerMessageAcceptorServiceFut { + fut: self.inner.call(stream), + }) + } + ServerMessage::Shutdown(timeout) => Either::B(ok(())), + ServerMessage::ForceShutdown => { + // self.settings.head().traverse::(); + Either::B(ok(())) + } + } + } +} + +pub(crate) struct ServerMessageAcceptorServiceFut { + fut: T::Future, +} + +impl Future for ServerMessageAcceptorServiceFut +where + T: Service, +{ + type Item = (); + type Error = T::Error; + + fn poll(&mut self) -> Poll { + match self.fut.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(_) => Ok(Async::Ready(())), } } } diff --git a/src/server/builder.rs b/src/server/builder.rs index 28541820b..46ab9f467 100644 --- a/src/server/builder.rs +++ b/src/server/builder.rs @@ -5,7 +5,10 @@ use actix_net::either::Either; use actix_net::server::{Server, ServiceFactory}; use actix_net::service::{NewService, NewServiceExt}; -use super::acceptor::{AcceptorServiceFactory, AcceptorTimeout, TcpAcceptor}; +use super::acceptor::{ + AcceptorServiceFactory, AcceptorTimeout, ServerMessageAcceptor, TcpAcceptor, +}; +use super::error::AcceptorError; use super::handler::{HttpHandler, IntoHttpHandler}; use super::service::HttpService; use super::settings::{ServerSettings, WorkerSettings}; @@ -99,16 +102,30 @@ where ); if timeout == 0 { - Either::A(TcpAcceptor::new( + Either::A(ServerMessageAcceptor::new( settings.clone(), - acceptor.create().and_then(pipeline.create(settings)), + TcpAcceptor::new(acceptor.create().map_err(AcceptorError::Service)) + .map_err(|_| ()) + .map_init_err(|_| ()) + .and_then( + pipeline + .create(settings) + .map_init_err(|_| ()) + .map_err(|_| ()), + ), )) } else { - Either::B(TcpAcceptor::new( + Either::B(ServerMessageAcceptor::new( settings.clone(), - AcceptorTimeout::new(timeout, acceptor.create()) + TcpAcceptor::new(AcceptorTimeout::new(timeout, acceptor.create())) .map_err(|_| ()) - .and_then(pipeline.create(settings)), + .map_init_err(|_| ()) + .and_then( + pipeline + .create(settings) + .map_init_err(|_| ()) + .map_err(|_| ()), + ), )) } } @@ -153,12 +170,7 @@ where pub trait HttpPipelineFactory: Send + Clone + 'static { type Io: IoStream; - type NewService: NewService< - Request = Self::Io, - Response = (), - Error = (), - InitError = (), - >; + type NewService: NewService; fn create(&self, settings: WorkerSettings) -> Self::NewService; } @@ -166,7 +178,7 @@ pub trait HttpPipelineFactory: Send + Clone + 'static { impl HttpPipelineFactory for F where F: Fn(WorkerSettings) -> T + Send + Clone + 'static, - T: NewService, + T: NewService, T::Request: IoStream, H: HttpHandler, { diff --git a/src/server/channel.rs b/src/server/channel.rs index 0d92c23a3..c1e6b6b24 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -1,7 +1,6 @@ use std::net::{Shutdown, SocketAddr}; use std::{io, ptr, time}; -use actix::Message; use bytes::{Buf, BufMut, BytesMut}; use futures::{Async, Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -283,10 +282,6 @@ where io: T, } -impl Message for WrapperStream { - type Result = (); -} - impl WrapperStream where T: AsyncRead + AsyncWrite + 'static, diff --git a/src/server/error.rs b/src/server/error.rs index d08ccf87f..ff8b831a7 100644 --- a/src/server/error.rs +++ b/src/server/error.rs @@ -1,9 +1,24 @@ +use std::io; + use futures::{Async, Poll}; use super::{helpers, HttpHandlerTask, Writer}; use http::{StatusCode, Version}; use Error; +/// Errors produced by `AcceptorError` service. +#[derive(Debug)] +pub enum AcceptorError { + /// The inner service error + Service(T), + + /// Io specific error + Io(io::Error), + + /// The request did not complete within the specified timeout. + Timeout, +} + pub(crate) struct ServerError(Version, StatusCode); impl ServerError { diff --git a/src/server/http.rs b/src/server/http.rs index 81c4d3ad6..846f7f010 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -1,13 +1,11 @@ use std::{io, mem, net}; -use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System}; +use actix::{Addr, System}; use actix_net::server::Server; use actix_net::ssl; -use futures::Stream; use net2::TcpBuilder; use num_cpus; -use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "tls")] use native_tls::TlsAcceptor; @@ -20,9 +18,6 @@ use rustls::ServerConfig; use super::acceptor::{AcceptorServiceFactory, DefaultAcceptor}; use super::builder::{DefaultPipelineFactory, HttpServiceBuilder, ServiceProvider}; -use super::channel::{HttpChannel, WrapperStream}; -use super::handler::HttpHandler; -use super::settings::{ServerSettings, WorkerSettings}; use super::{IntoHttpHandler, KeepAlive}; struct Socket { @@ -42,9 +37,10 @@ where H: IntoHttpHandler + 'static, F: Fn() -> H + Send + Clone, { - factory: F, - host: Option, - keep_alive: KeepAlive, + pub(super) factory: F, + pub(super) host: Option, + pub(super) keep_alive: KeepAlive, + pub(super) client_timeout: usize, backlog: i32, threads: usize, exit: bool, @@ -53,7 +49,6 @@ where no_signals: bool, maxconn: usize, maxconnrate: usize, - client_timeout: usize, sockets: Vec, } @@ -524,61 +519,6 @@ impl H + Send + Clone> HttpServer { } } -impl HttpServer -where - H: IntoHttpHandler, - F: Fn() -> H + Send + Clone, -{ - #[doc(hidden)] - #[deprecated(since = "0.7.8")] - /// Start listening for incoming connections from a stream. - /// - /// This method uses only one thread for handling incoming connections. - pub fn start_incoming(self, stream: S, secure: bool) - where - S: Stream + 'static, - T: AsyncRead + AsyncWrite + 'static, - { - // set server settings - let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); - let apps = (self.factory)().into_handler(); - let settings = WorkerSettings::new( - apps, - self.keep_alive, - self.client_timeout as u64, - ServerSettings::new(Some(addr), &self.host, secure), - ); - - // start server - HttpIncoming::create(move |ctx| { - ctx.add_message_stream( - stream.map_err(|_| ()).map(move |t| WrapperStream::new(t)), - ); - HttpIncoming { settings } - }); - } -} - -struct HttpIncoming { - settings: WorkerSettings, -} - -impl Actor for HttpIncoming { - type Context = Context; -} - -impl Handler> for HttpIncoming -where - T: AsyncRead + AsyncWrite, - H: HttpHandler, -{ - type Result = (); - - fn handle(&mut self, msg: WrapperStream, _: &mut Context) -> Self::Result { - Arbiter::spawn(HttpChannel::new(self.settings.clone(), msg, None)); - } -} - fn create_tcp_listener( addr: net::SocketAddr, backlog: i32, ) -> io::Result { diff --git a/src/server/incoming.rs b/src/server/incoming.rs new file mode 100644 index 000000000..7ab289d04 --- /dev/null +++ b/src/server/incoming.rs @@ -0,0 +1,70 @@ +//! Support for `Stream`, deprecated! +use std::{io, net}; + +use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message}; +use futures::Stream; +use tokio_io::{AsyncRead, AsyncWrite}; + +use super::channel::{HttpChannel, WrapperStream}; +use super::handler::{HttpHandler, IntoHttpHandler}; +use super::http::HttpServer; +use super::settings::{ServerSettings, WorkerSettings}; + +impl Message for WrapperStream { + type Result = (); +} + +impl HttpServer +where + H: IntoHttpHandler, + F: Fn() -> H + Send + Clone, +{ + #[doc(hidden)] + #[deprecated(since = "0.7.8")] + /// Start listening for incoming connections from a stream. + /// + /// This method uses only one thread for handling incoming connections. + pub fn start_incoming(self, stream: S, secure: bool) + where + S: Stream + 'static, + T: AsyncRead + AsyncWrite + 'static, + { + // set server settings + let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); + let apps = (self.factory)().into_handler(); + let settings = WorkerSettings::new( + apps, + self.keep_alive, + self.client_timeout as u64, + ServerSettings::new(Some(addr), &self.host, secure), + ); + + // start server + HttpIncoming::create(move |ctx| { + ctx.add_message_stream( + stream.map_err(|_| ()).map(move |t| WrapperStream::new(t)), + ); + HttpIncoming { settings } + }); + } +} + +struct HttpIncoming { + settings: WorkerSettings, +} + +impl Actor for HttpIncoming { + type Context = Context; +} + +impl Handler> for HttpIncoming +where + T: AsyncRead + AsyncWrite, + H: HttpHandler, +{ + type Result = (); + + fn handle(&mut self, msg: WrapperStream, _: &mut Context) -> Self::Result { + Arbiter::spawn(HttpChannel::new(self.settings.clone(), msg, None)); + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 9e91eda08..1e145571c 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -129,6 +129,7 @@ mod h2writer; mod handler; pub(crate) mod helpers; mod http; +pub(crate) mod incoming; pub(crate) mod input; pub(crate) mod message; pub(crate) mod output;