1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 01:11:07 +01:00

add new service configuration

This commit is contained in:
Nikolay Kim 2018-08-21 17:08:23 -07:00
parent 2cbcc21168
commit b8c8dbc90a
9 changed files with 247 additions and 107 deletions

View File

@ -20,8 +20,7 @@ use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::SslAcceptorExt; use tokio_openssl::SslAcceptorExt;
use actix_net::service::{IntoNewService, NewServiceExt}; use actix_net::{IntoNewService, NewService, Server};
use actix_net::Server;
/// Simple logger service, it just prints fact of the new connections /// Simple logger service, it just prints fact of the new connections
fn logger<T: AsyncRead + AsyncWrite + fmt::Debug>( fn logger<T: AsyncRead + AsyncWrite + fmt::Debug>(
@ -90,7 +89,7 @@ fn main() {
Ok::<_, io::Error>(ServiceState { num: num.clone() }) Ok::<_, io::Error>(ServiceState { num: num.clone() })
})); }));
Server::new().bind("0.0.0.0:8443", srv).unwrap().start(); Server::default().bind("0.0.0.0:8443", srv).unwrap().start();
sys.run(); sys.run();
} }

View File

@ -15,8 +15,7 @@ use futures::{future, Future};
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use actix_net::service::NewServiceExt; use actix_net::{ssl, NewService, Server};
use actix_net::{ssl, Server};
#[derive(Debug)] #[derive(Debug)]
struct ServiceState { struct ServiceState {
@ -24,7 +23,7 @@ struct ServiceState {
} }
fn service<T: AsyncRead + AsyncWrite>( fn service<T: AsyncRead + AsyncWrite>(
st: &mut ServiceState, stream: T, st: &mut ServiceState, _: T,
) -> impl Future<Item = (), Error = io::Error> { ) -> impl Future<Item = (), Error = io::Error> {
let num = st.num.fetch_add(1, Ordering::Relaxed); let num = st.num.fetch_add(1, Ordering::Relaxed);
println!("got ssl connection {:?}", num); println!("got ssl connection {:?}", num);
@ -50,7 +49,7 @@ fn main() {
Ok::<_, io::Error>(ServiceState { num: num.clone() }) Ok::<_, io::Error>(ServiceState { num: num.clone() })
})); }));
Server::new().bind("0.0.0.0:8443", srv).unwrap().start(); Server::default().bind("0.0.0.0:8443", srv).unwrap().start();
sys.run(); sys.run();
} }

View File

@ -65,16 +65,20 @@ use bytes::{BufMut, BytesMut};
use futures::{Async, Poll}; use futures::{Async, Poll};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
pub use tower_service::Service;
pub(crate) mod accept; pub(crate) mod accept;
mod extensions; mod extensions;
mod server; mod server;
pub mod server_config;
mod server_service; mod server_service;
pub mod service; pub mod service;
pub mod ssl; pub mod ssl;
mod worker; mod worker;
pub use self::server::{ConnectionRateTag, ConnectionTag, Connections, Server}; pub use self::server::{ConnectionRateTag, ConnectionTag, Connections, Server};
pub use service::{IntoNewService, IntoService}; pub use server_config::Config;
pub use service::{IntoNewService, IntoService, NewService};
pub use extensions::Extensions; pub use extensions::Extensions;

View File

