1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 20:12:58 +01:00

add ServerConfig param for server service

This commit is contained in:
Nikolay Kim 2019-03-08 19:43:13 -08:00
parent 4850cf41ff
commit 7db29544f9
11 changed files with 221 additions and 31 deletions

View File

@ -20,6 +20,7 @@ members = [
"actix-rt", "actix-rt",
"actix-service", "actix-service",
"actix-server", "actix-server",
"actix-server-config",
"actix-test-server", "actix-test-server",
"actix-utils", "actix-utils",
"router", "router",

View File

@ -0,0 +1,18 @@
[package]
name = "actix-server-config"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix server config utils"
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
license = "MIT/Apache-2.0"
edition = "2018"
workspace = ".."
[lib]
name = "actix_server_config"
path = "src/lib.rs"
[dependencies]
actix-service = { path="../actix-service" }
futures = "0.1.25"

View File

@ -0,0 +1,88 @@
use std::cell::Cell;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::rc::Rc;
use actix_service::{FnService, IntoService, NewService};
use futures::future::{ok, FutureResult, IntoFuture};
#[derive(Debug, Clone)]
pub struct ServerConfig {
addr: SocketAddr,
secure: Rc<Cell<bool>>,
}
impl ServerConfig {
pub fn new(addr: SocketAddr) -> Self {
ServerConfig {
addr,
secure: Rc::new(Cell::new(false)),
}
}
/// Returns the address of the local half of this TCP server socket
pub fn local_addr(&self) -> SocketAddr {
self.addr
}
/// Returns true if connection is secure (tls enabled)
pub fn secure(&self) -> bool {
self.secure.as_ref().get()
}
/// Set secure flag
pub fn set_secure(&self) {
self.secure.as_ref().set(true)
}
}
pub fn server_fn<F, U, Req, Out>(f: F) -> impl NewService<Req, ServerConfig>
where
F: Fn(&ServerConfig) -> U + Clone + 'static,
U: FnMut(Req) -> Out + Clone + 'static,
Out: IntoFuture,
{
ServerFnNewService { f, _t: PhantomData }
}
struct ServerFnNewService<F, U, Req, Out>
where
F: Fn(&ServerConfig) -> U + Clone + 'static,
U: FnMut(Req) -> Out + Clone + 'static,
Out: IntoFuture,
{
f: F,
_t: PhantomData<(U, Req, Out)>,
}
impl<F, U, Req, Out> NewService<Req, ServerConfig> for ServerFnNewService<F, U, Req, Out>
where
F: Fn(&ServerConfig) -> U + Clone + 'static,
U: FnMut(Req) -> Out + Clone + 'static,
Out: IntoFuture,
{
type Response = Out::Item;
type Error = Out::Error;
type Service = FnService<U, Req, Out>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
ok((self.f)(cfg).into_service())
}
}
impl<F, U, Req, Out> Clone for ServerFnNewService<F, U, Req, Out>
where
F: Fn(&ServerConfig) -> U + Clone + 'static,
U: FnMut(Req) -> Out + Clone,
Out: IntoFuture,
{
fn clone(&self) -> Self {
Self {
f: self.f.clone(),
_t: PhantomData,
}
}
}

View File

@ -33,9 +33,10 @@ ssl = ["openssl", "tokio-openssl"]
rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"] rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"]
[dependencies] [dependencies]
actix-rt = "0.2.0"
#actix-service = "0.3.2" #actix-service = "0.3.2"
actix-service = { path="../actix-service" } actix-service = { path="../actix-service" }
actix-rt = "0.2.0" actix-server-config = { path="../actix-server-config" }
log = "0.4" log = "0.4"
num_cpus = "1.0" num_cpus = "1.0"

View File

@ -151,14 +151,14 @@ impl ServerBuilder {
{ {
let sockets = bind_addr(addr)?; let sockets = bind_addr(addr)?;
let token = self.token.next();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
));
for lst in sockets { for lst in sockets {
let token = self.token.next();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
lst.local_addr()?,
));
self.sockets.push((token, lst)); self.sockets.push((token, lst));
} }
Ok(self) Ok(self)
@ -170,7 +170,7 @@ impl ServerBuilder {
name: N, name: N,
lst: net::TcpListener, lst: net::TcpListener,
factory: F, factory: F,
) -> Self ) -> io::Result<Self>
where where
F: ServiceFactory, F: ServiceFactory,
{ {
@ -179,9 +179,10 @@ impl ServerBuilder {
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
factory, factory,
lst.local_addr()?,
)); ));
self.sockets.push((token, lst)); self.sockets.push((token, lst));
self Ok(self)
} }
/// Spawn new thread and start listening for incoming connections. /// Spawn new thread and start listening for incoming connections.

View File

