From 7db29544f9aed669e7615f01aefe04d597c0d382 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 8 Mar 2019 19:43:13 -0800 Subject: [PATCH] add ServerConfig param for server service --- Cargo.toml | 1 + actix-server-config/Cargo.toml | 18 +++++++ actix-server-config/src/lib.rs | 88 +++++++++++++++++++++++++++++++ actix-server/Cargo.toml | 3 +- actix-server/src/builder.rs | 19 +++---- actix-server/src/lib.rs | 2 + actix-server/src/services.rs | 39 +++++++++----- actix-server/src/ssl/nativetls.rs | 9 ++-- actix-server/src/ssl/openssl.rs | 9 ++-- actix-server/src/ssl/rustls.rs | 9 ++-- actix-server/tests/test_server.rs | 55 +++++++++++++++++++ 11 files changed, 221 insertions(+), 31 deletions(-) create mode 100644 actix-server-config/Cargo.toml create mode 100644 actix-server-config/src/lib.rs create mode 100644 actix-server/tests/test_server.rs diff --git a/Cargo.toml b/Cargo.toml index 88a87427..973b2f2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "actix-rt", "actix-service", "actix-server", + "actix-server-config", "actix-test-server", "actix-utils", "router", diff --git a/actix-server-config/Cargo.toml b/actix-server-config/Cargo.toml new file mode 100644 index 00000000..ab2de92b --- /dev/null +++ b/actix-server-config/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "actix-server-config" +version = "0.1.0" +authors = ["Nikolay Kim "] +description = "Actix server config utils" +homepage = "https://actix.rs" +repository = "https://github.com/actix/actix-net.git" +license = "MIT/Apache-2.0" +edition = "2018" +workspace = ".." + +[lib] +name = "actix_server_config" +path = "src/lib.rs" + +[dependencies] +actix-service = { path="../actix-service" } +futures = "0.1.25" diff --git a/actix-server-config/src/lib.rs b/actix-server-config/src/lib.rs new file mode 100644 index 00000000..6a1b3437 --- /dev/null +++ b/actix-server-config/src/lib.rs @@ -0,0 +1,88 @@ +use std::cell::Cell; +use std::marker::PhantomData; +use std::net::SocketAddr; +use std::rc::Rc; + +use actix_service::{FnService, IntoService, NewService}; +use futures::future::{ok, FutureResult, IntoFuture}; + +#[derive(Debug, Clone)] +pub struct ServerConfig { + addr: SocketAddr, + secure: Rc>, +} + +impl ServerConfig { + pub fn new(addr: SocketAddr) -> Self { + ServerConfig { + addr, + secure: Rc::new(Cell::new(false)), + } + } + + /// Returns the address of the local half of this TCP server socket + pub fn local_addr(&self) -> SocketAddr { + self.addr + } + + /// Returns true if connection is secure (tls enabled) + pub fn secure(&self) -> bool { + self.secure.as_ref().get() + } + + /// Set secure flag + pub fn set_secure(&self) { + self.secure.as_ref().set(true) + } +} + +pub fn server_fn(f: F) -> impl NewService +where + F: Fn(&ServerConfig) -> U + Clone + 'static, + U: FnMut(Req) -> Out + Clone + 'static, + Out: IntoFuture, +{ + ServerFnNewService { f, _t: PhantomData } +} + +struct ServerFnNewService +where + F: Fn(&ServerConfig) -> U + Clone + 'static, + U: FnMut(Req) -> Out + Clone + 'static, + Out: IntoFuture, +{ + f: F, + _t: PhantomData<(U, Req, Out)>, +} + +impl NewService for ServerFnNewService +where + F: Fn(&ServerConfig) -> U + Clone + 'static, + U: FnMut(Req) -> Out + Clone + 'static, + Out: IntoFuture, +{ + type Response = Out::Item; + type Error = Out::Error; + type Service = FnService; + + type InitError = (); + type Future = FutureResult; + + fn new_service(&self, cfg: &ServerConfig) -> Self::Future { + ok((self.f)(cfg).into_service()) + } +} + +impl Clone for ServerFnNewService +where + F: Fn(&ServerConfig) -> U + Clone + 'static, + U: FnMut(Req) -> Out + Clone, + Out: IntoFuture, +{ + fn clone(&self) -> Self { + Self { + f: self.f.clone(), + _t: PhantomData, + } + } +} diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 2cc21f9a..3da83ba6 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -33,9 +33,10 @@ ssl = ["openssl", "tokio-openssl"] rust-tls = ["rustls", "tokio-rustls", "webpki", "webpki-roots"] [dependencies] +actix-rt = "0.2.0" #actix-service = "0.3.2" actix-service = { path="../actix-service" } -actix-rt = "0.2.0" +actix-server-config = { path="../actix-server-config" } log = "0.4" num_cpus = "1.0" diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index df4ccf8d..86433c00 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -151,14 +151,14 @@ impl ServerBuilder { { let sockets = bind_addr(addr)?; - let token = self.token.next(); - self.services.push(StreamNewService::create( - name.as_ref().to_string(), - token, - factory, - )); - for lst in sockets { + let token = self.token.next(); + self.services.push(StreamNewService::create( + name.as_ref().to_string(), + token, + factory.clone(), + lst.local_addr()?, + )); self.sockets.push((token, lst)); } Ok(self) @@ -170,7 +170,7 @@ impl ServerBuilder { name: N, lst: net::TcpListener, factory: F, - ) -> Self + ) -> io::Result where F: ServiceFactory, { @@ -179,9 +179,10 @@ impl ServerBuilder { name.as_ref().to_string(), token, factory, + lst.local_addr()?, )); self.sockets.push((token, lst)); - self + Ok(self) } /// Spawn new thread and start listening for incoming connections. diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index eecc46aa..a5ec2a72 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -10,6 +10,8 @@ mod signals; pub mod ssl; mod worker; +pub use actix_server_config::ServerConfig; + pub use self::builder::ServerBuilder; pub use self::server::Server; pub use self::service_config::{ServiceConfig, ServiceRuntime}; diff --git a/actix-server/src/services.rs b/actix-server/src/services.rs index a78bbb48..b69693a6 100644 --- a/actix-server/src/services.rs +++ b/actix-server/src/services.rs @@ -1,13 +1,14 @@ -use std::net; +use std::net::{SocketAddr, TcpStream}; use std::time::Duration; use actix_rt::spawn; +use actix_server_config::ServerConfig; use actix_service::{NewService, Service}; use futures::future::{err, ok, FutureResult}; use futures::{Future, Poll}; use log::error; use tokio_reactor::Handle; -use tokio_tcp::TcpStream; +use tokio_tcp::TcpStream as TokioTcpStream; use super::Token; use crate::counter::CounterGuard; @@ -15,7 +16,7 @@ use crate::counter::CounterGuard; /// Server message pub(crate) enum ServerMessage { /// New stream - Connect(net::TcpStream), + Connect(TcpStream), /// Gracefull shutdown Shutdown(Duration), /// Force shutdown @@ -23,7 +24,7 @@ pub(crate) enum ServerMessage { } pub trait ServiceFactory: Send + Clone + 'static { - type NewService: NewService; + type NewService: NewService; fn create(&self) -> Self::NewService; } @@ -57,7 +58,7 @@ impl StreamService { impl Service<(Option, ServerMessage)> for StreamService where - T: Service, + T: Service, T::Future: 'static, T::Error: 'static, { @@ -72,9 +73,10 @@ where fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { match req { ServerMessage::Connect(stream) => { - let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { - error!("Can not convert to an async tcp stream: {}", e); - }); + let stream = + TokioTcpStream::from_std(stream, &Handle::default()).map_err(|e| { + error!("Can not convert to an async tcp stream: {}", e); + }); if let Ok(stream) = stream { spawn(self.service.call(stream).then(move |res| { @@ -95,14 +97,25 @@ pub(crate) struct StreamNewService { name: String, inner: F, token: Token, + addr: SocketAddr, } impl StreamNewService where F: ServiceFactory, { - pub(crate) fn create(name: String, token: Token, inner: F) -> Box { - Box::new(Self { name, token, inner }) + pub(crate) fn create( + name: String, + token: Token, + inner: F, + addr: SocketAddr, + ) -> Box { + Box::new(Self { + name, + token, + inner, + addr, + }) } } @@ -119,15 +132,17 @@ where name: self.name.clone(), inner: self.inner.clone(), token: self.token, + addr: self.addr, }) } fn create(&self) -> Box, Error = ()>> { let token = self.token; + let config = ServerConfig::new(self.addr); Box::new( self.inner .create() - .new_service(&()) + .new_service(&config) .map_err(|_| ()) .map(move |inner| { let service: BoxedServerService = Box::new(StreamService::new(inner)); @@ -154,7 +169,7 @@ impl InternalServiceFactory for Box { impl ServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, - T: NewService, + T: NewService, { type NewService = T; diff --git a/actix-server/src/ssl/nativetls.rs b/actix-server/src/ssl/nativetls.rs index c9d02f5a..4b86a9b8 100644 --- a/actix-server/src/ssl/nativetls.rs +++ b/actix-server/src/ssl/nativetls.rs @@ -6,8 +6,9 @@ use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use native_tls::{self, Error, HandshakeError, TlsAcceptor}; use tokio_io::{AsyncRead, AsyncWrite}; -use super::MAX_CONN_COUNTER; use crate::counter::{Counter, CounterGuard}; +use crate::ssl::MAX_CONN_COUNTER; +use crate::ServerConfig; /// Support `SSL` connections via native-tls package /// @@ -36,14 +37,16 @@ impl Clone for NativeTlsAcceptor { } } -impl NewService for NativeTlsAcceptor { +impl NewService for NativeTlsAcceptor { type Response = TlsStream; type Error = Error; type Service = NativeTlsAcceptorService; type InitError = (); type Future = FutureResult; - fn new_service(&self, _: &()) -> Self::Future { + fn new_service(&self, cfg: &ServerConfig) -> Self::Future { + cfg.set_secure(); + MAX_CONN_COUNTER.with(|conns| { ok(NativeTlsAcceptorService { acceptor: self.acceptor.clone(), diff --git a/actix-server/src/ssl/openssl.rs b/actix-server/src/ssl/openssl.rs index 3153841b..756e9eeb 100644 --- a/actix-server/src/ssl/openssl.rs +++ b/actix-server/src/ssl/openssl.rs @@ -6,8 +6,9 @@ use openssl::ssl::{HandshakeError, SslAcceptor}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; -use super::MAX_CONN_COUNTER; use crate::counter::{Counter, CounterGuard}; +use crate::ssl::MAX_CONN_COUNTER; +use crate::ServerConfig; /// Support `SSL` connections via openssl package /// @@ -36,14 +37,16 @@ impl Clone for OpensslAcceptor { } } -impl NewService for OpensslAcceptor { +impl NewService for OpensslAcceptor { type Response = SslStream; type Error = HandshakeError; type Service = OpensslAcceptorService; type InitError = (); type Future = FutureResult; - fn new_service(&self, _: &()) -> Self::Future { + fn new_service(&self, cfg: &ServerConfig) -> Self::Future { + cfg.set_secure(); + MAX_CONN_COUNTER.with(|conns| { ok(OpensslAcceptorService { acceptor: self.acceptor.clone(), diff --git a/actix-server/src/ssl/rustls.rs b/actix-server/src/ssl/rustls.rs index 5f233269..f1ec30a2 100644 --- a/actix-server/src/ssl/rustls.rs +++ b/actix-server/src/ssl/rustls.rs @@ -8,8 +8,9 @@ use rustls::{ServerConfig, ServerSession}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_rustls::{Accept, TlsAcceptor, TlsStream}; -use super::MAX_CONN_COUNTER; use crate::counter::{Counter, CounterGuard}; +use crate::ssl::MAX_CONN_COUNTER; +use crate::ServerConfig as SrvConfig; /// Support `SSL` connections via rustls package /// @@ -38,14 +39,16 @@ impl Clone for RustlsAcceptor { } } -impl NewService for RustlsAcceptor { +impl NewService for RustlsAcceptor { type Response = TlsStream; type Error = io::Error; type Service = RustlsAcceptorService; type InitError = (); type Future = FutureResult; - fn new_service(&self, _: &()) -> Self::Future { + fn new_service(&self, cfg: &SrvConfig) -> Self::Future { + cfg.set_secure(); + MAX_CONN_COUNTER.with(|conns| { ok(RustlsAcceptorService { acceptor: self.config.clone().into(), diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs new file mode 100644 index 00000000..7e6be252 --- /dev/null +++ b/actix-server/tests/test_server.rs @@ -0,0 +1,55 @@ +use std::{net, thread, time}; + +use actix_server::{Server, ServerConfig}; +use actix_service::{fn_cfg_factory, IntoService}; +use net2::TcpBuilder; + +fn unused_addr() -> net::SocketAddr { + let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); + let socket = TcpBuilder::new_v4().unwrap(); + socket.bind(&addr).unwrap(); + socket.reuse_address(true).unwrap(); + let tcp = socket.to_tcp_listener().unwrap(); + tcp.local_addr().unwrap() +} + +#[test] +fn test_bind() { + let addr = unused_addr(); + + thread::spawn(move || { + Server::build() + .bind("test", addr, move || { + fn_cfg_factory(move |cfg: &ServerConfig| { + assert_eq!(cfg.local_addr(), addr); + Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service()) + }) + }) + .unwrap() + .run() + }); + + thread::sleep(time::Duration::from_millis(500)); + assert!(net::TcpStream::connect(addr).is_ok()); +} + +#[test] +fn test_listen() { + let addr = unused_addr(); + + thread::spawn(move || { + let lst = net::TcpListener::bind(addr).unwrap(); + Server::build() + .listen("test", lst, move || { + fn_cfg_factory(move |cfg: &ServerConfig| { + assert_eq!(cfg.local_addr(), addr); + Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service()) + }) + }) + .unwrap() + .run() + }); + + thread::sleep(time::Duration::from_millis(500)); + assert!(net::TcpStream::connect(addr).is_ok()); +}