1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-30 18:34:36 +01:00

refactor acceptor service

This commit is contained in:
Nikolay Kim 2018-09-09 10:51:30 -07:00
parent 6a61138bf8
commit a3cfc24232
3 changed files with 307 additions and 123 deletions

View File

@ -1,9 +1,9 @@
use std::marker::PhantomData;
use std::sync::Arc;
use std::{io, mem, net, time};
use actix::{Actor, Addr, AsyncContext, Context, Handler, System};
use actix_net::{ssl, NewService, NewServiceExt, Server, Service};
use actix_net::server::{Server, ServerServiceFactory};
use actix_net::{ssl, NewService, NewServiceExt, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Poll, Stream};
@ -36,11 +36,12 @@ struct Socket<H: IntoHttpHandler> {
/// By default it serves HTTP2 when HTTPs is enabled,
/// in order to change it, use `ServerFlags` that can be provided
/// to acceptor service.
pub struct HttpServer<H>
pub struct HttpServer<H, F>
where
H: IntoHttpHandler + 'static,
F: Fn() -> Vec<H> + Send + Clone,
{
factory: Arc<Fn() -> Vec<H> + Send + Sync>,
factory: F,
host: Option<String>,
keep_alive: KeepAlive,
backlog: i32,
@ -54,21 +55,39 @@ where
sockets: Vec<Socket<H>>,
}
impl<H> HttpServer<H>
impl<H, F> HttpServer<H, F>
where
H: IntoHttpHandler + 'static,
F: Fn() -> Vec<H> + Send + Clone + 'static,
{
/// Create new http server with application factory
pub fn new<F, U>(factory: F) -> Self
pub fn new<F1, U>(factory: F1) -> HttpServer<H, impl Fn() -> Vec<H> + Send + Clone>
where
F: Fn() -> U + Sync + Send + 'static,
F1: Fn() -> U + Send + Clone,
U: IntoIterator<Item = H> + 'static,
{
let f = move || (factory)().into_iter().collect();
let f = move || (factory.clone())().into_iter().collect();
HttpServer {
threads: num_cpus::get(),
factory: Arc::new(f),
factory: f,
host: None,
backlog: 2048,
keep_alive: KeepAlive::Os,
shutdown_timeout: 30,
exit: false,
no_http2: false,
no_signals: false,
maxconn: 25_600,
maxconnrate: 256,
sockets: Vec::new(),
}
}
pub(crate) fn with_factory(factory: F) -> HttpServer<H, F> {
HttpServer {
factory,
threads: num_cpus::get(),
host: None,
backlog: 2048,
keep_alive: KeepAlive::Timeout(5),
@ -211,6 +230,13 @@ where
handler: Box::new(SimpleFactory {
addr,
factory: self.factory.clone(),
pipeline: DefaultPipelineFactory {
addr,
factory: self.factory.clone(),
host: self.host.clone(),
keep_alive: self.keep_alive,
_t: PhantomData,
},
}),
});
@ -219,22 +245,30 @@ where
#[doc(hidden)]
/// Use listener for accepting incoming connection requests
pub(crate) fn listen_with<T, F>(mut self, lst: net::TcpListener, acceptor: F) -> Self
pub(crate) fn listen_with<T, A, Io>(
mut self, lst: net::TcpListener, acceptor: A,
) -> Self
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService<Request = TcpStream, Error = (), InitError = ()> + Clone + 'static,
T::Response: IoStream,
A: AcceptorServiceFactory<Io = Io>,
T: NewService<Request = TcpStream, Response = Io, Error = (), InitError = ()>
+ Clone
+ 'static,
Io: IoStream + Send,
{
let addr = lst.local_addr().unwrap();
self.sockets.push(Socket {
lst,
addr,
scheme: "https",
handler: Box::new(AcceptorFactory {
addr,
handler: Box::new(HttpServiceBuilder::new(
acceptor,
factory: self.factory.clone(),
}),
DefaultPipelineFactory::new(
self.factory.clone(),
self.host.clone(),
addr,
self.keep_alive,
),
)),
});
self
@ -256,7 +290,7 @@ where
///
/// This method sets alpn protocols to "h2" and "http/1.1"
pub fn listen_ssl(
self, lst: net::TcpListener, builder: SslAcceptorBuilder,
mut self, lst: net::TcpListener, builder: SslAcceptorBuilder,
) -> io::Result<Self> {
use super::{openssl_acceptor_with_flags, ServerFlags};
@ -268,9 +302,23 @@ where
let acceptor = openssl_acceptor_with_flags(builder, flags)?;
Ok(self.listen_with(lst, move || {
ssl::OpensslAcceptor::new(acceptor.clone()).map_err(|_| ())
}))
let addr = lst.local_addr().unwrap();
self.sockets.push(Socket {
lst,
addr,
scheme: "https",
handler: Box::new(HttpServiceBuilder::new(
move || ssl::OpensslAcceptor::new(acceptor.clone()).map_err(|_| ()),
DefaultPipelineFactory::new(
self.factory.clone(),
self.host.clone(),
addr,
self.keep_alive,
),
)),
});
Ok(self)
}
// #[cfg(feature = "rust-tls")]
@ -408,7 +456,7 @@ where
// }
}
impl<H: IntoHttpHandler> HttpServer<H> {
impl<H: IntoHttpHandler, F: Fn() -> Vec<H> + Send + Clone> HttpServer<H, F> {
/// Start listening for incoming connections.
///
/// This method starts number of http workers in separate threads.
@ -552,35 +600,35 @@ impl<H: IntoHttpHandler> HttpServer<H> {
// }
// }
struct HttpService<H, F, Io>
struct HttpService<F, H, Io>
where
H: HttpHandler,
F: IntoHttpHandler<Handler = H>,
F: Fn() -> Vec<H>,
H: IntoHttpHandler,
Io: IoStream,
{
factory: Arc<Fn() -> Vec<F> + Send + Sync>,
factory: F,
addr: net::SocketAddr,
host: Option<String>,
keep_alive: KeepAlive,
_t: PhantomData<(H, Io)>,
_t: PhantomData<Io>,
}
impl<H, F, Io> NewService for HttpService<H, F, Io>
impl<F, H, Io> NewService for HttpService<F, H, Io>
where
H: HttpHandler,
F: IntoHttpHandler<Handler = H>,
F: Fn() -> Vec<H>,
H: IntoHttpHandler,
Io: IoStream,
{
type Request = Io;
type Response = ();
type Error = ();
type InitError = ();
type Service = HttpServiceHandler<H, Io>;
type Service = HttpServiceHandler<H::Handler, Io>;
type Future = FutureResult<Self::Service, Self::Error>;
fn new_service(&self) -> Self::Future {
let s = ServerSettings::new(Some(self.addr), &self.host, false);
let apps: Vec<_> = (*self.factory)()
let apps: Vec<_> = (self.factory)()
.into_iter()
.map(|h| h.into_handler())
.collect();
@ -658,94 +706,43 @@ where
) -> Server;
}
struct SimpleFactory<H>
struct SimpleFactory<H, F, P>
where
H: IntoHttpHandler,
F: Fn() -> Vec<H> + Send + Clone,
P: HttpPipelineFactory<Io = TcpStream>,
{
pub addr: net::SocketAddr,
pub factory: Arc<Fn() -> Vec<H> + Send + Sync>,
pub factory: F,
pub pipeline: P,
}
impl<H: IntoHttpHandler> Clone for SimpleFactory<H> {
impl<H: IntoHttpHandler, F, P> Clone for SimpleFactory<H, F, P>
where
P: HttpPipelineFactory<Io = TcpStream>,
F: Fn() -> Vec<H> + Send + Clone,
{
fn clone(&self) -> Self {
SimpleFactory {
addr: self.addr,
factory: self.factory.clone(),
pipeline: self.pipeline.clone(),
}
}
}
impl<H> ServiceFactory<H> for SimpleFactory<H>
impl<H, F, P> ServiceFactory<H> for SimpleFactory<H, F, P>
where
H: IntoHttpHandler + 'static,
F: Fn() -> Vec<H> + Send + Clone + 'static,
P: HttpPipelineFactory<Io = TcpStream>,
{
fn register(
&self, server: Server, lst: net::TcpListener, host: Option<String>,
keep_alive: KeepAlive,
&self, server: Server, lst: net::TcpListener, _host: Option<String>,
_keep_alive: KeepAlive,
) -> Server {
let addr = self.addr;
let factory = self.factory.clone();
server.listen(lst, move || HttpService {
keep_alive,
addr,
host: host.clone(),
factory: factory.clone(),
_t: PhantomData,
})
}
}
struct AcceptorFactory<T, F, H>
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService,
H: IntoHttpHandler,
{
pub addr: net::SocketAddr,
pub acceptor: F,
pub factory: Arc<Fn() -> Vec<H> + Send + Sync>,
}
impl<T, F, H> Clone for AcceptorFactory<T, F, H>
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService,
H: IntoHttpHandler,
{
fn clone(&self) -> Self {
AcceptorFactory {
addr: self.addr,
acceptor: self.acceptor.clone(),
factory: self.factory.clone(),
}
}
}
impl<T, F, H> ServiceFactory<H> for AcceptorFactory<T, F, H>
where
F: Fn() -> T + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
T: NewService<Request = TcpStream, Error = (), InitError = ()> + Clone + 'static,
T::Response: IoStream,
{
fn register(
&self, server: Server, lst: net::TcpListener, host: Option<String>,
keep_alive: KeepAlive,
) -> Server {
let addr = self.addr;
let factory = self.factory.clone();
let acceptor = self.acceptor.clone();
server.listen(lst, move || {
(acceptor)().and_then(HttpService {
keep_alive,
addr,
host: host.clone(),
factory: factory.clone(),
_t: PhantomData,
})
})
let pipeline = self.pipeline.clone();
server.listen(lst, move || pipeline.create())
}
}
@ -760,3 +757,186 @@ fn create_tcp_listener(
builder.bind(addr)?;
Ok(builder.listen(backlog)?)
}
pub struct HttpServiceBuilder<H, A, P> {
acceptor: A,
pipeline: P,
t: PhantomData<H>,
}
impl<H, A, P> HttpServiceBuilder<H, A, P>
where
A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>,
H: IntoHttpHandler,
{
pub fn new(acceptor: A, pipeline: P) -> Self {
Self {
acceptor,
pipeline,
t: PhantomData,
}
}
pub fn acceptor<A1>(self, acceptor: A1) -> HttpServiceBuilder<H, A1, P>
where
A1: AcceptorServiceFactory,
{
HttpServiceBuilder {
acceptor,
pipeline: self.pipeline,
t: PhantomData,
}
}
pub fn pipeline<P1>(self, pipeline: P1) -> HttpServiceBuilder<H, A, P1>
where
P1: HttpPipelineFactory,
{
HttpServiceBuilder {
pipeline,
acceptor: self.acceptor,
t: PhantomData,
}
}
fn finish(&self) -> impl ServerServiceFactory {
let acceptor = self.acceptor.clone();
let pipeline = self.pipeline.clone();
move || acceptor.create().and_then(pipeline.create())
}
}
impl<H, A, P> ServiceFactory<H> for HttpServiceBuilder<H, A, P>
where
A: AcceptorServiceFactory,
P: HttpPipelineFactory<Io = A::Io>,
H: IntoHttpHandler,
{
fn register(
&self, server: Server, lst: net::TcpListener, _host: Option<String>,
_keep_alive: KeepAlive,
) -> Server {
server.listen(lst, self.finish())
}
}
pub trait AcceptorServiceFactory: Send + Clone + 'static {
type Io: IoStream + Send;
type NewService: NewService<
Request = TcpStream,
Response = Self::Io,
Error = (),
InitError = (),
>;
fn create(&self) -> Self::NewService;
}
impl<F, T> AcceptorServiceFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T::Response: IoStream + Send,
T: NewService<Request = TcpStream, Error = (), InitError = ()>,
{
type Io = T::Response;
type NewService = T;
fn create(&self) -> T {
(self)()
}
}
pub trait HttpPipelineFactory: Send + Clone + 'static {
type Io: IoStream;
type NewService: NewService<
Request = Self::Io,
Response = (),
Error = (),
InitError = (),
>;
fn create(&self) -> Self::NewService;
}
impl<F, T> HttpPipelineFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService<Response = (), Error = (), InitError = ()>,
T::Request: IoStream,
{
type Io = T::Request;
type NewService = T;
fn create(&self) -> T {
(self)()
}
}
struct DefaultPipelineFactory<F, H, Io>
where
F: Fn() -> Vec<H> + Send + Clone,
{
factory: F,
host: Option<String>,
addr: net::SocketAddr,
keep_alive: KeepAlive,
_t: PhantomData<Io>,
}
impl<F, H, Io> DefaultPipelineFactory<F, H, Io>
where
Io: IoStream + Send,
F: Fn() -> Vec<H> + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
fn new(
factory: F, host: Option<String>, addr: net::SocketAddr, keep_alive: KeepAlive,
) -> Self {
Self {
factory,
addr,
keep_alive,
host,
_t: PhantomData,
}
}
}
impl<F, H, Io> Clone for DefaultPipelineFactory<F, H, Io>
where
Io: IoStream,
F: Fn() -> Vec<H> + Send + Clone,
H: IntoHttpHandler,
{
fn clone(&self) -> Self {
Self {
factory: self.factory.clone(),
addr: self.addr,
keep_alive: self.keep_alive,
host: self.host.clone(),
_t: PhantomData,
}
}
}
impl<F, H, Io> HttpPipelineFactory for DefaultPipelineFactory<F, H, Io>
where
Io: IoStream + Send,
F: Fn() -> Vec<H> + Send + Clone + 'static,
H: IntoHttpHandler + 'static,
{
type Io = Io;
type NewService = HttpService<F, H, Io>;
fn create(&self) -> Self::NewService {
HttpService {
addr: self.addr,
keep_alive: self.keep_alive,
host: self.host.clone(),
factory: self.factory.clone(),
_t: PhantomData,
}
}
}

View File

@ -174,13 +174,13 @@ const HW_BUFFER_SIZE: usize = 32_768;
/// sys.run();
/// }
/// ```
pub fn new<F, U, H>(factory: F) -> HttpServer<H>
pub fn new<F, U, H>(factory: F) -> HttpServer<H, impl Fn() -> Vec<H> + Send + Clone>
where
F: Fn() -> U + Sync + Send + 'static,
U: IntoIterator<Item = H> + 'static,
F: Fn() -> U + Send + Clone + 'static,
U: IntoIterator<Item = H>,
H: IntoHttpHandler + 'static,
{
HttpServer::new(factory)
HttpServer::with_factory(move || (factory.clone())().into_iter().collect())
}
#[doc(hidden)]

View File

@ -79,13 +79,13 @@ impl TestServer {
/// middlewares or set handlers for test application.
pub fn new<F>(config: F) -> Self
where
F: Sync + Send + 'static + Fn(&mut TestApp<()>),
F: Clone + Send + 'static + Fn(&mut TestApp<()>),
{
TestServerBuilder::new(|| ()).start(config)
}
/// Create test server builder
pub fn build() -> TestServerBuilder<()> {
pub fn build() -> TestServerBuilder<(), impl Fn() -> () + Clone + Send + 'static> {
TestServerBuilder::new(|| ())
}
@ -94,9 +94,9 @@ impl TestServer {
/// This method can be used for constructing application state.
/// Also it can be used for external dependency initialization,
/// like creating sync actors for diesel integration.
pub fn build_with_state<F, S>(state: F) -> TestServerBuilder<S>
pub fn build_with_state<S, F>(state: F) -> TestServerBuilder<S, F>
where
F: Fn() -> S + Sync + Send + 'static,
F: Fn() -> S + Clone + Send + 'static,
S: 'static,
{
TestServerBuilder::new(state)
@ -105,11 +105,12 @@ impl TestServer {
/// Start new test server with application factory
pub fn with_factory<F, U, H>(factory: F) -> Self
where
F: Fn() -> U + Sync + Send + 'static,
U: IntoIterator<Item = H> + 'static,
F: Fn() -> U + Send + Clone + 'static,
U: IntoIterator<Item = H>,
H: IntoHttpHandler + 'static,
{
let (tx, rx) = mpsc::channel();
let factory = move || (factory.clone())().into_iter().collect();
// run server in separate thread
thread::spawn(move || {
@ -117,7 +118,7 @@ impl TestServer {
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
HttpServer::new(factory)
let _ = HttpServer::with_factory(factory)
.disable_signals()
.listen(tcp)
.keep_alive(5)
@ -261,22 +262,25 @@ impl Drop for TestServer {
///
/// This type can be used to construct an instance of `TestServer` through a
/// builder-like pattern.
pub struct TestServerBuilder<S> {
state: Box<Fn() -> S + Sync + Send + 'static>,
pub struct TestServerBuilder<S, F>
where
F: Fn() -> S + Send + Clone + 'static,
{
state: F,
#[cfg(feature = "alpn")]
ssl: Option<SslAcceptorBuilder>,
#[cfg(feature = "rust-tls")]
rust_ssl: Option<ServerConfig>,
}
impl<S: 'static> TestServerBuilder<S> {
impl<S: 'static, F> TestServerBuilder<S, F>
where
F: Fn() -> S + Send + Clone + 'static,
{
/// Create a new test server
pub fn new<F>(state: F) -> TestServerBuilder<S>
where
F: Fn() -> S + Sync + Send + 'static,
{
pub fn new(state: F) -> TestServerBuilder<S, F> {
TestServerBuilder {
state: Box::new(state),
state,
#[cfg(feature = "alpn")]
ssl: None,
#[cfg(feature = "rust-tls")]
@ -300,9 +304,9 @@ impl<S: 'static> TestServerBuilder<S> {
#[allow(unused_mut)]
/// Configure test application and run test server
pub fn start<F>(mut self, config: F) -> TestServer
pub fn start<C>(mut self, config: C) -> TestServer
where
F: Sync + Send + 'static + Fn(&mut TestApp<S>),
C: Fn(&mut TestApp<S>) + Clone + Send + 'static,
{
let (tx, rx) = mpsc::channel();
@ -324,7 +328,7 @@ impl<S: 'static> TestServerBuilder<S> {
let sys = System::new("actix-test-server");
let state = self.state;
let mut srv = HttpServer::new(move || {
let mut srv = HttpServer::with_factory(move || {
let mut app = TestApp::new(state());
config(&mut app);
vec![app]