From 552d19a0eb2e7f71460b2b89d0dd7dfd128f1d36 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 8 Sep 2018 14:50:16 -0700 Subject: [PATCH] add generic server service factory --- examples/basic.rs | 10 ++++----- examples/ssl.rs | 11 ++++------ src/server.rs | 24 ++++++++------------ src/server_service.rs | 51 +++++++++++++++++++++++++------------------ src/worker.rs | 4 ++-- 5 files changed, 50 insertions(+), 50 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index 4e2039ca..5bbd529c 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -9,11 +9,11 @@ extern crate tokio_io; extern crate tokio_openssl; extern crate tokio_tcp; +use std::fmt; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; -use std::{fmt, io}; use futures::{future, Future}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; @@ -25,7 +25,7 @@ use actix_net::{IntoNewService, NewServiceExt, Server}; /// Simple logger service, it just prints fact of the new connections fn logger( stream: T, -) -> impl Future { +) -> impl Future { println!("New connection: {:?}", stream); future::ok(stream) } @@ -40,7 +40,7 @@ struct ServiceState { /// Service function for our stateful service fn service( st: &mut ServiceState, _stream: T, -) -> impl Future { +) -> impl Future { let num = st.num.fetch_add(1, Ordering::Relaxed); println!("got ssl connection {:?}", num); future::ok(()) @@ -75,7 +75,7 @@ fn main() { // service for converting incoming TcpStream to a SslStream (move |stream| { SslAcceptorExt::accept_async(&acceptor, stream) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .map_err(|e| println!("Openssl error: {}", e)) }) // convert closure to a `NewService` .into_new_service() @@ -89,7 +89,7 @@ fn main() { // actix-net generates `NewService` impl that creates `ServiceState` instance for each new service // and use `service` function as `Service::call` .and_then((service, move || { - Ok::<_, io::Error>(ServiceState { num: num.clone() }) + Ok(ServiceState { num: num.clone() }) })) }, ).unwrap() diff --git a/examples/ssl.rs b/examples/ssl.rs index 04ebe22d..0f8043d1 100644 --- a/examples/ssl.rs +++ b/examples/ssl.rs @@ -5,7 +5,6 @@ extern crate openssl; extern crate tokio_io; extern crate tokio_tcp; -use std::io; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -24,7 +23,7 @@ struct ServiceState { fn service( st: &mut ServiceState, _: T, -) -> impl Future { +) -> impl Future { let num = st.num.fetch_add(1, Ordering::Relaxed); println!("got ssl connection {:?}", num); future::ok(()) @@ -43,7 +42,7 @@ fn main() { .unwrap(); let num = Arc::new(AtomicUsize::new(0)); - let openssl = ssl::OpensslAcceptor::new(builder); + let openssl = ssl::OpensslAcceptor::new(builder.build()); // server start mutiple workers, it runs supplied `Fn` in each worker. Server::default() @@ -53,10 +52,8 @@ fn main() { // configure service openssl .clone() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - .and_then((service, move || { - Ok::<_, io::Error>(ServiceState { num: num.clone() }) - })) + .map_err(|e| println!("Openssl error: {}", e)) + .and_then((service, move || Ok(ServiceState { num: num.clone() }))) }).unwrap() .start(); diff --git a/src/server.rs b/src/server.rs index 9bc8774a..a968efa9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,17 +5,17 @@ use futures::sync::{mpsc, mpsc::unbounded}; use futures::{Future, Sink, Stream}; use net2::TcpBuilder; use num_cpus; -use tokio_tcp::TcpStream; use actix::{ actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, Response, StreamHandler, System, WrapFuture, }; +pub use super::server_service::ServerServiceFactory; + use super::accept::{AcceptLoop, AcceptNotify, Command}; -use super::server_service::{ServerNewService, ServerServiceFactory}; +use super::server_service::{InternalServerServiceFactory, ServerNewService}; use super::worker::{self, Conn, StopWorker, Worker, WorkerAvailability, WorkerClient}; -use super::NewService; use super::{PauseServer, ResumeServer, StopServer, Token}; pub(crate) enum ServerCommand { @@ -26,7 +26,7 @@ pub(crate) enum ServerCommand { pub struct Server { threads: usize, workers: Vec<(usize, Addr)>, - services: Vec>, + services: Vec>, sockets: Vec<(Token, net::TcpListener)>, accept: AcceptLoop, exit: bool, @@ -143,13 +143,10 @@ impl Server { } /// Add new service to server - pub fn bind(mut self, addr: U, factory: F) -> io::Result + pub fn bind(mut self, addr: U, factory: F) -> io::Result where - F: Fn() -> N + Clone + Send + 'static, + F: ServerServiceFactory, U: net::ToSocketAddrs, - N: NewService + 'static, - N::Service: 'static, - N::Future: 'static, { let sockets = bind_addr(addr)?; @@ -160,12 +157,9 @@ impl Server { } /// Add new service to server - pub fn listen(mut self, lst: net::TcpListener, factory: F) -> Self + pub fn listen(mut self, lst: net::TcpListener, factory: F) -> Self where - F: Fn() -> N + Clone + Send + 'static, - N: NewService + 'static, - N::Service: 'static, - N::Future: 'static, + F: ServerServiceFactory, { let token = Token(self.services.len()); self.services.push(ServerNewService::create(factory)); @@ -254,7 +248,7 @@ impl Server { let (tx, rx) = unbounded::(); let avail = WorkerAvailability::new(notify); let worker = WorkerClient::new(idx, tx, avail.clone()); - let services: Vec> = + let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); let addr = Arbiter::start(move |ctx: &mut Context<_>| { diff --git a/src/server_service.rs b/src/server_service.rs index 0bfad8df..0f197d82 100644 --- a/src/server_service.rs +++ b/src/server_service.rs @@ -65,54 +65,45 @@ where } } -pub(crate) struct ServerNewService -where - F: Fn() -> T + Send + Clone, -{ +pub(crate) struct ServerNewService { inner: F, } -impl ServerNewService +impl ServerNewService where - F: Fn() -> T + Send + Clone + 'static, - T: NewService + 'static, - T::Service: 'static, - T::Future: 'static, + F: ServerServiceFactory, { - pub(crate) fn create(inner: F) -> Box { + pub(crate) fn create(inner: F) -> Box { Box::new(Self { inner }) } } -pub trait ServerServiceFactory { - fn clone_factory(&self) -> Box; +pub(crate) trait InternalServerServiceFactory: Send { + fn clone_factory(&self) -> Box; fn create(&self) -> Box>; } -impl ServerServiceFactory for ServerNewService +impl InternalServerServiceFactory for ServerNewService where - F: Fn() -> T + Send + Clone + 'static, - T: NewService + 'static, - T::Service: 'static, - T::Future: 'static, + F: ServerServiceFactory, { - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box { Box::new(Self { inner: self.inner.clone(), }) } fn create(&self) -> Box> { - Box::new((self.inner)().new_service().map(move |inner| { + Box::new(self.inner.create().new_service().map(move |inner| { let service: BoxedServerService = Box::new(ServerService::new(inner)); service })) } } -impl ServerServiceFactory for Box { - fn clone_factory(&self) -> Box { +impl InternalServerServiceFactory for Box { + fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } @@ -120,3 +111,21 @@ impl ServerServiceFactory for Box { self.as_ref().create() } } + +pub trait ServerServiceFactory: Send + Clone + 'static { + type NewService: NewService; + + fn create(&self) -> Self::NewService; +} + +impl ServerServiceFactory for F +where + F: Fn() -> T + Send + Clone + 'static, + T: NewService, +{ + type NewService = T; + + fn create(&self) -> T { + (self)() + } +} diff --git a/src/worker.rs b/src/worker.rs index 8d4bcac0..10354a26 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -17,7 +17,7 @@ use actix::{ }; use super::accept::AcceptNotify; -use super::server_service::{BoxedServerService, ServerMessage, ServerServiceFactory}; +use super::server_service::{BoxedServerService, InternalServerServiceFactory, ServerMessage}; use super::Token; #[derive(Message)] @@ -122,7 +122,7 @@ impl Actor for Worker { impl Worker { pub(crate) fn new( - ctx: &mut Context, services: Vec>, + ctx: &mut Context, services: Vec>, availability: WorkerAvailability, ) -> Self { let wrk = MAX_CONNS_COUNTER.with(|conns| Worker {