From a5a026b5c4c69ab212d6a79557038c1b3a624dc4 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 23 Aug 2018 15:42:34 -0700 Subject: [PATCH] remove custom NewService and cleanups --- examples/basic.rs | 2 +- examples/ssl.rs | 18 ++++--- src/lib.rs | 10 ++-- src/server.rs | 98 ++++++++--------------------------- src/server_config.rs | 83 ----------------------------- src/server_service.rs | 33 ++++++------ src/service.rs | 118 ++++++++++++------------------------------ src/ssl/openssl.rs | 18 ++----- src/worker.rs | 4 +- 9 files changed, 92 insertions(+), 292 deletions(-) delete mode 100644 src/server_config.rs diff --git a/examples/basic.rs b/examples/basic.rs index 5dd95b40..e7c92c08 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -20,7 +20,7 @@ use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::SslAcceptorExt; -use actix_net::{IntoNewService, NewService, Server}; +use actix_net::{IntoNewService, NewServiceExt, Server}; /// Simple logger service, it just prints fact of the new connections fn logger( diff --git a/examples/ssl.rs b/examples/ssl.rs index 25ec9551..f95ff83a 100644 --- a/examples/ssl.rs +++ b/examples/ssl.rs @@ -15,7 +15,7 @@ use futures::{future, Future}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use tokio_io::{AsyncRead, AsyncWrite}; -use actix_net::{ssl, NewService, Server}; +use actix_net::{ssl, NewServiceExt, Server}; #[derive(Debug)] struct ServiceState { @@ -46,14 +46,16 @@ fn main() { let openssl = ssl::OpensslService::new(builder); // server start mutiple workers, it runs supplied `Fn` in each worker. - Server::default().bind("0.0.0.0:8443", move || { - let num = num.clone(); + Server::default() + .bind("0.0.0.0:8443", move || { + let num = num.clone(); - // configure service - openssl.clone().and_then((service, move || { - Ok::<_, io::Error>(ServiceState { num: num.clone() }) - })) - }).unwrap().start(); + // configure service + openssl.clone().and_then((service, move || { + Ok::<_, io::Error>(ServiceState { num: num.clone() }) + })) + }).unwrap() + .start(); sys.run(); } diff --git a/src/lib.rs b/src/lib.rs index 2aa42cf5..dea4d56e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,20 +45,18 @@ extern crate webpki_roots; use actix::Message; -/// re-export for convinience. as a note, actix-net does not use `tower_service::NewService` trait. -pub use tower_service::Service; +/// re-export for convinience +pub use tower_service::{NewService, Service}; pub(crate) mod accept; mod server; -pub mod server_config; mod server_service; pub mod service; pub mod ssl; mod worker; pub use server::Server; -pub use server_config::Config; -pub use service::{IntoNewService, IntoService, NewService}; +pub use service::{IntoNewService, IntoService, NewServiceExt}; /// Pause accepting incoming connections /// @@ -85,4 +83,4 @@ impl Message for StopServer { /// Socket id token #[derive(Clone, Copy)] -pub(crate) struct Token(usize); \ No newline at end of file +pub(crate) struct Token(usize); diff --git a/src/server.rs b/src/server.rs index 6c38c206..2541b3a1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,10 +17,9 @@ use actix::{ }; use super::accept::{AcceptLoop, AcceptNotify, Command}; -use super::server_config::{Config, ServerConfig}; use super::server_service::{ServerNewService, ServerServiceFactory}; -use super::service::NewService; use super::worker::{Conn, StopWorker, Worker, WorkerClient}; +use super::NewService; use super::{PauseServer, ResumeServer, StopServer, Token}; pub(crate) enum ServerCommand { @@ -28,11 +27,10 @@ pub(crate) enum ServerCommand { } /// Server -pub struct Server { - config: C, +pub struct Server { threads: usize, workers: Vec<(usize, Addr)>, - services: Vec + Send>>, + services: Vec>, sockets: Vec<(Token, net::TcpListener)>, accept: AcceptLoop, exit: bool, @@ -43,17 +41,16 @@ pub struct Server { maxconnrate: usize, } -impl Default for Server { +impl Default for Server { fn default() -> Self { - Self::new(ServerConfig::default()) + Self::new() } } -impl Server { +impl Server { /// Create new Server instance - pub fn new(config: C) -> Server { + pub fn new() -> Server { Server { - config, threads: num_cpus::get(), workers: Vec::new(), services: Vec::new(), @@ -83,10 +80,7 @@ impl Server { /// reached for each worker. /// /// By default max connections is set to a 100k. - pub fn maxconn(mut self, num: usize) -> Self - where - C: AsMut, - { + pub fn maxconn(mut self, num: usize) -> Self { self.maxconn = num; self } @@ -140,7 +134,7 @@ impl Server { /// /// This function is useful for moving parts of configuration to a /// different module or event library. - /// + /// /// ```rust /// # extern crate actix_web; /// use actix_web::{fs, middleware, App, HttpResponse}; @@ -160,9 +154,9 @@ impl Server { /// .handler("/static", fs::StaticFiles::new(".").unwrap()); /// } /// ``` - pub fn configure(self, cfg: F) -> Server + pub fn configure(self, cfg: F) -> Server where - F: Fn(Server) -> Server, + F: Fn(Server) -> Server, { cfg(self) } @@ -172,7 +166,7 @@ impl Server { where F: Fn() -> N + Clone + Send + 'static, U: net::ToSocketAddrs, - N: NewService + 'static, + N: NewService + 'static, N::Service: 'static, N::Future: 'static, N::Error: fmt::Display, @@ -189,13 +183,13 @@ impl Server { pub fn listen(mut self, lst: net::TcpListener, factory: F) -> Self where F: Fn() -> N + Clone + Send + 'static, - N: NewService + 'static, + N: NewService + 'static, N::Service: 'static, N::Future: 'static, N::Error: fmt::Display, { let token = Token(self.services.len()); - self.services.push(ServerNewService::create(factory, self.config.clone())); + self.services.push(ServerNewService::create(factory)); self.sockets.push((token, lst)); self } @@ -229,7 +223,7 @@ impl Server { } /// Starts Server Actor and returns its address - pub fn start(mut self) -> Addr> { + pub fn start(mut self) -> Addr { if self.sockets.is_empty() { panic!("Service should have at least one bound socket"); } else { @@ -281,7 +275,7 @@ impl Server { let (tx, rx) = unbounded::(); let conns = Connections::new(notify, self.maxconn, self.maxconnrate); let worker = WorkerClient::new(idx, tx, conns.clone()); - let services: Vec + Send>> = + let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); let addr = Arbiter::start(move |ctx: &mut Context<_>| { @@ -293,14 +287,14 @@ impl Server { } } -impl Actor for Server { +impl Actor for Server { type Context = Context; } /// Signals support /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system /// message to `System` actor. -impl Handler for Server { +impl Handler for Server { type Result = (); fn handle(&mut self, msg: signal::Signal, ctx: &mut Context) { @@ -325,7 +319,7 @@ impl Handler for Server { } } -impl Handler for Server { +impl Handler for Server { type Result = (); fn handle(&mut self, _: PauseServer, _: &mut Context) { @@ -333,7 +327,7 @@ impl Handler for Server { } } -impl Handler for Server { +impl Handler for Server { type Result = (); fn handle(&mut self, _: ResumeServer, _: &mut Context) { @@ -341,7 +335,7 @@ impl Handler for Server { } } -impl Handler for Server { +impl Handler for Server { type Result = Response<(), ()>; fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { @@ -396,7 +390,7 @@ impl Handler for Server { } /// Commands from accept threads -impl StreamHandler for Server { +impl StreamHandler for Server { fn finished(&mut self, _: &mut Context) {} fn handle(&mut self, msg: ServerCommand, _: &mut Context) { @@ -461,16 +455,6 @@ impl Connections { pub(crate) fn available(&self) -> bool { self.0.available() } - - /// Report opened connection - pub fn connection(&self) -> ConnectionTag { - ConnectionTag::new(self.0.clone()) - } - - /// Report rate connection, rate is usually ssl handshake - pub fn connection_rate(&self) -> ConnectionRateTag { - ConnectionRateTag::new(self.0.clone()) - } } #[derive(Default)] @@ -506,44 +490,6 @@ impl ConnectionsInner { } } -/// Type responsible for max connection stat. -/// -/// Max connections stat get updated on drop. -pub struct ConnectionTag(Arc); - -impl ConnectionTag { - fn new(inner: Arc) -> Self { - inner.conn.fetch_add(1, Ordering::Relaxed); - ConnectionTag(inner) - } -} - -impl Drop for ConnectionTag { - fn drop(&mut self) { - let conn = self.0.conn.fetch_sub(1, Ordering::Relaxed); - self.0.notify_maxconn(conn); - } -} - -/// Type responsible for max connection rate stat. -/// -/// Max connections rate stat get updated on drop. -pub struct ConnectionRateTag(Arc); - -impl ConnectionRateTag { - fn new(inner: Arc) -> Self { - inner.connrate.fetch_add(1, Ordering::Relaxed); - ConnectionRateTag(inner) - } -} - -impl Drop for ConnectionRateTag { - fn drop(&mut self) { - let connrate = self.0.connrate.fetch_sub(1, Ordering::Relaxed); - self.0.notify_maxconnrate(connrate); - } -} - fn bind_addr(addr: S) -> io::Result> { let mut err = None; let mut succ = false; diff --git a/src/server_config.rs b/src/server_config.rs deleted file mode 100644 index 0d333c01..00000000 --- a/src/server_config.rs +++ /dev/null @@ -1,83 +0,0 @@ -//! Default server config -use std::sync::{atomic::AtomicUsize, Arc}; - -pub trait Config: Send + Clone + Default + 'static { - fn fork(&self) -> Self { - self.clone() - } -} - -#[derive(Clone, Default)] -pub struct ServerConfig { - conn: ConnectionsConfig, - ssl: SslConfig, -} - -impl Config for ServerConfig { - fn fork(&self) -> Self { - ServerConfig { - conn: self.conn.fork(), - ssl: self.ssl.fork(), - } - } -} - -impl AsRef for ServerConfig { - fn as_ref(&self) -> &ConnectionsConfig { - &self.conn - } -} - -impl AsRef for ServerConfig { - fn as_ref(&self) -> &SslConfig { - &self.ssl - } -} - -#[derive(Clone)] -pub struct ConnectionsConfig { - max_connections: usize, - num_connections: Arc, -} - -impl Default for ConnectionsConfig { - fn default() -> Self { - ConnectionsConfig { - max_connections: 102_400, - num_connections: Arc::new(AtomicUsize::new(0)), - } - } -} - -impl Config for ConnectionsConfig { - fn fork(&self) -> Self { - ConnectionsConfig { - max_connections: self.max_connections, - num_connections: Arc::new(AtomicUsize::new(0)), - } - } -} - -#[derive(Clone)] -pub struct SslConfig { - max_handshakes: usize, - num: Arc, -} - -impl Default for SslConfig { - fn default() -> Self { - SslConfig { - max_handshakes: 256, - num: Arc::new(AtomicUsize::new(0)), - } - } -} - -impl Config for SslConfig { - fn fork(&self) -> Self { - SslConfig { - max_handshakes: self.max_handshakes, - num: Arc::new(AtomicUsize::new(0)), - } - } -} diff --git a/src/server_service.rs b/src/server_service.rs index 4d99c182..8bfe5b55 100644 --- a/src/server_service.rs +++ b/src/server_service.rs @@ -8,7 +8,7 @@ use futures::{future, Future, Poll}; use tokio_reactor::Handle; use tokio_tcp::TcpStream; -use super::{Config, NewService, Service}; +use super::{NewService, Service}; pub(crate) type BoxedServerService = Box< Service< @@ -56,42 +56,42 @@ where } } -pub(crate) struct ServerNewService where F: Fn() -> T + Send + Clone { +pub(crate) struct ServerNewService +where + F: Fn() -> T + Send + Clone, +{ inner: F, - config: C, counter: Arc, } -impl ServerNewService +impl ServerNewService where F: Fn() -> T + Send + Clone + 'static, - T: NewService + 'static, + T: NewService + 'static, T::Service: 'static, T::Future: 'static, T::Error: fmt::Display, { - pub(crate) fn create(inner: F, config: C) -> Box + Send> { + pub(crate) fn create(inner: F) -> Box { Box::new(Self { inner, - config, counter: Arc::new(AtomicUsize::new(0)), }) } } -pub trait ServerServiceFactory { +pub trait ServerServiceFactory { fn counter(&self) -> Arc; - fn clone_factory(&self) -> Box + Send>; + fn clone_factory(&self) -> Box; fn create(&self) -> Box>; } -impl ServerServiceFactory for ServerNewService +impl ServerServiceFactory for ServerNewService where F: Fn() -> T + Send + Clone + 'static, - T: NewService - + 'static, + T: NewService + 'static, T::Service: 'static, T::Future: 'static, T::Error: fmt::Display, @@ -100,10 +100,9 @@ where self.counter.clone() } - fn clone_factory(&self) -> Box + Send> { + fn clone_factory(&self) -> Box { Box::new(Self { inner: self.inner.clone(), - config: self.config.fork(), counter: Arc::new(AtomicUsize::new(0)), }) } @@ -112,7 +111,7 @@ where let counter = self.counter.clone(); Box::new( (self.inner)() - .new_service(self.config.clone()) + .new_service() .map_err(|_| ()) .map(move |inner| { let service: BoxedServerService = @@ -123,12 +122,12 @@ where } } -impl ServerServiceFactory for Box> { +impl ServerServiceFactory for Box { fn counter(&self) -> Arc { self.as_ref().counter() } - fn clone_factory(&self) -> Box + Send> { + fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } diff --git a/src/service.rs b/src/service.rs index 18ec9cf0..5ead53ce 100644 --- a/src/service.rs +++ b/src/service.rs @@ -3,44 +3,9 @@ use std::marker; use std::rc::Rc; use futures::{future, future::FutureResult, Async, Future, IntoFuture, Poll}; -use tower_service::Service; - -/// Creates new `Service` values. -/// -/// Acts as a service factory. This is useful for cases where new `Service` -/// values must be produced. One case is a TCP servier listener. The listner -/// accepts new TCP streams, obtains a new `Service` value using the -/// `NewService` trait, and uses that new `Service` value to process inbound -/// requests on that new TCP stream. -pub trait NewService { - /// Requests handled by the service - type Request; - - /// Responses given by the service - type Response; - - /// Errors produced by the service - type Error; - - /// The `Service` value created by this factory - type Service: Service< - Request = Self::Request, - Response = Self::Response, - Error = Self::Error, - >; - - /// Pipeline configuration - type Config: Clone; - - /// Errors produced while building a service. - type InitError; - - /// The future of the `Service` instance. - type Future: Future; - - /// Create and return a new service value asynchronously. - fn new_service(&self, Self::Config) -> Self::Future; +use tower_service::{NewService, Service}; +pub trait NewServiceExt: NewService { fn and_then(self, new_service: F) -> AndThenNewService where Self: Sized, @@ -48,7 +13,6 @@ pub trait NewService { B: NewService< Request = Self::Response, Error = Self::Error, - Config = Self::Config, InitError = Self::InitError, >, { @@ -72,6 +36,8 @@ pub trait NewService { } } +impl NewServiceExt for T {} + /// Trait for types that can be converted to a Service pub trait IntoService where @@ -163,7 +129,7 @@ where } } -pub struct FnNewService +pub struct FnNewService where F: Fn(Req) -> Fut, Fut: IntoFuture, @@ -173,10 +139,9 @@ where resp: marker::PhantomData, err: marker::PhantomData, ierr: marker::PhantomData, - cfg: marker::PhantomData, } -impl FnNewService +impl FnNewService where F: Fn(Req) -> Fut + Clone, Fut: IntoFuture, @@ -188,43 +153,39 @@ where resp: marker::PhantomData, err: marker::PhantomData, ierr: marker::PhantomData, - cfg: marker::PhantomData, } } } -impl NewService for FnNewService +impl NewService for FnNewService where F: Fn(Req) -> Fut + Clone, Fut: IntoFuture, - Cfg: Clone, { type Request = Req; type Response = Resp; type Error = Err; type Service = FnService; - type Config = Cfg; type InitError = IErr; type Future = FutureResult; - fn new_service(&self, _: Cfg) -> Self::Future { + fn new_service(&self) -> Self::Future { future::ok(FnService::new(self.f.clone())) } } -impl IntoNewService> +impl IntoNewService> for F where F: Fn(Req) -> Fut + Clone + 'static, Fut: IntoFuture, - Cfg: Clone, { - fn into_new_service(self) -> FnNewService { + fn into_new_service(self) -> FnNewService { FnNewService::new(self) } } -impl Clone for FnNewService +impl Clone for FnNewService where F: Fn(Req) -> Fut + Clone, Fut: IntoFuture, @@ -282,7 +243,7 @@ where } /// `NewService` for state and handler functions -pub struct FnStateNewService { +pub struct FnStateNewService { f: F1, state: F2, s: marker::PhantomData, @@ -292,11 +253,10 @@ pub struct FnStateNewService err2: marker::PhantomData, fut1: marker::PhantomData, fut2: marker::PhantomData, - cfg: marker::PhantomData, } -impl - FnStateNewService +impl + FnStateNewService { fn new(f: F1, state: F2) -> Self { FnStateNewService { @@ -309,13 +269,12 @@ impl err2: marker::PhantomData, fut1: marker::PhantomData, fut2: marker::PhantomData, - cfg: marker::PhantomData, } } } -impl NewService - for FnStateNewService +impl NewService + for FnStateNewService where S: 'static, F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, @@ -326,17 +285,15 @@ where Resp: 'static, Err1: 'static, Err2: 'static, - Cfg: Clone, { type Request = Req; type Response = Resp; type Error = Err1; type Service = FnStateService; - type Config = Cfg; type InitError = Err2; type Future = Box>; - fn new_service(&self, _: Cfg) -> Self::Future { + fn new_service(&self) -> Self::Future { let f = self.f.clone(); Box::new( (self.state)() @@ -346,9 +303,8 @@ where } } -impl - IntoNewService> - for (F1, F2) +impl + IntoNewService> for (F1, F2) where S: 'static, F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, @@ -359,17 +315,16 @@ where Resp: 'static, Err1: 'static, Err2: 'static, - Cfg: Clone, { fn into_new_service( self, - ) -> FnStateNewService { + ) -> FnStateNewService { FnStateNewService::new(self.0, self.1) } } -impl Clone - for FnStateNewService +impl Clone + for FnStateNewService where F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, F2: Fn() -> Fut2 + Clone, @@ -415,11 +370,9 @@ where fn poll_ready(&mut self) -> Poll<(), Self::Error> { match self.a.poll_ready() { - Ok(Async::Ready(_)) => { - self.b.borrow_mut().poll_ready() - }, + Ok(Async::Ready(_)) => self.b.borrow_mut().poll_ready(), Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err.into()) + Err(err) => Err(err.into()), } } @@ -474,7 +427,7 @@ where self.poll() } Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err.into()) + Err(err) => Err(err.into()), } } } @@ -501,11 +454,7 @@ where impl NewService for AndThenNewService where - A: NewService< - Response = B::Request, - Config = B::Config, - InitError = B::InitError, - >, + A: NewService, A::Error: Into, B: NewService, { @@ -514,12 +463,11 @@ where type Error = B::Error; type Service = AndThen; - type Config = A::Config; type InitError = A::InitError; type Future = AndThenNewServiceFuture; - fn new_service(&self, cfg: A::Config) -> Self::Future { - AndThenNewServiceFuture::new(self.a.new_service(cfg.clone()), self.b.new_service(cfg)) + fn new_service(&self) -> Self::Future { + AndThenNewServiceFuture::new(self.a.new_service(), self.b.new_service()) } } @@ -712,12 +660,11 @@ where type Error = E; type Service = MapErr; - type Config = A::Config; type InitError = A::InitError; type Future = MapErrNewServiceFuture; - fn new_service(&self, cfg: Self::Config) -> Self::Future { - MapErrNewServiceFuture::new(self.a.new_service(cfg), self.f.clone()) + fn new_service(&self) -> Self::Future { + MapErrNewServiceFuture::new(self.a.new_service(), self.f.clone()) } } @@ -803,12 +750,11 @@ where type Error = A::Error; type Service = A::Service; - type Config = A::Config; type InitError = E; type Future = MapInitErrFuture; - fn new_service(&self, cfg: Self::Config) -> Self::Future { - MapInitErrFuture::new(self.a.new_service(cfg), self.f.clone()) + fn new_service(&self) -> Self::Future { + MapInitErrFuture::new(self.a.new_service(), self.f.clone()) } } diff --git a/src/ssl/openssl.rs b/src/ssl/openssl.rs index b6496b68..f4a33950 100644 --- a/src/ssl/openssl.rs +++ b/src/ssl/openssl.rs @@ -7,25 +7,22 @@ use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; -use server_config::SslConfig; use {NewService, Service}; /// Support `SSL` connections via openssl package /// /// `alpn` feature enables `OpensslAcceptor` type -pub struct OpensslService { +pub struct OpensslService { acceptor: SslAcceptor, io: PhantomData, - cfg: PhantomData, } -impl OpensslService { +impl OpensslService { /// Create default `OpensslService` pub fn new(builder: SslAcceptorBuilder) -> Self { OpensslService { acceptor: builder.build(), io: PhantomData, - cfg: PhantomData, } } @@ -46,32 +43,27 @@ impl OpensslService { Ok(OpensslService { acceptor: builder.build(), io: PhantomData, - cfg: PhantomData, }) } } -impl Clone for OpensslService { +impl Clone for OpensslService { fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), io: PhantomData, - cfg: PhantomData, } } } -impl> NewService - for OpensslService -{ +impl NewService for OpensslService { type Request = T; type Response = SslStream; type Error = io::Error; type Service = OpensslAcceptor; - type Config = Cfg; type InitError = io::Error; type Future = FutureResult; - fn new_service(&self, _: Self::Config) -> Self::Future { + fn new_service(&self) -> Self::Future { future::ok(OpensslAcceptor { acceptor: self.acceptor.clone(), io: PhantomData, diff --git a/src/worker.rs b/src/worker.rs index d2397214..80bf1356 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -69,8 +69,8 @@ impl Actor for Worker { } impl Worker { - pub(crate) fn new( - ctx: &mut Context, services: Vec + Send>>, + pub(crate) fn new( + ctx: &mut Context, services: Vec>, ) -> Self { let wrk = Worker { services: Vec::new(),