1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-12 15:00:22 +01:00

add generic server service factory

This commit is contained in:
Nikolay Kim 2018-09-08 14:50:16 -07:00
parent 4264574af1
commit 552d19a0eb
5 changed files with 50 additions and 50 deletions

View File

@ -9,11 +9,11 @@ extern crate tokio_io;
extern crate tokio_openssl; extern crate tokio_openssl;
extern crate tokio_tcp; extern crate tokio_tcp;
use std::fmt;
use std::sync::{ use std::sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
}; };
use std::{fmt, io};
use futures::{future, Future}; use futures::{future, Future};
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
@ -25,7 +25,7 @@ use actix_net::{IntoNewService, NewServiceExt, Server};
/// Simple logger service, it just prints fact of the new connections /// Simple logger service, it just prints fact of the new connections
fn logger<T: AsyncRead + AsyncWrite + fmt::Debug>( fn logger<T: AsyncRead + AsyncWrite + fmt::Debug>(
stream: T, stream: T,
) -> impl Future<Item = T, Error = io::Error> { ) -> impl Future<Item = T, Error = ()> {
println!("New connection: {:?}", stream); println!("New connection: {:?}", stream);
future::ok(stream) future::ok(stream)
} }
@ -40,7 +40,7 @@ struct ServiceState {
/// Service function for our stateful service /// Service function for our stateful service
fn service<T: AsyncRead + AsyncWrite>( fn service<T: AsyncRead + AsyncWrite>(
st: &mut ServiceState, _stream: T, st: &mut ServiceState, _stream: T,
) -> impl Future<Item = (), Error = io::Error> { ) -> impl Future<Item = (), Error = ()> {
let num = st.num.fetch_add(1, Ordering::Relaxed); let num = st.num.fetch_add(1, Ordering::Relaxed);
println!("got ssl connection {:?}", num); println!("got ssl connection {:?}", num);
future::ok(()) future::ok(())
@ -75,7 +75,7 @@ fn main() {
// service for converting incoming TcpStream to a SslStream<TcpStream> // service for converting incoming TcpStream to a SslStream<TcpStream>
(move |stream| { (move |stream| {
SslAcceptorExt::accept_async(&acceptor, stream) SslAcceptorExt::accept_async(&acceptor, stream)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) .map_err(|e| println!("Openssl error: {}", e))
}) })
// convert closure to a `NewService` // convert closure to a `NewService`
.into_new_service() .into_new_service()
@ -89,7 +89,7 @@ fn main() {
// actix-net generates `NewService` impl that creates `ServiceState` instance for each new service // actix-net generates `NewService` impl that creates `ServiceState` instance for each new service
// and use `service` function as `Service::call` // and use `service` function as `Service::call`
.and_then((service, move || { .and_then((service, move || {
Ok::<_, io::Error>(ServiceState { num: num.clone() }) Ok(ServiceState { num: num.clone() })
})) }))
}, },
).unwrap() ).unwrap()

View File