@ -10,7 +10,6 @@ use futures::{Future, Sink, Stream};
use net2::TcpBuilder; use net2::TcpBuilder;
use num_cpus; use num_cpus;
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
use tower_service::NewService;
use actix::{ use actix::{
actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
@ -18,8 +17,9 @@ use actix::{
}; };
use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::server_config::{Config, ServerConfig};
use super::server_service::{ServerNewService, ServerServiceFactory}; use super::server_service::{ServerNewService, ServerServiceFactory};
use super::service::IntoNewService; use super::service::{IntoNewService, NewService};
use super::worker::{Conn, StopWorker, Worker, WorkerClient}; use super::worker::{Conn, StopWorker, Worker, WorkerClient};
use super::{PauseServer, ResumeServer, StopServer, Token}; use super::{PauseServer, ResumeServer, StopServer, Token};
@ -28,10 +28,11 @@ pub(crate) enum ServerCommand {
} }
/// Server /// Server
pub struct Server { pub struct Server<C = ServerConfig> {
config: C,
threads: usize, threads: usize,
workers: Vec<(usize, Addr<Worker>)>, workers: Vec<(usize, Addr<Worker>)>,
services: Vec<Box<ServerServiceFactory + Send>>, services: Vec<Box<ServerServiceFactory<C> + Send>>,
sockets: Vec<(Token, net::TcpListener)>, sockets: Vec<(Token, net::TcpListener)>,
accept: AcceptLoop, accept: AcceptLoop,
exit: bool, exit: bool,
@ -42,16 +43,17 @@ pub struct Server {
maxconnrate: usize, maxconnrate: usize,
} }
impl Default for Server { impl Default for Server<ServerConfig> {
fn default() -> Self { fn default() -> Self {
Self::new() Self::new(ServerConfig::default())
} }
} }
impl Server { impl<C: Config> Server<C> {
/// Create new Server instance /// Create new Server instance
pub fn new() -> Server { pub fn new(config: C) -> Server<C> {
Server { Server {
config,
threads: num_cpus::get(), threads: num_cpus::get(),
workers: Vec::new(), workers: Vec::new(),
services: Vec::new(), services: Vec::new(),
@ -81,7 +83,10 @@ impl Server {
/// reached for each worker. /// reached for each worker.
/// ///
/// By default max connections is set to a 100k. /// By default max connections is set to a 100k.
pub fn maxconn(mut self, num: usize) -> Self { pub fn maxconn(mut self, num: usize) -> Self
where
C: AsMut<Connections>,
{
self.maxconn = num; self.maxconn = num;
self self
} }
@ -135,7 +140,7 @@ impl Server {
where where
U: net::ToSocketAddrs, U: net::ToSocketAddrs,
T: IntoNewService<N> + Clone, T: IntoNewService<N> + Clone,
N: NewService<Request = TcpStream, Response = (), InitError = io::Error> N: NewService<Request = TcpStream, Response = (), Config = C, InitError = io::Error>
+ Clone + Clone
+ Send + Send
+ 'static, + 'static,
@ -155,7 +160,7 @@ impl Server {
pub fn listen<T, N>(mut self, lst: net::TcpListener, srv: T) -> Self pub fn listen<T, N>(mut self, lst: net::TcpListener, srv: T) -> Self
where where
T: IntoNewService<N>, T: IntoNewService<N>,
N: NewService<Request = TcpStream, Response = (), InitError = io::Error> N: NewService<Request = TcpStream, Response = (), Config = C, InitError = io::Error>
+ Clone + Clone
+ Send + Send
+ 'static, + 'static,
@ -164,8 +169,10 @@ impl Server {
N::Error: fmt::Display, N::Error: fmt::Display,
{ {
let token = Token(self.services.len()); let token = Token(self.services.len());
self.services self.services.push(ServerNewService::create(
.push(ServerNewService::create(srv.into_new_service())); srv.into_new_service(),
self.config.clone(),
));
self.sockets.push((token, lst)); self.sockets.push((token, lst));
self self
} }
@ -199,7 +206,7 @@ impl Server {
} }
/// Starts Server Actor and returns its address /// Starts Server Actor and returns its address
pub fn start(mut self) -> Addr<Server> { pub fn start(mut self) -> Addr<Server<C>> {
if self.sockets.is_empty() { if self.sockets.is_empty() {
panic!("Service should have at least one bound socket"); panic!("Service should have at least one bound socket");
} else { } else {
@ -251,7 +258,7 @@ impl Server {
let (tx, rx) = unbounded::<Conn>(); let (tx, rx) = unbounded::<Conn>();
let conns = Connections::new(notify, self.maxconn, self.maxconnrate); let conns = Connections::new(notify, self.maxconn, self.maxconnrate);
let worker = WorkerClient::new(idx, tx, conns.clone()); let worker = WorkerClient::new(idx, tx, conns.clone());
let services: Vec<Box<ServerServiceFactory + Send>> = let services: Vec<Box<ServerServiceFactory<C> + Send>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
let addr = Arbiter::start(move |ctx: &mut Context<_>| { let addr = Arbiter::start(move |ctx: &mut Context<_>| {
@ -263,14 +270,14 @@ impl Server {
} }
} }
impl Actor for Server { impl<C: Config> Actor for Server<C> {
type Context = Context<Self>; type Context = Context<Self>;
} }
/// Signals support /// Signals support
/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
/// message to `System` actor. /// message to `System` actor.
impl Handler<signal::Signal> for Server { impl<C: Config> Handler<signal::Signal> for Server<C> {
type Result = (); type Result = ();
fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) { fn handle(&mut self, msg: signal::Signal, ctx: &mut Context<Self>) {
@ -295,7 +302,7 @@ impl Handler<signal::Signal> for Server {
} }
} }
impl Handler<PauseServer> for Server { impl<C: Config> Handler<PauseServer> for Server<C> {
type Result = (); type Result = ();
fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) { fn handle(&mut self, _: PauseServer, _: &mut Context<Self>) {
@ -303,7 +310,7 @@ impl Handler<PauseServer> for Server {
} }
} }
impl Handler<ResumeServer> for Server { impl<C: Config> Handler<ResumeServer> for Server<C> {
type Result = (); type Result = ();
fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) { fn handle(&mut self, _: ResumeServer, _: &mut Context<Self>) {
@ -311,7 +318,7 @@ impl Handler<ResumeServer> for Server {
} }
} }
impl Handler<StopServer> for Server { impl<C: Config> Handler<StopServer> for Server<C> {
type Result = Response<(), ()>; type Result = Response<(), ()>;
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
@ -366,7 +373,7 @@ impl Handler<StopServer> for Server {
} }
/// Commands from accept threads /// Commands from accept threads
impl StreamHandler<ServerCommand, ()> for Server { impl<C: Config> StreamHandler<ServerCommand, ()> for Server<C> {
fn finished(&mut self, _: &mut Context<Self>) {} fn finished(&mut self, _: &mut Context<Self>) {}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) { fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {

81
src/server_config.rs Normal file
View File

@ -0,0 +1,81 @@
//! Default server config
use std::sync::{atomic::AtomicUsize, Arc};
pub trait Config: Send + Clone + Default + 'static {
fn fork(&self) -> Self;
}
#[derive(Clone, Default)]
pub struct ServerConfig {
conn: ConnectionsConfig,
ssl: SslConfig,
}
impl Config for ServerConfig {
fn fork(&self) -> Self {
ServerConfig {
conn: self.conn.fork(),
ssl: self.ssl.fork(),
}
}
}
impl AsRef<ConnectionsConfig> for ServerConfig {
fn as_ref(&self) -> &ConnectionsConfig {
&self.conn
}
}
impl AsRef<SslConfig> for ServerConfig {
fn as_ref(&self) -> &SslConfig {
&self.ssl
}
}
#[derive(Clone)]
pub struct ConnectionsConfig {
max_connections: usize,
num_connections: Arc<AtomicUsize>,
}
impl Default for ConnectionsConfig {
fn default() -> Self {
ConnectionsConfig {
max_connections: 102_400,
num_connections: Arc::new(AtomicUsize::new(0)),
}
}
}
impl Config for ConnectionsConfig {
fn fork(&self) -> Self {
ConnectionsConfig {
max_connections: self.max_connections,
num_connections: Arc::new(AtomicUsize::new(0)),
}
}
}
#[derive(Clone)]
pub struct SslConfig {
max_handshakes: usize,
num: Arc<AtomicUsize>,
}
impl Default for SslConfig {
fn default() -> Self {
SslConfig {
max_handshakes: 256,
num: Arc::new(AtomicUsize::new(0)),
}
}
}
impl Config for SslConfig {
fn fork(&self) -> Self {
SslConfig {
max_handshakes: self.max_handshakes,
num: Arc::new(AtomicUsize::new(0)),
}
}
}

View File

@ -7,7 +7,8 @@ use std::{fmt, io, net};
use futures::{future, Future, Poll}; use futures::{future, Future, Poll};
use tokio_reactor::Handle; use tokio_reactor::Handle;
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
use tower_service::{NewService, Service};
use super::{Config, NewService, Service};
pub(crate) type BoxedServerService = Box< pub(crate) type BoxedServerService = Box<
Service< Service<
@ -55,14 +56,15 @@ where
} }
} }
pub(crate) struct ServerNewService<T> { pub(crate) struct ServerNewService<T, C> {
inner: T, inner: T,
config: C,
counter: Arc<AtomicUsize>, counter: Arc<AtomicUsize>,
} }
impl<T> ServerNewService<T> impl<T, C: Config> ServerNewService<T, C>
where where
T: NewService<Request = TcpStream, Response = (), InitError = io::Error> T: NewService<Request = TcpStream, Response = (), Config = C, InitError = io::Error>
+ Clone + Clone
+ Send + Send
+ 'static, + 'static,
@ -70,25 +72,26 @@ where
T::Future: 'static, T::Future: 'static,
T::Error: fmt::Display, T::Error: fmt::Display,
{ {
pub(crate) fn create(inner: T) -> Box<ServerServiceFactory + Send> { pub(crate) fn create(inner: T, config: C) -> Box<ServerServiceFactory<C> + Send> {
Box::new(Self { Box::new(Self {
inner, inner,
config,
counter: Arc::new(AtomicUsize::new(0)), counter: Arc::new(AtomicUsize::new(0)),
}) })
} }
} }
pub trait ServerServiceFactory { pub trait ServerServiceFactory<C> {
fn counter(&self) -> Arc<AtomicUsize>; fn counter(&self) -> Arc<AtomicUsize>;
fn clone_factory(&self) -> Box<ServerServiceFactory + Send>; fn clone_factory(&self) -> Box<ServerServiceFactory<C> + Send>;
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>; fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
} }
impl<T> ServerServiceFactory for ServerNewService<T> impl<T, C: Config> ServerServiceFactory<C> for ServerNewService<T, C>
where where
T: NewService<Request = TcpStream, Response = (), InitError = io::Error> T: NewService<Request = TcpStream, Response = (), Config = C, InitError = io::Error>
+ Clone + Clone
+ Send + Send
+ 'static, + 'static,
@ -100,28 +103,35 @@ where
self.counter.clone() self.counter.clone()
} }
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> { fn clone_factory(&self) -> Box<ServerServiceFactory<C> + Send> {
Box::new(Self { Box::new(Self {
inner: self.inner.clone(), inner: self.inner.clone(),
config: self.config.fork(),
counter: Arc::new(AtomicUsize::new(0)), counter: Arc::new(AtomicUsize::new(0)),
}) })
} }
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> { fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
let counter = self.counter.clone(); let counter = self.counter.clone();
Box::new(self.inner.new_service().map_err(|_| ()).map(move |inner| { Box::new(
let service: BoxedServerService = Box::new(ServerService { inner, counter }); self.inner
.new_service(self.config.clone())
.map_err(|_| ())
.map(move |inner| {
let service: BoxedServerService =
Box::new(ServerService { inner, counter });
service service
})) }),
)
} }
} }
impl ServerServiceFactory for Box<ServerServiceFactory> { impl<C> ServerServiceFactory<C> for Box<ServerServiceFactory<C>> {
fn counter(&self) -> Arc<AtomicUsize> { fn counter(&self) -> Arc<AtomicUsize> {
self.as_ref().counter() self.as_ref().counter()
} }
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> { fn clone_factory(&self) -> Box<ServerServiceFactory<C> + Send> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }

View File

@ -3,40 +3,52 @@ use std::marker;
use std::rc::Rc; use std::rc::Rc;
use futures::{future, future::FutureResult, Async, Future, IntoFuture, Poll}; use futures::{future, future::FutureResult, Async, Future, IntoFuture, Poll};
use tower_service::{NewService, Service}; use tower_service::Service;
pub trait NewServiceExt: NewService { /// Creates new `Service` values.
fn and_then<F, B>(self, new_service: F) -> AndThenNewService<Self, B> ///
where /// Acts as a service factory. This is useful for cases where new `Service`
Self: Sized, /// values must be produced. One case is a TCP servier listener. The listner
F: IntoNewService<B>, /// accepts new TCP streams, obtains a new `Service` value using the
B: NewService< /// `NewService` trait, and uses that new `Service` value to process inbound
Request = Self::Response, /// requests on that new TCP stream.
pub trait NewService {
/// Requests handled by the service
type Request;
/// Responses given by the service
type Response;
/// Errors produced by the service
type Error;
/// The `Service` value created by this factory
type Service: Service<
Request = Self::Request,
Response = Self::Response,
Error = Self::Error, Error = Self::Error,
InitError = Self::InitError,
>; >;
fn map_err<F, E>(self, f: F) -> MapErrNewService<Self, F, E> /// Pipeline configuration
where type Config: Clone;
Self: Sized,
F: Fn(Self::Error) -> E;
fn map_init_err<F, E>(self, f: F) -> MapInitErr<Self, F, E> /// Errors produced while building a service.
where type InitError;
Self: Sized,
F: Fn(Self::InitError) -> E; /// The future of the `Service` instance.
} type Future: Future<Item = Self::Service, Error = Self::InitError>;
/// Create and return a new service value asynchronously.
fn new_service(&self, Self::Config) -> Self::Future;
impl<T> NewServiceExt for T
where
T: NewService,
{
fn and_then<F, B>(self, new_service: F) -> AndThenNewService<Self, B> fn and_then<F, B>(self, new_service: F) -> AndThenNewService<Self, B>
where where
Self: Sized,
F: IntoNewService<B>, F: IntoNewService<B>,
B: NewService< B: NewService<
Request = Self::Response, Request = Self::Response,
Error = Self::Error, Error = Self::Error,
Config = Self::Config,
InitError = Self::InitError, InitError = Self::InitError,
>, >,
{ {
@ -45,6 +57,7 @@ where
fn map_err<F, E>(self, f: F) -> MapErrNewService<Self, F, E> fn map_err<F, E>(self, f: F) -> MapErrNewService<Self, F, E>
where where
Self: Sized,
F: Fn(Self::Error) -> E, F: Fn(Self::Error) -> E,
{ {
MapErrNewService::new(self, f) MapErrNewService::new(self, f)
@ -150,7 +163,7 @@ where
} }
} }
pub struct FnNewService<F, Req, Resp, Err, Fut> pub struct FnNewService<F, Req, Resp, Err, Fut, Cfg>
where where
F: Fn(Req) -> Fut, F: Fn(Req) -> Fut,
Fut: IntoFuture<Item = Resp, Error = Err>, Fut: IntoFuture<Item = Resp, Error = Err>,
@ -159,9 +172,10 @@ where
req: marker::PhantomData<Req>, req: marker::PhantomData<Req>,
resp: marker::PhantomData<Resp>, resp: marker::PhantomData<Resp>,
err: marker::PhantomData<Err>, err: marker::PhantomData<Err>,
cfg: marker::PhantomData<Cfg>,
} }
impl<F, Req, Resp, Err, Fut> FnNewService<F, Req, Resp, Err, Fut> impl<F, Req, Resp, Err, Fut, Cfg> FnNewService<F, Req, Resp, Err, Fut, Cfg>
where where
F: Fn(Req) -> Fut + Clone, F: Fn(Req) -> Fut + Clone,
Fut: IntoFuture<Item = Resp, Error = Err>, Fut: IntoFuture<Item = Resp, Error = Err>,
@ -172,38 +186,43 @@ where
req: marker::PhantomData, req: marker::PhantomData,
resp: marker::PhantomData, resp: marker::PhantomData,
err: marker::PhantomData, err: marker::PhantomData,
cfg: marker::PhantomData,
} }
} }
} }
impl<F, Req, Resp, Err, Fut> NewService for FnNewService<F, Req, Resp, Err, Fut> impl<F, Req, Resp, Err, Fut, Cfg> NewService for FnNewService<F, Req, Resp, Err, Fut, Cfg>
where where
F: Fn(Req) -> Fut + Clone, F: Fn(Req) -> Fut + Clone,
Fut: IntoFuture<Item = Resp, Error = Err>, Fut: IntoFuture<Item = Resp, Error = Err>,
Cfg: Clone,
{ {
type Request = Req; type Request = Req;
type Response = Resp; type Response = Resp;
type Error = Err; type Error = Err;
type Service = FnService<F, Req, Resp, Err, Fut>; type Service = FnService<F, Req, Resp, Err, Fut>;
type Config = Cfg;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, ()>; type Future = FutureResult<Self::Service, ()>;
fn new_service(&self) -> Self::Future { fn new_service(&self, cfg: Cfg) -> Self::Future {
future::ok(FnService::new(self.f.clone())) future::ok(FnService::new(self.f.clone()))
} }
} }
impl<F, Req, Resp, Err, Fut> IntoNewService<FnNewService<F, Req, Resp, Err, Fut>> for F impl<F, Req, Resp, Err, Fut, Cfg> IntoNewService<FnNewService<F, Req, Resp, Err, Fut, Cfg>>
for F
where where
F: Fn(Req) -> Fut + Clone + 'static, F: Fn(Req) -> Fut + Clone + 'static,
Fut: IntoFuture<Item = Resp, Error = Err>, Fut: IntoFuture<Item = Resp, Error = Err>,
Cfg: Clone,
{ {
fn into_new_service(self) -> FnNewService<F, Req, Resp, Err, Fut> { fn into_new_service(self) -> FnNewService<F, Req, Resp, Err, Fut, Cfg> {
FnNewService::new(self) FnNewService::new(self)
} }
} }
impl<F, Req, Resp, Err, Fut> Clone for FnNewService<F, Req, Resp, Err, Fut> impl<F, Req, Resp, Err, Fut, Cfg> Clone for FnNewService<F, Req, Resp, Err, Fut, Cfg>
where where
F: Fn(Req) -> Fut + Clone, F: Fn(Req) -> Fut + Clone,
Fut: IntoFuture<Item = Resp, Error = Err>, Fut: IntoFuture<Item = Resp, Error = Err>,
@ -261,7 +280,7 @@ where
} }
/// `NewService` for state and handler functions /// `NewService` for state and handler functions
pub struct FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> { pub struct FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg> {
f: F1, f: F1,
state: F2, state: F2,
s: marker::PhantomData<S>, s: marker::PhantomData<S>,
@ -271,10 +290,11 @@ pub struct FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> {
err2: marker::PhantomData<Err2>, err2: marker::PhantomData<Err2>,
fut1: marker::PhantomData<Fut1>, fut1: marker::PhantomData<Fut1>,
fut2: marker::PhantomData<Fut2>, fut2: marker::PhantomData<Fut2>,
cfg: marker::PhantomData<Cfg>,
} }
impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg>
FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg>
{ {
fn new(f: F1, state: F2) -> Self { fn new(f: F1, state: F2) -> Self {
FnStateNewService { FnStateNewService {
@ -287,12 +307,13 @@ impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2>
err2: marker::PhantomData, err2: marker::PhantomData,
fut1: marker::PhantomData, fut1: marker::PhantomData,
fut2: marker::PhantomData, fut2: marker::PhantomData,
cfg: marker::PhantomData,
} }
} }
} }
impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> NewService impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg> NewService
for FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> for FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg>
where where
S: 'static, S: 'static,
F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static,
@ -303,15 +324,17 @@ where
Resp: 'static, Resp: 'static,
Err1: 'static, Err1: 'static,
Err2: 'static, Err2: 'static,
Cfg: Clone,
{ {
type Request = Req; type Request = Req;
type Response = Resp; type Response = Resp;
type Error = Err1; type Error = Err1;
type Service = FnStateService<S, F1, Req, Resp, Err1, Fut1>; type Service = FnStateService<S, F1, Req, Resp, Err1, Fut1>;
type Config = Cfg;
type InitError = Err2; type InitError = Err2;
type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>; type Future = Box<Future<Item = Self::Service, Error = Self::InitError>>;
fn new_service(&self) -> Self::Future { fn new_service(&self, cfg: Cfg) -> Self::Future {
let f = self.f.clone(); let f = self.f.clone();
Box::new( Box::new(
(self.state)() (self.state)()
@ -321,8 +344,9 @@ where
} }
} }
impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg>
IntoNewService<FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2>> for (F1, F2) IntoNewService<FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg>>
for (F1, F2)
where where
S: 'static, S: 'static,
F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static,
@ -333,16 +357,17 @@ where
Resp: 'static, Resp: 'static,
Err1: 'static, Err1: 'static,
Err2: 'static, Err2: 'static,
Cfg: Clone,
{ {
fn into_new_service( fn into_new_service(
self, self,
) -> FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> { ) -> FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg> {
FnStateNewService::new(self.0, self.1) FnStateNewService::new(self.0, self.1)
} }
} }
impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> Clone impl<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg> Clone
for FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2> for FnStateNewService<S, F1, F2, Req, Resp, Err1, Err2, Fut1, Fut2, Cfg>
where where
F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static,
F2: Fn() -> Fut2 + Clone, F2: Fn() -> Fut2 + Clone,
@ -467,7 +492,12 @@ where
impl<A, B> NewService for AndThenNewService<A, B> impl<A, B> NewService for AndThenNewService<A, B>
where where
A: NewService<Response = B::Request, Error = B::Error, InitError = B::InitError>, A: NewService<
Response = B::Request,
Error = B::Error,
Config = B::Config,
InitError = B::InitError,
>,
B: NewService, B: NewService,
{ {
type Request = A::Request; type Request = A::Request;
@ -475,11 +505,12 @@ where
type Error = A::Error; type Error = A::Error;
type Service = AndThen<A::Service, B::Service>; type Service = AndThen<A::Service, B::Service>;
type Config = A::Config;
type InitError = A::InitError; type InitError = A::InitError;
type Future = AndThenNewServiceFuture<A, B>; type Future = AndThenNewServiceFuture<A, B>;
fn new_service(&self) -> Self::Future { fn new_service(&self, cfg: A::Config) -> Self::Future {
AndThenNewServiceFuture::new(self.a.new_service(), self.b.new_service()) AndThenNewServiceFuture::new(self.a.new_service(cfg.clone()), self.b.new_service(cfg))
} }
} }
@ -669,11 +700,12 @@ where
type Error = E; type Error = E;
type Service = MapErr<A::Service, F, E>; type Service = MapErr<A::Service, F, E>;
type Config = A::Config;
type InitError = A::InitError; type InitError = A::InitError;
type Future = MapErrNewServiceFuture<A, F, E>; type Future = MapErrNewServiceFuture<A, F, E>;
fn new_service(&self) -> Self::Future { fn new_service(&self, cfg: Self::Config) -> Self::Future {
MapErrNewServiceFuture::new(self.a.new_service(), self.f.clone()) MapErrNewServiceFuture::new(self.a.new_service(cfg), self.f.clone())
} }
} }
@ -759,11 +791,12 @@ where
type Error = A::Error; type Error = A::Error;
type Service = A::Service; type Service = A::Service;
type Config = A::Config;
type InitError = E; type InitError = E;
type Future = MapInitErrFuture<A, F, E>; type Future = MapInitErrFuture<A, F, E>;
fn new_service(&self) -> Self::Future { fn new_service(&self, cfg: Self::Config) -> Self::Future {
MapInitErrFuture::new(self.a.new_service(), self.f.clone()) MapInitErrFuture::new(self.a.new_service(cfg), self.f.clone())
} }
} }

View File

@ -1,29 +1,31 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::Shutdown; // use std::net::Shutdown;
use std::{io, time}; use std::io;
use futures::{future, future::FutureResult, Async, Future, Poll}; use futures::{future, future::FutureResult, Async, Future, Poll};
use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream};
use tower_service::{NewService, Service};
use {IntoNewService, IoStream}; use server_config::SslConfig;
use {NewService, Service};
/// Support `SSL` connections via openssl package /// Support `SSL` connections via openssl package
/// ///
/// `alpn` feature enables `OpensslAcceptor` type /// `alpn` feature enables `OpensslAcceptor` type
pub struct OpensslService<T> { pub struct OpensslService<T, Cfg> {
acceptor: SslAcceptor, acceptor: SslAcceptor,
io: PhantomData<T>, io: PhantomData<T>,
cfg: PhantomData<Cfg>,
} }
impl<T> OpensslService<T> { impl<T, Cfg> OpensslService<T, Cfg> {
/// Create default `OpensslService` /// Create default `OpensslService`
pub fn new(builder: SslAcceptorBuilder) -> Self { pub fn new(builder: SslAcceptorBuilder) -> Self {
OpensslService { OpensslService {
acceptor: builder.build(), acceptor: builder.build(),
io: PhantomData, io: PhantomData,
cfg: PhantomData,
} }
} }
@ -44,27 +46,32 @@ impl<T> OpensslService<T> {
Ok(OpensslService { Ok(OpensslService {
acceptor: builder.build(), acceptor: builder.build(),
io: PhantomData, io: PhantomData,
cfg: PhantomData,
}) })
} }
} }
impl<T: AsyncRead + AsyncWrite> Clone for OpensslService<T> { impl<T: AsyncRead + AsyncWrite, Cfg> Clone for OpensslService<T, Cfg> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),
io: PhantomData, io: PhantomData,
cfg: PhantomData,
} }
} }
} }
impl<T: AsyncRead + AsyncWrite> NewService for OpensslService<T> { impl<T: AsyncRead + AsyncWrite, Cfg: Clone + AsRef<SslConfig>> NewService
for OpensslService<T, Cfg>
{
type Request = T; type Request = T;
type Response = SslStream<T>; type Response = SslStream<T>;
type Error = io::Error; type Error = io::Error;
type Service = OpensslAcceptor<T>; type Service = OpensslAcceptor<T>;
type Config = Cfg;
type InitError = io::Error; type InitError = io::Error;
type Future = FutureResult<Self::Service, io::Error>; type Future = FutureResult<Self::Service, io::Error>;
fn new_service(&self) -> Self::Future { fn new_service(&self, _: Self::Config) -> Self::Future {
future::ok(OpensslAcceptor { future::ok(OpensslAcceptor {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),
io: PhantomData, io: PhantomData,

View File

@ -69,8 +69,8 @@ impl Actor for Worker {
} }
impl Worker { impl Worker {
pub(crate) fn new( pub(crate) fn new<C: 'static>(
ctx: &mut Context<Self>, services: Vec<Box<ServerServiceFactory + Send>>, ctx: &mut Context<Self>, services: Vec<Box<ServerServiceFactory<C> + Send>>,
) -> Self { ) -> Self {
let wrk = Worker { let wrk = Worker {
services: Vec::new(), services: Vec::new(),