1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-06-27 01:37:42 +02:00

add ServerConfig to server services

This commit is contained in:
Nikolay Kim
2019-03-09 07:27:56 -08:00
parent ac62e2dbf9
commit d2b96ff877
15 changed files with 163 additions and 45 deletions

View File

@ -36,6 +36,7 @@ rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
actix-rt = "0.2.0"
#actix-service = "0.3.2"
actix-service = { path="../actix-service" }
actix-server-config = { path="../actix-server-config" }
log = "0.4"
num_cpus = "1.0"

View File

@ -150,14 +150,15 @@ impl ServerBuilder {
U: net::ToSocketAddrs,
{
let sockets = bind_addr(addr)?;
let token = self.token.next();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
));
for lst in sockets {
let token = self.token.next();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
lst.local_addr()?,
));
self.sockets.push((token, lst));
}
Ok(self)
@ -178,6 +179,7 @@ impl ServerBuilder {
name.as_ref().to_string(),
token,
factory,
lst.local_addr()?,
));
self.sockets.push((token, lst));
Ok(self)

View File

@ -10,6 +10,8 @@ mod signals;
pub mod ssl;
mod worker;
pub use actix_server_config::ServerConfig;
pub use self::builder::ServerBuilder;
pub use self::server::Server;
pub use self::service_config::{ServiceConfig, ServiceRuntime};

View File