@ -5,7 +5,6 @@ extern crate openssl;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_tcp; extern crate tokio_tcp;
use std::io;
use std::sync::{ use std::sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
@ -24,7 +23,7 @@ struct ServiceState {
fn service<T: AsyncRead + AsyncWrite>( fn service<T: AsyncRead + AsyncWrite>(
st: &mut ServiceState, _: T, st: &mut ServiceState, _: T,
) -> impl Future<Item = (), Error = io::Error> { ) -> impl Future<Item = (), Error = ()> {
let num = st.num.fetch_add(1, Ordering::Relaxed); let num = st.num.fetch_add(1, Ordering::Relaxed);
println!("got ssl connection {:?}", num); println!("got ssl connection {:?}", num);
future::ok(()) future::ok(())
@ -43,7 +42,7 @@ fn main() {
.unwrap(); .unwrap();
let num = Arc::new(AtomicUsize::new(0)); let num = Arc::new(AtomicUsize::new(0));
let openssl = ssl::OpensslAcceptor::new(builder); let openssl = ssl::OpensslAcceptor::new(builder.build());
// server start mutiple workers, it runs supplied `Fn` in each worker. // server start mutiple workers, it runs supplied `Fn` in each worker.
Server::default() Server::default()
@ -53,10 +52,8 @@ fn main() {
// configure service // configure service
openssl openssl
.clone() .clone()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) .map_err(|e| println!("Openssl error: {}", e))
.and_then((service, move || { .and_then((service, move || Ok(ServiceState { num: num.clone() })))
Ok::<_, io::Error>(ServiceState { num: num.clone() })
}))
}).unwrap() }).unwrap()
.start(); .start();

View File

@ -5,17 +5,17 @@ use futures::sync::{mpsc, mpsc::unbounded};
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use net2::TcpBuilder; use net2::TcpBuilder;
use num_cpus; use num_cpus;
use tokio_tcp::TcpStream;
use actix::{ use actix::{
actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler,
Response, StreamHandler, System, WrapFuture, Response, StreamHandler, System, WrapFuture,
}; };
pub use super::server_service::ServerServiceFactory;
use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::server_service::{ServerNewService, ServerServiceFactory}; use super::server_service::{InternalServerServiceFactory, ServerNewService};
use super::worker::{self, Conn, StopWorker, Worker, WorkerAvailability, WorkerClient}; use super::worker::{self, Conn, StopWorker, Worker, WorkerAvailability, WorkerClient};
use super::NewService;
use super::{PauseServer, ResumeServer, StopServer, Token}; use super::{PauseServer, ResumeServer, StopServer, Token};
pub(crate) enum ServerCommand { pub(crate) enum ServerCommand {
@ -26,7 +26,7 @@ pub(crate) enum ServerCommand {
pub struct Server { pub struct Server {
threads: usize, threads: usize,
workers: Vec<(usize, Addr<Worker>)>, workers: Vec<(usize, Addr<Worker>)>,
services: Vec<Box<ServerServiceFactory + Send>>, services: Vec<Box<InternalServerServiceFactory>>,
sockets: Vec<(Token, net::TcpListener)>, sockets: Vec<(Token, net::TcpListener)>,
accept: AcceptLoop, accept: AcceptLoop,
exit: bool, exit: bool,
@ -143,13 +143,10 @@ impl Server {
} }
/// Add new service to server /// Add new service to server
pub fn bind<F, U, N>(mut self, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U>(mut self, addr: U, factory: F) -> io::Result<Self>
where where
F: Fn() -> N + Clone + Send + 'static, F: ServerServiceFactory,
U: net::ToSocketAddrs, U: net::ToSocketAddrs,
N: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()> + 'static,
N::Service: 'static,
N::Future: 'static,
{ {
let sockets = bind_addr(addr)?; let sockets = bind_addr(addr)?;
@ -160,12 +157,9 @@ impl Server {
} }
/// Add new service to server /// Add new service to server
pub fn listen<F, N>(mut self, lst: net::TcpListener, factory: F) -> Self pub fn listen<F>(mut self, lst: net::TcpListener, factory: F) -> Self
where where
F: Fn() -> N + Clone + Send + 'static, F: ServerServiceFactory,
N: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()> + 'static,
N::Service: 'static,
N::Future: 'static,
{ {
let token = Token(self.services.len()); let token = Token(self.services.len());
self.services.push(ServerNewService::create(factory)); self.services.push(ServerNewService::create(factory));
@ -254,7 +248,7 @@ impl Server {
let (tx, rx) = unbounded::<Conn>(); let (tx, rx) = unbounded::<Conn>();
let avail = WorkerAvailability::new(notify); let avail = WorkerAvailability::new(notify);
let worker = WorkerClient::new(idx, tx, avail.clone()); let worker = WorkerClient::new(idx, tx, avail.clone());
let services: Vec<Box<ServerServiceFactory + Send>> = let services: Vec<Box<InternalServerServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
let addr = Arbiter::start(move |ctx: &mut Context<_>| { let addr = Arbiter::start(move |ctx: &mut Context<_>| {

View File

@ -65,54 +65,45 @@ where
} }
} }
pub(crate) struct ServerNewService<F, T> pub(crate) struct ServerNewService<F: ServerServiceFactory> {
where
F: Fn() -> T + Send + Clone,
{
inner: F, inner: F,
} }
impl<F, T> ServerNewService<F, T> impl<F> ServerNewService<F>
where where
F: Fn() -> T + Send + Clone + 'static, F: ServerServiceFactory,
T: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()> + 'static,
T::Service: 'static,
T::Future: 'static,
{ {
pub(crate) fn create(inner: F) -> Box<ServerServiceFactory + Send> { pub(crate) fn create(inner: F) -> Box<InternalServerServiceFactory> {
Box::new(Self { inner }) Box::new(Self { inner })
} }
} }
pub trait ServerServiceFactory { pub(crate) trait InternalServerServiceFactory: Send {
fn clone_factory(&self) -> Box<ServerServiceFactory + Send>; fn clone_factory(&self) -> Box<InternalServerServiceFactory>;
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>; fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
} }
impl<F, T> ServerServiceFactory for ServerNewService<F, T> impl<F> InternalServerServiceFactory for ServerNewService<F>
where where
F: Fn() -> T + Send + Clone + 'static, F: ServerServiceFactory,
T: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()> + 'static,
T::Service: 'static,
T::Future: 'static,
{ {
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> { fn clone_factory(&self) -> Box<InternalServerServiceFactory> {
Box::new(Self { Box::new(Self {
inner: self.inner.clone(), inner: self.inner.clone(),
}) })
} }
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> { fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
Box::new((self.inner)().new_service().map(move |inner| { Box::new(self.inner.create().new_service().map(move |inner| {
let service: BoxedServerService = Box::new(ServerService::new(inner)); let service: BoxedServerService = Box::new(ServerService::new(inner));
service service
})) }))
} }
} }
impl ServerServiceFactory for Box<ServerServiceFactory> { impl InternalServerServiceFactory for Box<InternalServerServiceFactory> {
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> { fn clone_factory(&self) -> Box<InternalServerServiceFactory> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }
@ -120,3 +111,21 @@ impl ServerServiceFactory for Box<ServerServiceFactory> {
self.as_ref().create() self.as_ref().create()
} }
} }
pub trait ServerServiceFactory: Send + Clone + 'static {
type NewService: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()>;
fn create(&self) -> Self::NewService;
}
impl<F, T> ServerServiceFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
T: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()>,
{
type NewService = T;
fn create(&self) -> T {
(self)()
}
}

View File

@ -17,7 +17,7 @@ use actix::{
}; };
use super::accept::AcceptNotify; use super::accept::AcceptNotify;
use super::server_service::{BoxedServerService, ServerMessage, ServerServiceFactory}; use super::server_service::{BoxedServerService, InternalServerServiceFactory, ServerMessage};
use super::Token; use super::Token;
#[derive(Message)] #[derive(Message)]
@ -122,7 +122,7 @@ impl Actor for Worker {
impl Worker { impl Worker {
pub(crate) fn new( pub(crate) fn new(
ctx: &mut Context<Self>, services: Vec<Box<ServerServiceFactory + Send>>, ctx: &mut Context<Self>, services: Vec<Box<InternalServerServiceFactory>>,
availability: WorkerAvailability, availability: WorkerAvailability,
) -> Self { ) -> Self {
let wrk = MAX_CONNS_COUNTER.with(|conns| Worker { let wrk = MAX_CONNS_COUNTER.with(|conns| Worker {