@ -10,6 +10,8 @@ mod signals;
pub mod ssl; pub mod ssl;
mod worker; mod worker;
pub use actix_server_config::ServerConfig;
pub use self::builder::ServerBuilder; pub use self::builder::ServerBuilder;
pub use self::server::Server; pub use self::server::Server;
pub use self::service_config::{ServiceConfig, ServiceRuntime}; pub use self::service_config::{ServiceConfig, ServiceRuntime};

View File

@ -1,13 +1,14 @@
use std::net; use std::net::{SocketAddr, TcpStream};
use std::time::Duration; use std::time::Duration;
use actix_rt::spawn; use actix_rt::spawn;
use actix_server_config::ServerConfig;
use actix_service::{NewService, Service}; use actix_service::{NewService, Service};
use futures::future::{err, ok, FutureResult}; use futures::future::{err, ok, FutureResult};
use futures::{Future, Poll}; use futures::{Future, Poll};
use log::error; use log::error;
use tokio_reactor::Handle; use tokio_reactor::Handle;
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream as TokioTcpStream;
use super::Token; use super::Token;
use crate::counter::CounterGuard; use crate::counter::CounterGuard;
@ -15,7 +16,7 @@ use crate::counter::CounterGuard;
/// Server message /// Server message
pub(crate) enum ServerMessage { pub(crate) enum ServerMessage {
/// New stream /// New stream
Connect(net::TcpStream), Connect(TcpStream),
/// Gracefull shutdown /// Gracefull shutdown
Shutdown(Duration), Shutdown(Duration),
/// Force shutdown /// Force shutdown
@ -23,7 +24,7 @@ pub(crate) enum ServerMessage {
} }
pub trait ServiceFactory: Send + Clone + 'static { pub trait ServiceFactory: Send + Clone + 'static {
type NewService: NewService<TcpStream>; type NewService: NewService<TokioTcpStream, ServerConfig>;
fn create(&self) -> Self::NewService; fn create(&self) -> Self::NewService;
} }
@ -57,7 +58,7 @@ impl<T> StreamService<T> {
impl<T> Service<(Option<CounterGuard>, ServerMessage)> for StreamService<T> impl<T> Service<(Option<CounterGuard>, ServerMessage)> for StreamService<T>
where where
T: Service<TcpStream>, T: Service<TokioTcpStream>,
T::Future: 'static, T::Future: 'static,
T::Error: 'static, T::Error: 'static,
{ {
@ -72,9 +73,10 @@ where
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req { match req {
ServerMessage::Connect(stream) => { ServerMessage::Connect(stream) => {
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { let stream =
error!("Can not convert to an async tcp stream: {}", e); TokioTcpStream::from_std(stream, &Handle::default()).map_err(|e| {
}); error!("Can not convert to an async tcp stream: {}", e);
});
if let Ok(stream) = stream { if let Ok(stream) = stream {
spawn(self.service.call(stream).then(move |res| { spawn(self.service.call(stream).then(move |res| {
@ -95,14 +97,25 @@ pub(crate) struct StreamNewService<F: ServiceFactory> {
name: String, name: String,
inner: F, inner: F,
token: Token, token: Token,
addr: SocketAddr,
} }
impl<F> StreamNewService<F> impl<F> StreamNewService<F>
where where
F: ServiceFactory, F: ServiceFactory,
{ {
pub(crate) fn create(name: String, token: Token, inner: F) -> Box<InternalServiceFactory> { pub(crate) fn create(
Box::new(Self { name, token, inner }) name: String,
token: Token,
inner: F,
addr: SocketAddr,
) -> Box<InternalServiceFactory> {
Box::new(Self {
name,
token,
inner,
addr,
})
} }
} }
@ -119,15 +132,17 @@ where
name: self.name.clone(), name: self.name.clone(),
inner: self.inner.clone(), inner: self.inner.clone(),
token: self.token, token: self.token,
addr: self.addr,
}) })
} }
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> { fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
let token = self.token; let token = self.token;
let config = ServerConfig::new(self.addr);
Box::new( Box::new(
self.inner self.inner
.create() .create()
.new_service(&()) .new_service(&config)
.map_err(|_| ()) .map_err(|_| ())
.map(move |inner| { .map(move |inner| {
let service: BoxedServerService = Box::new(StreamService::new(inner)); let service: BoxedServerService = Box::new(StreamService::new(inner));
@ -154,7 +169,7 @@ impl InternalServiceFactory for Box<InternalServiceFactory> {
impl<F, T> ServiceFactory for F impl<F, T> ServiceFactory for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn() -> T + Send + Clone + 'static,
T: NewService<TcpStream>, T: NewService<TokioTcpStream, ServerConfig>,
{ {
type NewService = T; type NewService = T;

View File

@ -6,8 +6,9 @@ use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use native_tls::{self, Error, HandshakeError, TlsAcceptor}; use native_tls::{self, Error, HandshakeError, TlsAcceptor};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use super::MAX_CONN_COUNTER;
use crate::counter::{Counter, CounterGuard}; use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER;
use crate::ServerConfig;
/// Support `SSL` connections via native-tls package /// Support `SSL` connections via native-tls package
/// ///
@ -36,14 +37,16 @@ impl<T: AsyncRead + AsyncWrite> Clone for NativeTlsAcceptor<T> {
} }
} }
impl<T: AsyncRead + AsyncWrite> NewService<T> for NativeTlsAcceptor<T> { impl<T: AsyncRead + AsyncWrite> NewService<T, ServerConfig> for NativeTlsAcceptor<T> {
type Response = TlsStream<T>; type Response = TlsStream<T>;
type Error = Error; type Error = Error;
type Service = NativeTlsAcceptorService<T>; type Service = NativeTlsAcceptorService<T>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
cfg.set_secure();
MAX_CONN_COUNTER.with(|conns| { MAX_CONN_COUNTER.with(|conns| {
ok(NativeTlsAcceptorService { ok(NativeTlsAcceptorService {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),

View File

@ -6,8 +6,9 @@ use openssl::ssl::{HandshakeError, SslAcceptor};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream};
use super::MAX_CONN_COUNTER;
use crate::counter::{Counter, CounterGuard}; use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER;
use crate::ServerConfig;
/// Support `SSL` connections via openssl package /// Support `SSL` connections via openssl package
/// ///
@ -36,14 +37,16 @@ impl<T: AsyncRead + AsyncWrite> Clone for OpensslAcceptor<T> {
} }
} }
impl<T: AsyncRead + AsyncWrite> NewService<T> for OpensslAcceptor<T> { impl<T: AsyncRead + AsyncWrite> NewService<T, ServerConfig> for OpensslAcceptor<T> {
type Response = SslStream<T>; type Response = SslStream<T>;
type Error = HandshakeError<T>; type Error = HandshakeError<T>;
type Service = OpensslAcceptorService<T>; type Service = OpensslAcceptorService<T>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, cfg: &ServerConfig) -> Self::Future {
cfg.set_secure();
MAX_CONN_COUNTER.with(|conns| { MAX_CONN_COUNTER.with(|conns| {
ok(OpensslAcceptorService { ok(OpensslAcceptorService {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),

View File

@ -8,8 +8,9 @@ use rustls::{ServerConfig, ServerSession};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_rustls::{Accept, TlsAcceptor, TlsStream}; use tokio_rustls::{Accept, TlsAcceptor, TlsStream};
use super::MAX_CONN_COUNTER;
use crate::counter::{Counter, CounterGuard}; use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER;
use crate::ServerConfig as SrvConfig;
/// Support `SSL` connections via rustls package /// Support `SSL` connections via rustls package
/// ///
@ -38,14 +39,16 @@ impl<T> Clone for RustlsAcceptor<T> {
} }
} }
impl<T: AsyncRead + AsyncWrite> NewService<T> for RustlsAcceptor<T> { impl<T: AsyncRead + AsyncWrite> NewService<T, SrvConfig> for RustlsAcceptor<T> {
type Response = TlsStream<T, ServerSession>; type Response = TlsStream<T, ServerSession>;
type Error = io::Error; type Error = io::Error;
type Service = RustlsAcceptorService<T>; type Service = RustlsAcceptorService<T>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = FutureResult<Self::Service, Self::InitError>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, cfg: &SrvConfig) -> Self::Future {
cfg.set_secure();
MAX_CONN_COUNTER.with(|conns| { MAX_CONN_COUNTER.with(|conns| {
ok(RustlsAcceptorService { ok(RustlsAcceptorService {
acceptor: self.config.clone().into(), acceptor: self.config.clone().into(),

View File

@ -0,0 +1,55 @@
use std::{net, thread, time};
use actix_server::{Server, ServerConfig};
use actix_service::{fn_cfg_factory, IntoService};
use net2::TcpBuilder;
fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = TcpBuilder::new_v4().unwrap();
socket.bind(&addr).unwrap();
socket.reuse_address(true).unwrap();
let tcp = socket.to_tcp_listener().unwrap();
tcp.local_addr().unwrap()
}
#[test]
fn test_bind() {
let addr = unused_addr();
thread::spawn(move || {
Server::build()
.bind("test", addr, move || {
fn_cfg_factory(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr);
Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service())
})
})
.unwrap()
.run()
});
thread::sleep(time::Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
}
#[test]
fn test_listen() {
let addr = unused_addr();
thread::spawn(move || {
let lst = net::TcpListener::bind(addr).unwrap();
Server::build()
.listen("test", lst, move || {
fn_cfg_factory(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr);
Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service())
})
})
.unwrap()
.run()
});
thread::sleep(time::Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
}