From 787255d030cae76ae6c6ca7b89e054dcfed46a10 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 11 Mar 2019 12:01:55 -0700 Subject: [PATCH] add io parameters --- actix-server-config/src/lib.rs | 64 ++++++++++++++++++++++++++++ actix-server/src/builder.rs | 32 +++++++++++--- actix-server/src/lib.rs | 2 +- actix-server/src/service_config.rs | 11 +++-- actix-server/src/services.rs | 23 +++++----- actix-server/src/ssl/nativetls.rs | 55 ++++++++++++++---------- actix-server/src/ssl/openssl.rs | 67 ++++++++++++++++++++---------- actix-server/src/ssl/rustls.rs | 50 ++++++++++++---------- actix-server/tests/test_server.rs | 50 ++++++++++++++++++++++ 9 files changed, 267 insertions(+), 87 deletions(-) diff --git a/actix-server-config/src/lib.rs b/actix-server-config/src/lib.rs index 51accad1..b3218825 100644 --- a/actix-server-config/src/lib.rs +++ b/actix-server-config/src/lib.rs @@ -31,3 +31,67 @@ impl ServerConfig { self.secure.as_ref().set(true) } } + +#[derive(Copy, Clone, Debug)] +pub enum Protocol { + Unknown, + Http10, + Http11, + Http2, + Proto1, + Proto2, + Proto3, + Proto4, + Proto5, + Proto6, +} + +pub struct Io { + io: T, + proto: Protocol, + params: P, +} + +impl Io { + pub fn new(io: T) -> Self { + Self { + io, + proto: Protocol::Unknown, + params: (), + } + } +} + +impl Io { + pub fn from_parts(io: T, params: P, proto: Protocol) -> Self { + Self { io, params, proto } + } + + pub fn into_parts(self) -> (T, P, Protocol) { + (self.io, self.params, self.proto) + } + + pub fn io(&self) -> &T { + &self.io + } + + pub fn io_mut(&mut self) -> &mut T { + &mut self.io + } + + pub fn protocol(&self) -> Protocol { + self.proto + } + + /// Maps an Io<_, P> to Io<_, U> by applying a function to a contained value. + pub fn map(self, op: F) -> Io + where + F: FnOnce(P) -> U, + { + Io { + io: self.io, + proto: self.proto, + params: op(self.params), + } + } +} diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 86433c00..0172013f 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -23,6 +23,7 @@ use crate::{ssl, Token}; pub struct ServerBuilder { threads: usize, token: Token, + backlog: i32, workers: Vec<(usize, WorkerClient)>, services: Vec>, sockets: Vec<(Token, net::TcpListener)>, @@ -53,6 +54,7 @@ impl ServerBuilder { services: Vec::new(), sockets: Vec::new(), accept: AcceptLoop::new(server.clone()), + backlog: 2048, exit: false, shutdown_timeout: Duration::from_secs(30), no_signals: false, @@ -70,6 +72,21 @@ impl ServerBuilder { self } + /// Set the maximum number of pending connections. + /// + /// This refers to the number of clients that can be waiting to be served. + /// Exceeding this number results in the client getting an error when + /// attempting to connect. It should only affect servers under significant + /// load. + /// + /// Generally set in the 64-2048 range. Default value is 2048. + /// + /// This method should be called before `bind()` method call. + pub fn backlog(mut self, num: i32) -> Self { + self.backlog = num; + self + } + /// Sets the maximum per-worker number of concurrent connections. /// /// All socket listeners will stop accepting connections when this limit is @@ -125,7 +142,7 @@ impl ServerBuilder { where F: Fn(&mut ServiceConfig) -> io::Result<()>, { - let mut cfg = ServiceConfig::new(self.threads); + let mut cfg = ServiceConfig::new(self.threads, self.backlog); f(&mut cfg)?; @@ -149,7 +166,7 @@ impl ServerBuilder { F: ServiceFactory, U: net::ToSocketAddrs, { - let sockets = bind_addr(addr)?; + let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { let token = self.token.next(); @@ -393,12 +410,15 @@ impl Future for ServerBuilder { } } -pub(super) fn bind_addr(addr: S) -> io::Result> { +pub(super) fn bind_addr( + addr: S, + backlog: i32, +) -> io::Result> { let mut err = None; let mut succ = false; let mut sockets = Vec::new(); for addr in addr.to_socket_addrs()? { - match create_tcp_listener(addr) { + match create_tcp_listener(addr, backlog) { Ok(lst) => { succ = true; sockets.push(lst); @@ -421,12 +441,12 @@ pub(super) fn bind_addr(addr: S) -> io::Result io::Result { +fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result { let builder = match addr { net::SocketAddr::V4(_) => TcpBuilder::new_v4()?, net::SocketAddr::V6(_) => TcpBuilder::new_v6()?, }; builder.reuse_address(true)?; builder.bind(addr)?; - Ok(builder.listen(1024)?) + Ok(builder.listen(backlog)?) } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index a5ec2a72..2b0beb49 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -10,7 +10,7 @@ mod signals; pub mod ssl; mod worker; -pub use actix_server_config::ServerConfig; +pub use actix_server_config::{Io, Protocol, ServerConfig}; pub use self::builder::ServerBuilder; pub use self::server::Server; diff --git a/actix-server/src/service_config.rs b/actix-server/src/service_config.rs index 885f4254..faa50de3 100644 --- a/actix-server/src/service_config.rs +++ b/actix-server/src/service_config.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::{fmt, io, net}; +use actix_server_config::Io; use actix_service::{IntoNewService, NewService}; use futures::future::{join_all, Future}; use log::error; @@ -18,12 +19,14 @@ pub struct ServiceConfig { pub(crate) services: Vec<(String, net::TcpListener)>, pub(crate) apply: Option>, pub(crate) threads: usize, + pub(crate) backlog: i32, } impl ServiceConfig { - pub(super) fn new(threads: usize) -> ServiceConfig { + pub(super) fn new(threads: usize, backlog: i32) -> ServiceConfig { ServiceConfig { threads, + backlog, services: Vec::new(), apply: None, } @@ -42,7 +45,7 @@ impl ServiceConfig { where U: net::ToSocketAddrs, { - let sockets = bind_addr(addr)?; + let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { self.listen(name.as_ref(), lst); @@ -170,7 +173,7 @@ impl ServiceRuntime { pub fn service(&mut self, name: &str, service: F) where F: IntoNewService, - T: NewService + 'static, + T: NewService, Response = ()> + 'static, T::Future: 'static, T::Service: 'static, T::InitError: fmt::Debug, @@ -206,7 +209,7 @@ struct ServiceFactory { impl NewService for ServiceFactory where - T: NewService, + T: NewService, Response = ()>, T::Future: 'static, T::Service: 'static, T::Error: 'static, diff --git a/actix-server/src/services.rs b/actix-server/src/services.rs index 9ef42d40..91685b5d 100644 --- a/actix-server/src/services.rs +++ b/actix-server/src/services.rs @@ -1,14 +1,14 @@ -use std::net::{SocketAddr, TcpStream}; +use std::net::{self, SocketAddr}; use std::time::Duration; use actix_rt::spawn; -use actix_server_config::ServerConfig; +use actix_server_config::{Io, ServerConfig}; use actix_service::{NewService, Service}; use futures::future::{err, ok, FutureResult}; use futures::{Future, Poll}; use log::error; use tokio_reactor::Handle; -use tokio_tcp::TcpStream as TokioTcpStream; +use tokio_tcp::TcpStream; use super::Token; use crate::counter::CounterGuard; @@ -16,7 +16,7 @@ use crate::counter::CounterGuard; /// Server message pub(crate) enum ServerMessage { /// New stream - Connect(TcpStream), + Connect(net::TcpStream), /// Gracefull shutdown Shutdown(Duration), /// Force shutdown @@ -24,7 +24,7 @@ pub(crate) enum ServerMessage { } pub trait ServiceFactory: Send + Clone + 'static { - type NewService: NewService; + type NewService: NewService>; fn create(&self) -> Self::NewService; } @@ -58,7 +58,7 @@ impl StreamService { impl Service for StreamService where - T: Service, + T: Service>, T::Future: 'static, T::Error: 'static, { @@ -74,13 +74,12 @@ where fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { match req { ServerMessage::Connect(stream) => { - let stream = - TokioTcpStream::from_std(stream, &Handle::default()).map_err(|e| { - error!("Can not convert to an async tcp stream: {}", e); - }); + let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { + error!("Can not convert to an async tcp stream: {}", e); + }); if let Ok(stream) = stream { - spawn(self.service.call(stream).then(move |res| { + spawn(self.service.call(Io::new(stream)).then(move |res| { drop(guard); res.map_err(|_| ()).map(|_| ()) })); @@ -170,7 +169,7 @@ impl InternalServiceFactory for Box { impl ServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, - T: NewService, + T: NewService>, { type NewService = T; diff --git a/actix-server/src/ssl/nativetls.rs b/actix-server/src/ssl/nativetls.rs index 6b177586..22d4bc56 100644 --- a/actix-server/src/ssl/nativetls.rs +++ b/actix-server/src/ssl/nativetls.rs @@ -1,7 +1,6 @@ use std::io; use std::marker::PhantomData; -use actix_server_config::ServerConfig; use actix_service::{NewService, Service}; use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use native_tls::{self, Error, HandshakeError, TlsAcceptor}; @@ -9,16 +8,17 @@ use tokio_io::{AsyncRead, AsyncWrite}; use crate::counter::{Counter, CounterGuard}; use crate::ssl::MAX_CONN_COUNTER; +use crate::{Io, Protocol, ServerConfig}; /// Support `SSL` connections via native-tls package /// /// `tls` feature enables `NativeTlsAcceptor` type -pub struct NativeTlsAcceptor { +pub struct NativeTlsAcceptor { acceptor: TlsAcceptor, - io: PhantomData, + io: PhantomData<(T, P)>, } -impl NativeTlsAcceptor { +impl NativeTlsAcceptor { /// Create `NativeTlsAcceptor` instance pub fn new(acceptor: TlsAcceptor) -> Self { NativeTlsAcceptor { @@ -28,7 +28,7 @@ impl NativeTlsAcceptor { } } -impl Clone for NativeTlsAcceptor { +impl Clone for NativeTlsAcceptor { fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), @@ -37,11 +37,11 @@ impl Clone for NativeTlsAcceptor { } } -impl NewService for NativeTlsAcceptor { - type Request = T; - type Response = TlsStream; +impl NewService for NativeTlsAcceptor { + type Request = Io; + type Response = Io, P>; type Error = Error; - type Service = NativeTlsAcceptorService; + type Service = NativeTlsAcceptorService; type InitError = (); type Future = FutureResult; @@ -58,17 +58,17 @@ impl NewService for NativeTlsAcceptor { +pub struct NativeTlsAcceptorService { acceptor: TlsAcceptor, - io: PhantomData, + io: PhantomData<(T, P)>, conns: Counter, } -impl Service for NativeTlsAcceptorService { - type Request = T; - type Response = TlsStream; +impl Service for NativeTlsAcceptorService { + type Request = Io; + type Response = Io, P>; type Error = Error; - type Future = Accept; + type Future = Accept; fn poll_ready(&mut self) -> Poll<(), Self::Error> { if self.conns.available() { @@ -78,10 +78,12 @@ impl Service for NativeTlsAcceptorService { } } - fn call(&mut self, req: T) -> Self::Future { + fn call(&mut self, req: Self::Request) -> Self::Future { + let (io, params, _) = req.into_parts(); Accept { _guard: self.conns.get(), - inner: Some(self.acceptor.accept(req)), + inner: Some(self.acceptor.accept(io)), + params: Some(params), } } } @@ -100,21 +102,30 @@ pub struct TlsStream { /// Future returned from `NativeTlsAcceptor::accept` which will resolve /// once the accept handshake has finished. -pub struct Accept { +pub struct Accept { inner: Option, HandshakeError>>, + params: Option