@ -169,8 +169,8 @@ impl ServiceRuntime {
pub fn service<T, F>(&mut self, name: &str, service: F)
where
F: IntoNewService<T, TcpStream>,
T: NewService<TcpStream, Response = ()> + 'static,
F: IntoNewService<T>,
T: NewService<Request = TcpStream, Response = ()> + 'static,
T::Future: 'static,
T::Service: 'static,
T::InitError: fmt::Debug,
@ -191,7 +191,7 @@ impl ServiceRuntime {
type BoxedNewService = Box<
NewService<
(Option<CounterGuard>, ServerMessage),
Request = (Option<CounterGuard>, ServerMessage),
Response = (),
Error = (),
InitError = (),
@ -204,14 +204,15 @@ struct ServiceFactory<T> {
inner: T,
}
impl<T> NewService<(Option<CounterGuard>, ServerMessage)> for ServiceFactory<T>
impl<T> NewService for ServiceFactory<T>
where
T: NewService<TcpStream, Response = ()>,
T: NewService<Request = TcpStream, Response = ()>,
T::Future: 'static,
T::Service: 'static,
T::Error: 'static,
T::InitError: fmt::Debug + 'static,
{
type Request = (Option<CounterGuard>, ServerMessage);
type Response = ();
type Error = ();
type InitError = ();

View File

@ -1,7 +1,8 @@
use std::net::TcpStream;
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;
use actix_rt::spawn;
use actix_server_config::ServerConfig;
use actix_service::{NewService, Service};
use futures::future::{err, ok, FutureResult};
use futures::{Future, Poll};
@ -23,7 +24,7 @@ pub(crate) enum ServerMessage {
}
pub trait ServiceFactory: Send + Clone + 'static {
type NewService: NewService<TokioTcpStream>;
type NewService: NewService<ServerConfig, Request = TokioTcpStream>;
fn create(&self) -> Self::NewService;
}
@ -38,7 +39,7 @@ pub(crate) trait InternalServiceFactory: Send {
pub(crate) type BoxedServerService = Box<
Service<
(Option<CounterGuard>, ServerMessage),
Request = (Option<CounterGuard>, ServerMessage),
Response = (),
Error = (),
Future = FutureResult<(), ()>,
@ -55,12 +56,13 @@ impl<T> StreamService<T> {
}
}
impl<T> Service<(Option<CounterGuard>, ServerMessage)> for StreamService<T>
impl<T> Service for StreamService<T>
where
T: Service<TokioTcpStream>,
T: Service<Request = TokioTcpStream>,
T::Future: 'static,
T::Error: 'static,
{
type Request = (Option<CounterGuard>, ServerMessage);
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
@ -96,14 +98,25 @@ pub(crate) struct StreamNewService<F: ServiceFactory> {
name: String,
inner: F,
token: Token,
addr: SocketAddr,
}
impl<F> StreamNewService<F>
where
F: ServiceFactory,
{
pub(crate) fn create(name: String, token: Token, inner: F) -> Box<InternalServiceFactory> {
Box::new(Self { name, token, inner })
pub(crate) fn create(
name: String,
token: Token,
inner: F,
addr: SocketAddr,
) -> Box<InternalServiceFactory> {
Box::new(Self {
name,
token,
inner,
addr,
})
}
}
@ -120,15 +133,17 @@ where
name: self.name.clone(),
inner: self.inner.clone(),
token: self.token,
addr: self.addr,
})
}
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
let token = self.token;
let config = ServerConfig::new(self.addr);
Box::new(
self.inner
.create()
.new_service(&())
.new_service(&config)
.map_err(|_| ())
.map(move |inner| {
let service: BoxedServerService = Box::new(StreamService::new(inner));
@ -155,7 +170,7 @@ impl InternalServiceFactory for Box<InternalServiceFactory> {
impl<F, T> ServiceFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService<TokioTcpStream>,
T: NewService<ServerConfig, Request = TokioTcpStream>,
{
type NewService = T;

View File

@ -1,6 +1,7 @@
use std::io;
use std::marker::PhantomData;
use actix_server_config::ServerConfig;
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use native_tls::{self, Error, HandshakeError, TlsAcceptor};
@ -36,14 +37,17 @@ impl<T: AsyncRead + AsyncWrite> Clone for NativeTlsAcceptor<T> {
}
}
impl<T: AsyncRead + AsyncWrite> NewService<T> for NativeTlsAcceptor<T> {
impl<T: AsyncRead + AsyncWrite> NewService<ServerConfig> for NativeTlsAcceptor<T> {
type Request = T;
type Response = TlsStream<T>;
type Error = Error;
type Service = NativeTlsAcceptorService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
cfg.set_secure();
MAX_CONN_COUNTER.with(|conns| {
ok(NativeTlsAcceptorService {
acceptor: self.acceptor.clone(),
@ -60,7 +64,8 @@ pub struct NativeTlsAcceptorService<T> {
conns: Counter,
}
impl<T: AsyncRead + AsyncWrite> Service<T> for NativeTlsAcceptorService<T> {
impl<T: AsyncRead + AsyncWrite> Service for NativeTlsAcceptorService<T> {
type Request = T;
type Response = TlsStream<T>;
type Error = Error;
type Future = Accept<T>;

View File

@ -8,6 +8,7 @@ use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream};
use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER;
use crate::ServerConfig;
/// Support `SSL` connections via openssl package
///
@ -36,14 +37,17 @@ impl<T: AsyncRead + AsyncWrite> Clone for OpensslAcceptor<T> {
}
}
impl<T: AsyncRead + AsyncWrite> NewService<T> for OpensslAcceptor<T> {
impl<T: AsyncRead + AsyncWrite> NewService<ServerConfig> for OpensslAcceptor<T> {
type Request = T;
type Response = SslStream<T>;
type Error = HandshakeError<T>;
type Service = OpensslAcceptorService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
cfg.set_secure();
MAX_CONN_COUNTER.with(|conns| {
ok(OpensslAcceptorService {
acceptor: self.acceptor.clone(),
@ -60,7 +64,8 @@ pub struct OpensslAcceptorService<T> {
conns: Counter,
}
impl<T: AsyncRead + AsyncWrite> Service<T> for OpensslAcceptorService<T> {
impl<T: AsyncRead + AsyncWrite> Service for OpensslAcceptorService<T> {
type Request = T;
type Response = SslStream<T>;
type Error = HandshakeError<T>;
type Future = OpensslAcceptorServiceFut<T>;

View File

@ -10,6 +10,7 @@ use tokio_rustls::{Accept, TlsAcceptor, TlsStream};
use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER;
use crate::ServerConfig as SrvConfig;
/// Support `SSL` connections via rustls package
///
@ -38,14 +39,17 @@ impl<T> Clone for RustlsAcceptor<T> {
}
}
impl<T: AsyncRead + AsyncWrite> NewService<T> for RustlsAcceptor<T> {
impl<T: AsyncRead + AsyncWrite> NewService<SrvConfig> for RustlsAcceptor<T> {
type Request = T;
type Response = TlsStream<T, ServerSession>;
type Error = io::Error;
type Service = RustlsAcceptorService<T>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future {
fn new_service(&self, cfg: &SrvConfig) -> Self::Future {
cfg.set_secure();
MAX_CONN_COUNTER.with(|conns| {
ok(RustlsAcceptorService {
acceptor: self.config.clone().into(),
@ -62,7 +66,8 @@ pub struct RustlsAcceptorService<T> {
conns: Counter,
}
impl<T: AsyncRead + AsyncWrite> Service<T> for RustlsAcceptorService<T> {
impl<T: AsyncRead + AsyncWrite> Service for RustlsAcceptorService<T> {
type Request = T;
type Response = TlsStream<T, ServerSession>;
type Error = io::Error;
type Future = RustlsAcceptorServiceFut<T>;

View File

@ -1,7 +1,7 @@
use std::{net, thread, time};
use actix_server::Server;
use actix_service::fn_service;
use actix_server::{Server, ServerConfig};
use actix_service::{fn_cfg_factory, fn_service, IntoService};
use net2::TcpBuilder;
fn unused_addr() -> net::SocketAddr {
@ -19,7 +19,27 @@ fn test_bind() {
thread::spawn(move || {
Server::build()
.bind("test", addr, || fn_service(|_| Ok::<_, ()>(())))
.bind("test", addr, move || {
fn_cfg_factory(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr);
Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service())
})
})
.unwrap()
.run()
});
thread::sleep(time::Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
}
#[test]
fn test_bind_no_config() {
let addr = unused_addr();
thread::spawn(move || {
Server::build()
.bind("test", addr, move || fn_service(|_| Ok::<_, ()>(())))
.unwrap()
.run()
});
@ -35,7 +55,12 @@ fn test_listen() {
thread::spawn(move || {
let lst = net::TcpListener::bind(addr).unwrap();
Server::build()
.listen("test", lst, move || fn_service(|_| Ok::<_, ()>(())))
.listen("test", lst, move || {
fn_cfg_factory(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr);
Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service())
})
})
.unwrap()
.run()
});