From a3cfc242328c4e501c22728f73db8f94c27cc413 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 9 Sep 2018 10:51:30 -0700 Subject: [PATCH] refactor acceptor service --- src/server/http.rs | 382 +++++++++++++++++++++++++++++++++------------ src/server/mod.rs | 8 +- src/test.rs | 40 ++--- 3 files changed, 307 insertions(+), 123 deletions(-) diff --git a/src/server/http.rs b/src/server/http.rs index 725cfbac0..41161ed3f 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -1,9 +1,9 @@ use std::marker::PhantomData; -use std::sync::Arc; use std::{io, mem, net, time}; use actix::{Actor, Addr, AsyncContext, Context, Handler, System}; -use actix_net::{ssl, NewService, NewServiceExt, Server, Service}; +use actix_net::server::{Server, ServerServiceFactory}; +use actix_net::{ssl, NewService, NewServiceExt, Service}; use futures::future::{ok, FutureResult}; use futures::{Async, Poll, Stream}; @@ -36,11 +36,12 @@ struct Socket { /// By default it serves HTTP2 when HTTPs is enabled, /// in order to change it, use `ServerFlags` that can be provided /// to acceptor service. -pub struct HttpServer +pub struct HttpServer where H: IntoHttpHandler + 'static, + F: Fn() -> Vec + Send + Clone, { - factory: Arc Vec + Send + Sync>, + factory: F, host: Option, keep_alive: KeepAlive, backlog: i32, @@ -54,21 +55,39 @@ where sockets: Vec>, } -impl HttpServer +impl HttpServer where H: IntoHttpHandler + 'static, + F: Fn() -> Vec + Send + Clone + 'static, { /// Create new http server with application factory - pub fn new(factory: F) -> Self + pub fn new(factory: F1) -> HttpServer Vec + Send + Clone> where - F: Fn() -> U + Sync + Send + 'static, + F1: Fn() -> U + Send + Clone, U: IntoIterator + 'static, { - let f = move || (factory)().into_iter().collect(); + let f = move || (factory.clone())().into_iter().collect(); HttpServer { threads: num_cpus::get(), - factory: Arc::new(f), + factory: f, + host: None, + backlog: 2048, + keep_alive: KeepAlive::Os, + shutdown_timeout: 30, + exit: false, + no_http2: false, + no_signals: false, + maxconn: 25_600, + maxconnrate: 256, + sockets: Vec::new(), + } + } + + pub(crate) fn with_factory(factory: F) -> HttpServer { + HttpServer { + factory, + threads: num_cpus::get(), host: None, backlog: 2048, keep_alive: KeepAlive::Timeout(5), @@ -211,6 +230,13 @@ where handler: Box::new(SimpleFactory { addr, factory: self.factory.clone(), + pipeline: DefaultPipelineFactory { + addr, + factory: self.factory.clone(), + host: self.host.clone(), + keep_alive: self.keep_alive, + _t: PhantomData, + }, }), }); @@ -219,22 +245,30 @@ where #[doc(hidden)] /// Use listener for accepting incoming connection requests - pub(crate) fn listen_with(mut self, lst: net::TcpListener, acceptor: F) -> Self + pub(crate) fn listen_with( + mut self, lst: net::TcpListener, acceptor: A, + ) -> Self where - F: Fn() -> T + Send + Clone + 'static, - T: NewService + Clone + 'static, - T::Response: IoStream, + A: AcceptorServiceFactory, + T: NewService + + Clone + + 'static, + Io: IoStream + Send, { let addr = lst.local_addr().unwrap(); self.sockets.push(Socket { lst, addr, scheme: "https", - handler: Box::new(AcceptorFactory { - addr, + handler: Box::new(HttpServiceBuilder::new( acceptor, - factory: self.factory.clone(), - }), + DefaultPipelineFactory::new( + self.factory.clone(), + self.host.clone(), + addr, + self.keep_alive, + ), + )), }); self @@ -256,7 +290,7 @@ where /// /// This method sets alpn protocols to "h2" and "http/1.1" pub fn listen_ssl( - self, lst: net::TcpListener, builder: SslAcceptorBuilder, + mut self, lst: net::TcpListener, builder: SslAcceptorBuilder, ) -> io::Result { use super::{openssl_acceptor_with_flags, ServerFlags}; @@ -268,9 +302,23 @@ where let acceptor = openssl_acceptor_with_flags(builder, flags)?; - Ok(self.listen_with(lst, move || { - ssl::OpensslAcceptor::new(acceptor.clone()).map_err(|_| ()) - })) + let addr = lst.local_addr().unwrap(); + self.sockets.push(Socket { + lst, + addr, + scheme: "https", + handler: Box::new(HttpServiceBuilder::new( + move || ssl::OpensslAcceptor::new(acceptor.clone()).map_err(|_| ()), + DefaultPipelineFactory::new( + self.factory.clone(), + self.host.clone(), + addr, + self.keep_alive, + ), + )), + }); + + Ok(self) } // #[cfg(feature = "rust-tls")] @@ -408,7 +456,7 @@ where // } } -impl HttpServer { +impl Vec + Send + Clone> HttpServer { /// Start listening for incoming connections. /// /// This method starts number of http workers in separate threads. @@ -552,35 +600,35 @@ impl HttpServer { // } // } -struct HttpService +struct HttpService where - H: HttpHandler, - F: IntoHttpHandler, + F: Fn() -> Vec, + H: IntoHttpHandler, Io: IoStream, { - factory: Arc Vec + Send + Sync>, + factory: F, addr: net::SocketAddr, host: Option, keep_alive: KeepAlive, - _t: PhantomData<(H, Io)>, + _t: PhantomData, } -impl NewService for HttpService +impl NewService for HttpService where - H: HttpHandler, - F: IntoHttpHandler, + F: Fn() -> Vec, + H: IntoHttpHandler, Io: IoStream, { type Request = Io; type Response = (); type Error = (); type InitError = (); - type Service = HttpServiceHandler; + type Service = HttpServiceHandler; type Future = FutureResult; fn new_service(&self) -> Self::Future { let s = ServerSettings::new(Some(self.addr), &self.host, false); - let apps: Vec<_> = (*self.factory)() + let apps: Vec<_> = (self.factory)() .into_iter() .map(|h| h.into_handler()) .collect(); @@ -658,94 +706,43 @@ where ) -> Server; } -struct SimpleFactory +struct SimpleFactory where H: IntoHttpHandler, + F: Fn() -> Vec + Send + Clone, + P: HttpPipelineFactory, { pub addr: net::SocketAddr, - pub factory: Arc Vec + Send + Sync>, + pub factory: F, + pub pipeline: P, } -impl Clone for SimpleFactory { +impl Clone for SimpleFactory +where + P: HttpPipelineFactory, + F: Fn() -> Vec + Send + Clone, +{ fn clone(&self) -> Self { SimpleFactory { addr: self.addr, factory: self.factory.clone(), + pipeline: self.pipeline.clone(), } } } -impl ServiceFactory for SimpleFactory +impl ServiceFactory for SimpleFactory where H: IntoHttpHandler + 'static, + F: Fn() -> Vec + Send + Clone + 'static, + P: HttpPipelineFactory, { fn register( - &self, server: Server, lst: net::TcpListener, host: Option, - keep_alive: KeepAlive, + &self, server: Server, lst: net::TcpListener, _host: Option, + _keep_alive: KeepAlive, ) -> Server { - let addr = self.addr; - let factory = self.factory.clone(); - - server.listen(lst, move || HttpService { - keep_alive, - addr, - host: host.clone(), - factory: factory.clone(), - _t: PhantomData, - }) - } -} - -struct AcceptorFactory -where - F: Fn() -> T + Send + Clone + 'static, - T: NewService, - H: IntoHttpHandler, -{ - pub addr: net::SocketAddr, - pub acceptor: F, - pub factory: Arc Vec + Send + Sync>, -} - -impl Clone for AcceptorFactory -where - F: Fn() -> T + Send + Clone + 'static, - T: NewService, - H: IntoHttpHandler, -{ - fn clone(&self) -> Self { - AcceptorFactory { - addr: self.addr, - acceptor: self.acceptor.clone(), - factory: self.factory.clone(), - } - } -} - -impl ServiceFactory for AcceptorFactory -where - F: Fn() -> T + Send + Clone + 'static, - H: IntoHttpHandler + 'static, - T: NewService + Clone + 'static, - T::Response: IoStream, -{ - fn register( - &self, server: Server, lst: net::TcpListener, host: Option, - keep_alive: KeepAlive, - ) -> Server { - let addr = self.addr; - let factory = self.factory.clone(); - let acceptor = self.acceptor.clone(); - - server.listen(lst, move || { - (acceptor)().and_then(HttpService { - keep_alive, - addr, - host: host.clone(), - factory: factory.clone(), - _t: PhantomData, - }) - }) + let pipeline = self.pipeline.clone(); + server.listen(lst, move || pipeline.create()) } } @@ -760,3 +757,186 @@ fn create_tcp_listener( builder.bind(addr)?; Ok(builder.listen(backlog)?) } + +pub struct HttpServiceBuilder { + acceptor: A, + pipeline: P, + t: PhantomData, +} + +impl HttpServiceBuilder +where + A: AcceptorServiceFactory, + P: HttpPipelineFactory, + H: IntoHttpHandler, +{ + pub fn new(acceptor: A, pipeline: P) -> Self { + Self { + acceptor, + pipeline, + t: PhantomData, + } + } + + pub fn acceptor(self, acceptor: A1) -> HttpServiceBuilder + where + A1: AcceptorServiceFactory, + { + HttpServiceBuilder { + acceptor, + pipeline: self.pipeline, + t: PhantomData, + } + } + + pub fn pipeline(self, pipeline: P1) -> HttpServiceBuilder + where + P1: HttpPipelineFactory, + { + HttpServiceBuilder { + pipeline, + acceptor: self.acceptor, + t: PhantomData, + } + } + + fn finish(&self) -> impl ServerServiceFactory { + let acceptor = self.acceptor.clone(); + let pipeline = self.pipeline.clone(); + + move || acceptor.create().and_then(pipeline.create()) + } +} + +impl ServiceFactory for HttpServiceBuilder +where + A: AcceptorServiceFactory, + P: HttpPipelineFactory, + H: IntoHttpHandler, +{ + fn register( + &self, server: Server, lst: net::TcpListener, _host: Option, + _keep_alive: KeepAlive, + ) -> Server { + server.listen(lst, self.finish()) + } +} + +pub trait AcceptorServiceFactory: Send + Clone + 'static { + type Io: IoStream + Send; + type NewService: NewService< + Request = TcpStream, + Response = Self::Io, + Error = (), + InitError = (), + >; + + fn create(&self) -> Self::NewService; +} + +impl AcceptorServiceFactory for F +where + F: Fn() -> T + Send + Clone + 'static, + T::Response: IoStream + Send, + T: NewService, +{ + type Io = T::Response; + type NewService = T; + + fn create(&self) -> T { + (self)() + } +} + +pub trait HttpPipelineFactory: Send + Clone + 'static { + type Io: IoStream; + type NewService: NewService< + Request = Self::Io, + Response = (), + Error = (), + InitError = (), + >; + + fn create(&self) -> Self::NewService; +} + +impl HttpPipelineFactory for F +where + F: Fn() -> T + Send + Clone + 'static, + T: NewService, + T::Request: IoStream, +{ + type Io = T::Request; + type NewService = T; + + fn create(&self) -> T { + (self)() + } +} + +struct DefaultPipelineFactory +where + F: Fn() -> Vec + Send + Clone, +{ + factory: F, + host: Option, + addr: net::SocketAddr, + keep_alive: KeepAlive, + _t: PhantomData, +} + +impl DefaultPipelineFactory +where + Io: IoStream + Send, + F: Fn() -> Vec + Send + Clone + 'static, + H: IntoHttpHandler + 'static, +{ + fn new( + factory: F, host: Option, addr: net::SocketAddr, keep_alive: KeepAlive, + ) -> Self { + Self { + factory, + addr, + keep_alive, + host, + _t: PhantomData, + } + } +} + +impl Clone for DefaultPipelineFactory +where + Io: IoStream, + F: Fn() -> Vec + Send + Clone, + H: IntoHttpHandler, +{ + fn clone(&self) -> Self { + Self { + factory: self.factory.clone(), + addr: self.addr, + keep_alive: self.keep_alive, + host: self.host.clone(), + _t: PhantomData, + } + } +} + +impl HttpPipelineFactory for DefaultPipelineFactory +where + Io: IoStream + Send, + F: Fn() -> Vec + Send + Clone + 'static, + H: IntoHttpHandler + 'static, +{ + type Io = Io; + type NewService = HttpService; + + fn create(&self) -> Self::NewService { + HttpService { + addr: self.addr, + keep_alive: self.keep_alive, + host: self.host.clone(), + factory: self.factory.clone(), + _t: PhantomData, + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 111cc87a4..6ba033762 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -174,13 +174,13 @@ const HW_BUFFER_SIZE: usize = 32_768; /// sys.run(); /// } /// ``` -pub fn new(factory: F) -> HttpServer +pub fn new(factory: F) -> HttpServer Vec + Send + Clone> where - F: Fn() -> U + Sync + Send + 'static, - U: IntoIterator + 'static, + F: Fn() -> U + Send + Clone + 'static, + U: IntoIterator, H: IntoHttpHandler + 'static, { - HttpServer::new(factory) + HttpServer::with_factory(move || (factory.clone())().into_iter().collect()) } #[doc(hidden)] diff --git a/src/test.rs b/src/test.rs index c068086d5..c589ea4b0 100644 --- a/src/test.rs +++ b/src/test.rs @@ -79,13 +79,13 @@ impl TestServer { /// middlewares or set handlers for test application. pub fn new(config: F) -> Self where - F: Sync + Send + 'static + Fn(&mut TestApp<()>), + F: Clone + Send + 'static + Fn(&mut TestApp<()>), { TestServerBuilder::new(|| ()).start(config) } /// Create test server builder - pub fn build() -> TestServerBuilder<()> { + pub fn build() -> TestServerBuilder<(), impl Fn() -> () + Clone + Send + 'static> { TestServerBuilder::new(|| ()) } @@ -94,9 +94,9 @@ impl TestServer { /// This method can be used for constructing application state. /// Also it can be used for external dependency initialization, /// like creating sync actors for diesel integration. - pub fn build_with_state(state: F) -> TestServerBuilder + pub fn build_with_state(state: F) -> TestServerBuilder where - F: Fn() -> S + Sync + Send + 'static, + F: Fn() -> S + Clone + Send + 'static, S: 'static, { TestServerBuilder::new(state) @@ -105,11 +105,12 @@ impl TestServer { /// Start new test server with application factory pub fn with_factory(factory: F) -> Self where - F: Fn() -> U + Sync + Send + 'static, - U: IntoIterator + 'static, + F: Fn() -> U + Send + Clone + 'static, + U: IntoIterator, H: IntoHttpHandler + 'static, { let (tx, rx) = mpsc::channel(); + let factory = move || (factory.clone())().into_iter().collect(); // run server in separate thread thread::spawn(move || { @@ -117,7 +118,7 @@ impl TestServer { let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - HttpServer::new(factory) + let _ = HttpServer::with_factory(factory) .disable_signals() .listen(tcp) .keep_alive(5) @@ -261,22 +262,25 @@ impl Drop for TestServer { /// /// This type can be used to construct an instance of `TestServer` through a /// builder-like pattern. -pub struct TestServerBuilder { - state: Box S + Sync + Send + 'static>, +pub struct TestServerBuilder +where + F: Fn() -> S + Send + Clone + 'static, +{ + state: F, #[cfg(feature = "alpn")] ssl: Option, #[cfg(feature = "rust-tls")] rust_ssl: Option, } -impl TestServerBuilder { +impl TestServerBuilder +where + F: Fn() -> S + Send + Clone + 'static, +{ /// Create a new test server - pub fn new(state: F) -> TestServerBuilder - where - F: Fn() -> S + Sync + Send + 'static, - { + pub fn new(state: F) -> TestServerBuilder { TestServerBuilder { - state: Box::new(state), + state, #[cfg(feature = "alpn")] ssl: None, #[cfg(feature = "rust-tls")] @@ -300,9 +304,9 @@ impl TestServerBuilder { #[allow(unused_mut)] /// Configure test application and run test server - pub fn start(mut self, config: F) -> TestServer + pub fn start(mut self, config: C) -> TestServer where - F: Sync + Send + 'static + Fn(&mut TestApp), + C: Fn(&mut TestApp) + Clone + Send + 'static, { let (tx, rx) = mpsc::channel(); @@ -324,7 +328,7 @@ impl TestServerBuilder { let sys = System::new("actix-test-server"); let state = self.state; - let mut srv = HttpServer::new(move || { + let mut srv = HttpServer::with_factory(move || { let mut app = TestApp::new(state()); config(&mut app); vec![app]