, _guard: CounterGuard, } -impl Future for Accept { - type Item = TlsStream; +impl Future for Accept { + type Item = Io, P>; type Error = Error; fn poll(&mut self) -> Poll { match self.inner.take().expect("cannot poll MidHandshake twice") { - Ok(stream) => Ok(TlsStream { inner: stream }.into()), + Ok(stream) => Ok(Async::Ready(Io::from_parts( + TlsStream { inner: stream }, + self.params.take().unwrap(), + Protocol::Unknown, + ))), Err(HandshakeError::Failure(e)) => Err(e), Err(HandshakeError::WouldBlock(s)) => match s.handshake() { - Ok(stream) => Ok(TlsStream { inner: stream }.into()), + Ok(stream) => Ok(Async::Ready(Io::from_parts( + TlsStream { inner: stream }, + self.params.take().unwrap(), + Protocol::Unknown, + ))), Err(HandshakeError::Failure(e)) => Err(e), Err(HandshakeError::WouldBlock(s)) => { self.inner = Some(Err(HandshakeError::WouldBlock(s))); diff --git a/actix-server/src/ssl/openssl.rs b/actix-server/src/ssl/openssl.rs index 5c464dd7..62141be5 100644 --- a/actix-server/src/ssl/openssl.rs +++ b/actix-server/src/ssl/openssl.rs @@ -8,17 +8,17 @@ use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; use crate::counter::{Counter, CounterGuard}; use crate::ssl::MAX_CONN_COUNTER; -use crate::ServerConfig; +use crate::{Io, Protocol, ServerConfig}; /// Support `SSL` connections via openssl package /// /// `ssl` feature enables `OpensslAcceptor` type -pub struct OpensslAcceptor { +pub struct OpensslAcceptor { acceptor: SslAcceptor, - io: PhantomData, + io: PhantomData<(T, P)>, } -impl OpensslAcceptor { +impl OpensslAcceptor { /// Create default `OpensslAcceptor` pub fn new(acceptor: SslAcceptor) -> Self { OpensslAcceptor { @@ -28,7 +28,7 @@ impl OpensslAcceptor { } } -impl Clone for OpensslAcceptor { +impl Clone for OpensslAcceptor { fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), @@ -37,11 +37,11 @@ impl Clone for OpensslAcceptor { } } -impl NewService for OpensslAcceptor { - type Request = T; - type Response = SslStream; +impl NewService for OpensslAcceptor { + type Request = Io; + type Response = Io, P>; type Error = HandshakeError; - type Service = OpensslAcceptorService; + type Service = OpensslAcceptorService; type InitError = (); type Future = FutureResult; @@ -58,17 +58,17 @@ impl NewService for OpensslAcceptor } } -pub struct OpensslAcceptorService { +pub struct OpensslAcceptorService { acceptor: SslAcceptor, - io: PhantomData, conns: Counter, + io: PhantomData<(T, P)>, } -impl Service for OpensslAcceptorService { - type Request = T; - type Response = SslStream; +impl Service for OpensslAcceptorService { + type Request = Io; + type Response = Io, P>; type Error = HandshakeError; - type Future = OpensslAcceptorServiceFut; + type Future = OpensslAcceptorServiceFut; fn poll_ready(&mut self) -> Poll<(), Self::Error> { if self.conns.available() { @@ -78,27 +78,52 @@ impl Service for OpensslAcceptorService { } } - fn call(&mut self, req: T) -> Self::Future { + fn call(&mut self, req: Self::Request) -> Self::Future { + let (io, params, _) = req.into_parts(); OpensslAcceptorServiceFut { _guard: self.conns.get(), - fut: SslAcceptorExt::accept_async(&self.acceptor, req), + fut: SslAcceptorExt::accept_async(&self.acceptor, io), + params: Some(params), } } } -pub struct OpensslAcceptorServiceFut +pub struct OpensslAcceptorServiceFut where T: AsyncRead + AsyncWrite, { fut: AcceptAsync, + params: Option

, _guard: CounterGuard, } -impl Future for OpensslAcceptorServiceFut { - type Item = SslStream; +impl Future for OpensslAcceptorServiceFut { + type Item = Io, P>; type Error = HandshakeError; fn poll(&mut self) -> Poll { - self.fut.poll() + let io = futures::try_ready!(self.fut.poll()); + let proto = if let Some(protos) = io.get_ref().ssl().selected_alpn_protocol() { + const H2: &[u8] = b"\x02h2"; + const HTTP10: &[u8] = b"\x08http/1.0"; + const HTTP11: &[u8] = b"\x08http/1.1"; + + if protos.windows(3).any(|window| window == H2) { + Protocol::Http2 + } else if protos.windows(9).any(|window| window == HTTP11) { + Protocol::Http11 + } else if protos.windows(9).any(|window| window == HTTP10) { + Protocol::Http10 + } else { + Protocol::Unknown + } + } else { + Protocol::Unknown + }; + Ok(Async::Ready(Io::from_parts( + io, + self.params.take().unwrap(), + proto, + ))) } } diff --git a/actix-server/src/ssl/rustls.rs b/actix-server/src/ssl/rustls.rs index f824540b..bd85ffe5 100644 --- a/actix-server/src/ssl/rustls.rs +++ b/actix-server/src/ssl/rustls.rs @@ -10,17 +10,17 @@ use tokio_rustls::{Accept, TlsAcceptor, TlsStream}; use crate::counter::{Counter, CounterGuard}; use crate::ssl::MAX_CONN_COUNTER; -use crate::ServerConfig as SrvConfig; +use crate::{Io, Protocol, ServerConfig as SrvConfig}; /// Support `SSL` connections via rustls package /// /// `rust-tls` feature enables `RustlsAcceptor` type -pub struct RustlsAcceptor { +pub struct RustlsAcceptor { config: Arc, - io: PhantomData, + io: PhantomData<(T, P)>, } -impl RustlsAcceptor { +impl RustlsAcceptor { /// Create `RustlsAcceptor` new service pub fn new(config: ServerConfig) -> Self { RustlsAcceptor { @@ -30,7 +30,7 @@ impl RustlsAcceptor { } } -impl Clone for RustlsAcceptor { +impl Clone for RustlsAcceptor { fn clone(&self) -> Self { Self { config: self.config.clone(), @@ -39,11 +39,11 @@ impl Clone for RustlsAcceptor { } } -impl NewService for RustlsAcceptor { - type Request = T; - type Response = TlsStream; +impl NewService for RustlsAcceptor { + type Request = Io; + type Response = Io, P>; type Error = io::Error; - type Service = RustlsAcceptorService; + type Service = RustlsAcceptorService; type InitError = (); type Future = FutureResult; @@ -60,17 +60,17 @@ impl NewService for RustlsAcceptor { } } -pub struct RustlsAcceptorService { +pub struct RustlsAcceptorService { acceptor: TlsAcceptor, - io: PhantomData, + io: PhantomData<(T, P)>, conns: Counter, } -impl Service for RustlsAcceptorService { - type Request = T; - type Response = TlsStream; +impl Service for RustlsAcceptorService { + type Request = Io; + type Response = Io, P>; type Error = io::Error; - type Future = RustlsAcceptorServiceFut; + type Future = RustlsAcceptorServiceFut; fn poll_ready(&mut self) -> Poll<(), Self::Error> { if self.conns.available() { @@ -80,27 +80,35 @@ impl Service for RustlsAcceptorService { } } - fn call(&mut self, req: T) -> Self::Future { + fn call(&mut self, req: Self::Request) -> Self::Future { + let (io, params, _) = req.into_parts(); RustlsAcceptorServiceFut { _guard: self.conns.get(), - fut: self.acceptor.accept(req), + fut: self.acceptor.accept(io), + params: Some(params), } } } -pub struct RustlsAcceptorServiceFut +pub struct RustlsAcceptorServiceFut where T: AsyncRead + AsyncWrite, { fut: Accept, + params: Option

, _guard: CounterGuard, } -impl Future for RustlsAcceptorServiceFut { - type Item = TlsStream; +impl Future for RustlsAcceptorServiceFut { + type Item = Io, P>; type Error = io::Error; fn poll(&mut self) -> Poll { - self.fut.poll() + let io = futures::try_ready!(self.fut.poll()); + Ok(Async::Ready(Io::from_parts( + io, + self.params.take().unwrap(), + Protocol::Unknown, + ))) } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index ad4d3699..d85cca7d 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -1,3 +1,4 @@ +use std::sync::mpsc; use std::{net, thread, time}; use actix_server::{Server, ServerConfig}; @@ -68,3 +69,52 @@ fn test_listen() { thread::sleep(time::Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); } + +#[test] +#[cfg(unix)] +fn test_start() { + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + thread::spawn(move || { + let sys = actix_rt::System::new("test"); + + let srv = Server::build() + .backlog(1) + .bind("test", addr, move || { + fn_cfg_factory(move |cfg: &ServerConfig| { + assert_eq!(cfg.local_addr(), addr); + Ok::<_, ()>((|_| Ok::<_, ()>(())).into_service()) + }) + }) + .unwrap() + .start(); + + let _ = tx.send((srv, actix_rt::System::current())); + let _ = sys.run(); + }); + let (srv, sys) = rx.recv().unwrap(); + thread::sleep(time::Duration::from_millis(400)); + + assert!(net::TcpStream::connect(addr).is_ok()); + + // pause + let _ = srv.pause(); + thread::sleep(time::Duration::from_millis(100)); + assert!(net::TcpStream::connect_timeout(&addr, time::Duration::from_millis(100)).is_ok()); + assert!(net::TcpStream::connect_timeout(&addr, time::Duration::from_millis(100)).is_err()); + + // resume + let _ = srv.resume(); + thread::sleep(time::Duration::from_millis(100)); + assert!(net::TcpStream::connect(addr).is_ok()); + assert!(net::TcpStream::connect(addr).is_ok()); + assert!(net::TcpStream::connect(addr).is_ok()); + + // stop + let _ = srv.stop(false); + thread::sleep(time::Duration::from_millis(100)); + assert!(net::TcpStream::connect(addr).is_err()); + + let _ = sys.stop(); +}