1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 19:12:56 +01:00

expose ServerMessage service

This commit is contained in:
Nikolay Kim 2018-09-26 20:40:45 -07:00
parent 13c66a2ac4
commit ba57e67a74
5 changed files with 217 additions and 114 deletions

View File

@ -233,29 +233,18 @@ where
fn poll_service(&mut self) -> bool { fn poll_service(&mut self) -> bool {
match self.service.poll_ready() { match self.service.poll_ready() {
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
let mut item = self.request.take(); if let Some(item) = self.request.take() {
let sender = self.write_tx.clone();
actix::Arbiter::spawn(
self.service
.call(item)
.then(|item| sender.send(item).map(|_| ()).map_err(|_| ())),
);
}
loop { loop {
if let Some(item) = item { let item = match self.framed.poll() {
match self.service.poll_ready() { Ok(Async::Ready(Some(el))) => el,
Ok(Async::Ready(_)) => {
let sender = self.write_tx.clone();
actix::Arbiter::spawn(self.service.call(item).then(|item| {
sender.send(item).map(|_| ()).map_err(|_| ())
}));
}
Ok(Async::NotReady) => {
self.request = Some(item);
return false;
}
Err(err) => {
self.state =
TransportState::Error(FramedTransportError::Service(err));
return true;
}
}
}
match self.framed.poll() {
Ok(Async::Ready(Some(el))) => item = Some(el),
Err(err) => { Err(err) => {
self.state = self.state =
TransportState::Error(FramedTransportError::Decoder(err)); TransportState::Error(FramedTransportError::Decoder(err));
@ -266,6 +255,26 @@ where
self.state = TransportState::Stopping; self.state = TransportState::Stopping;
return true; return true;
} }
};
match self.service.poll_ready() {
Ok(Async::Ready(_)) => {
let sender = self.write_tx.clone();
actix::Arbiter::spawn(
self.service
.call(item)
.then(|item| sender.send(item).map(|_| ()).map_err(|_| ())),
);
}
Ok(Async::NotReady) => {
self.request = Some(item);
return false;
}
Err(err) => {
self.state =
TransportState::Error(FramedTransportError::Service(err));
return true;
}
} }
} }
} }

View File

@ -8,7 +8,7 @@ mod services;
mod worker; mod worker;
pub use self::server::Server; pub use self::server::Server;
pub use self::services::ServerServiceFactory; pub use self::services::{ServerMessage, ServiceFactory, StreamServiceFactory};
/// Pause accepting incoming connections /// Pause accepting incoming connections
/// ///

View File

@ -12,7 +12,8 @@ use actix::{
}; };
use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::services::{InternalServerServiceFactory, ServerNewService, ServerServiceFactory}; use super::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory};
use super::services::{ServiceFactory, ServiceNewService};
use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::{PauseServer, ResumeServer, StopServer, Token}; use super::{PauseServer, ResumeServer, StopServer, Token};
@ -24,11 +25,11 @@ pub(crate) enum ServerCommand {
pub struct Server { pub struct Server {
threads: usize, threads: usize,
workers: Vec<(usize, WorkerClient)>, workers: Vec<(usize, WorkerClient)>,
services: Vec<Box<InternalServerServiceFactory>>, services: Vec<Box<InternalServiceFactory>>,
sockets: Vec<(Token, net::TcpListener)>, sockets: Vec<(Token, net::TcpListener)>,
accept: AcceptLoop, accept: AcceptLoop,
exit: bool, exit: bool,
shutdown_timeout: u16, shutdown_timeout: Duration,
signals: Option<Addr<signal::ProcessSignals>>, signals: Option<Addr<signal::ProcessSignals>>,
no_signals: bool, no_signals: bool,
} }
@ -49,7 +50,7 @@ impl Server {
sockets: Vec::new(), sockets: Vec::new(),
accept: AcceptLoop::new(), accept: AcceptLoop::new(),
exit: false, exit: false,
shutdown_timeout: 30, shutdown_timeout: Duration::from_secs(30),
signals: None, signals: None,
no_signals: false, no_signals: false,
} }
@ -96,7 +97,7 @@ impl Server {
self self
} }
/// Timeout for graceful workers shutdown. /// Timeout for graceful workers shutdown in seconds.
/// ///
/// After receiving a stop signal, workers have this much time to finish /// After receiving a stop signal, workers have this much time to finish
/// serving requests. Workers still alive after the timeout are force /// serving requests. Workers still alive after the timeout are force
@ -104,7 +105,7 @@ impl Server {
/// ///
/// By default shutdown timeout sets to 30 seconds. /// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u16) -> Self { pub fn shutdown_timeout(mut self, sec: u16) -> Self {
self.shutdown_timeout = sec; self.shutdown_timeout = Duration::from_secs(u64::from(sec));
self self
} }
@ -123,7 +124,7 @@ impl Server {
/// Add new service to server /// Add new service to server
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self> pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where where
F: ServerServiceFactory, F: StreamServiceFactory,
U: net::ToSocketAddrs, U: net::ToSocketAddrs,
{ {
let sockets = bind_addr(addr)?; let sockets = bind_addr(addr)?;
@ -139,11 +140,27 @@ impl Server {
mut self, name: N, lst: net::TcpListener, factory: F, mut self, name: N, lst: net::TcpListener, factory: F,
) -> Self ) -> Self
where where
F: ServerServiceFactory, F: StreamServiceFactory,
{ {
let token = Token(self.services.len()); let token = Token(self.services.len());
self.services self.services
.push(ServerNewService::create(name.as_ref().to_string(), factory)); .push(StreamNewService::create(name.as_ref().to_string(), factory));
self.sockets.push((token, lst));
self
}
/// Add new service to server
pub fn listen2<F, N: AsRef<str>>(
mut self, name: N, lst: net::TcpListener, factory: F,
) -> Self
where
F: ServiceFactory,
{
let token = Token(self.services.len());
self.services.push(ServiceNewService::create(
name.as_ref().to_string(),
factory,
));
self.sockets.push((token, lst)); self.sockets.push((token, lst));
self self
} }
@ -227,13 +244,14 @@ impl Server {
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient {
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
let timeout = self.shutdown_timeout;
let avail = WorkerAvailability::new(notify); let avail = WorkerAvailability::new(notify);
let worker = WorkerClient::new(idx, tx, avail.clone()); let worker = WorkerClient::new(idx, tx, avail.clone());
let services: Vec<Box<InternalServerServiceFactory>> = let services: Vec<Box<InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(|| { Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(move || {
Worker::start(rx, services, avail); Worker::start(rx, services, avail, timeout.clone());
Ok::<_, ()>(()) Ok::<_, ()>(())
})); }));
@ -299,17 +317,12 @@ impl Handler<StopServer> for Server {
// stop workers // stop workers
let (tx, rx) = mpsc::channel(1); let (tx, rx) = mpsc::channel(1);
let dur = if msg.graceful {
Some(Duration::new(u64::from(self.shutdown_timeout), 0))
} else {
None
};
for worker in &self.workers { for worker in &self.workers {
let tx2 = tx.clone(); let tx2 = tx.clone();
ctx.spawn( ctx.spawn(
worker worker
.1 .1
.stop(dur) .stop(msg.graceful)
.into_actor(self) .into_actor(self)
.then(move |_, slf, ctx| { .then(move |_, slf, ctx| {
slf.workers.pop(); slf.workers.pop();

View File

@ -1,27 +1,103 @@
use std::net; use std::net;
use std::time::Duration;
use futures::future::{err, ok}; use futures::future::{err, ok, FutureResult};
use futures::{Future, Poll}; use futures::{Future, Poll};
use tokio_current_thread::spawn;
use tokio_reactor::Handle; use tokio_reactor::Handle;
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
use counter::CounterGuard;
use service::{NewService, Service}; use service::{NewService, Service};
/// Server message
pub enum ServerMessage { pub enum ServerMessage {
Connect(net::TcpStream), Connect(net::TcpStream),
Shutdown, Shutdown(Duration),
ForceShutdown, ForceShutdown,
} }
pub trait StreamServiceFactory: Send + Clone + 'static {
type NewService: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()>;
fn create(&self) -> Self::NewService;
}
pub trait ServiceFactory: Send + Clone + 'static {
type NewService: NewService<
Request = ServerMessage,
Response = (),
Error = (),
InitError = (),
>;
fn create(&self) -> Self::NewService;
}
pub(crate) trait InternalServiceFactory: Send {
fn name(&self) -> &str;
fn clone_factory(&self) -> Box<InternalServiceFactory>;
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
}
pub(crate) type BoxedServerService = Box< pub(crate) type BoxedServerService = Box<
Service< Service<
Request = ServerMessage, Request = (Option<CounterGuard>, ServerMessage),
Response = (), Response = (),
Error = (), Error = (),
Future = Box<Future<Item = (), Error = ()>>, Future = FutureResult<(), ()>,
>, >,
>; >;
pub(crate) struct StreamService<T> {
service: T,
}
impl<T> StreamService<T> {
fn new(service: T) -> Self {
StreamService { service }
}
}
impl<T> Service for StreamService<T>
where
T: Service<Request = TcpStream, Response = (), Error = ()>,
T::Future: 'static,
T::Error: 'static,
{
type Request = (Option<CounterGuard>, ServerMessage);
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(|_| ())
}
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req {
ServerMessage::Connect(stream) => {
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| {
error!("Can not convert to an async tcp stream: {}", e);
});
if let Ok(stream) = stream {
spawn(self.service.call(stream).map(move |val| {
drop(guard);
val
}));
ok(())
} else {
err(())
}
}
_ => ok(()),
}
}
}
pub(crate) struct ServerService<T> { pub(crate) struct ServerService<T> {
service: T, service: T,
} }
@ -34,68 +110,51 @@ impl<T> ServerService<T> {
impl<T> Service for ServerService<T> impl<T> Service for ServerService<T>
where where
T: Service<Request = TcpStream, Response = (), Error = ()>, T: Service<Request = ServerMessage, Response = (), Error = ()>,
T::Future: 'static, T::Future: 'static,
T::Error: 'static, T::Error: 'static,
{ {
type Request = ServerMessage; type Request = (Option<CounterGuard>, ServerMessage);
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = Box<Future<Item = (), Error = ()>>; type Future = FutureResult<(), ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(|_| ()) self.service.poll_ready().map_err(|_| ())
} }
fn call(&mut self, req: ServerMessage) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req { spawn(self.service.call(req).map(move |val| {
ServerMessage::Connect(stream) => { drop(guard);
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { val
error!("Can not convert to an async tcp stream: {}", e); }));
}); ok(())
if let Ok(stream) = stream {
Box::new(self.service.call(stream))
} else {
Box::new(err(()))
}
}
_ => Box::new(ok(())),
}
} }
} }
pub(crate) struct ServerNewService<F: ServerServiceFactory> { pub(crate) struct ServiceNewService<F: ServiceFactory> {
name: String, name: String,
inner: F, inner: F,
} }
impl<F> ServerNewService<F> impl<F> ServiceNewService<F>
where where
F: ServerServiceFactory, F: ServiceFactory,
{ {
pub(crate) fn create(name: String, inner: F) -> Box<InternalServerServiceFactory> { pub(crate) fn create(name: String, inner: F) -> Box<InternalServiceFactory> {
Box::new(Self { name, inner }) Box::new(Self { name, inner })
} }
} }
pub(crate) trait InternalServerServiceFactory: Send { impl<F> InternalServiceFactory for ServiceNewService<F>
fn name(&self) -> &str;
fn clone_factory(&self) -> Box<InternalServerServiceFactory>;
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
}
impl<F> InternalServerServiceFactory for ServerNewService<F>
where where
F: ServerServiceFactory, F: ServiceFactory,
{ {
fn name(&self) -> &str { fn name(&self) -> &str {
&self.name &self.name
} }
fn clone_factory(&self) -> Box<InternalServerServiceFactory> { fn clone_factory(&self) -> Box<InternalServiceFactory> {
Box::new(Self { Box::new(Self {
name: self.name.clone(), name: self.name.clone(),
inner: self.inner.clone(), inner: self.inner.clone(),
@ -110,12 +169,49 @@ where
} }
} }
impl InternalServerServiceFactory for Box<InternalServerServiceFactory> { pub(crate) struct StreamNewService<F: StreamServiceFactory> {
name: String,
inner: F,
}
impl<F> StreamNewService<F>
where
F: StreamServiceFactory,
{
pub(crate) fn create(name: String, inner: F) -> Box<InternalServiceFactory> {
Box::new(Self { name, inner })
}
}
impl<F> InternalServiceFactory for StreamNewService<F>
where
F: StreamServiceFactory,
{
fn name(&self) -> &str {
&self.name
}
fn clone_factory(&self) -> Box<InternalServiceFactory> {
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
})
}
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
Box::new(self.inner.create().new_service().map(move |inner| {
let service: BoxedServerService = Box::new(StreamService::new(inner));
service
}))
}
}
impl InternalServiceFactory for Box<InternalServiceFactory> {
fn name(&self) -> &str { fn name(&self) -> &str {
self.as_ref().name() self.as_ref().name()
} }
fn clone_factory(&self) -> Box<InternalServerServiceFactory> { fn clone_factory(&self) -> Box<InternalServiceFactory> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }
@ -124,13 +220,7 @@ impl InternalServerServiceFactory for Box<InternalServerServiceFactory> {
} }
} }
pub trait ServerServiceFactory: Send + Clone + 'static { impl<F, T> StreamServiceFactory for F
type NewService: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()>;
fn create(&self) -> Self::NewService;
}
impl<F, T> ServerServiceFactory for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn() -> T + Send + Clone + 'static,
T: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()>, T: NewService<Request = TcpStream, Response = (), Error = (), InitError = ()>,

View File

@ -12,7 +12,7 @@ use actix::msgs::StopArbiter;
use actix::{Arbiter, Message}; use actix::{Arbiter, Message};
use super::accept::AcceptNotify; use super::accept::AcceptNotify;
use super::services::{BoxedServerService, InternalServerServiceFactory, ServerMessage}; use super::services::{BoxedServerService, InternalServiceFactory, ServerMessage};
use super::Token; use super::Token;
use counter::Counter; use counter::Counter;
@ -20,7 +20,7 @@ pub(crate) enum WorkerCommand {
Message(Conn), Message(Conn),
/// Stop worker message. Returns `true` on successful shutdown /// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive. /// and `false` if some connections still alive.
Stop(Option<time::Duration>, oneshot::Sender<bool>), Stop(bool, oneshot::Sender<bool>),
} }
#[derive(Debug, Message)] #[derive(Debug, Message)]
@ -79,7 +79,7 @@ impl WorkerClient {
self.avail.available() self.avail.available()
} }
pub fn stop(&self, graceful: Option<time::Duration>) -> oneshot::Receiver<bool> { pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(WorkerCommand::Stop(graceful, tx)); let _ = self.tx.unbounded_send(WorkerCommand::Stop(graceful, tx));
rx rx
@ -121,20 +121,22 @@ pub(crate) struct Worker {
services: Vec<BoxedServerService>, services: Vec<BoxedServerService>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Counter, conns: Counter,
factories: Vec<Box<InternalServerServiceFactory>>, factories: Vec<Box<InternalServiceFactory>>,
state: WorkerState, state: WorkerState,
shutdown_timeout: time::Duration,
} }
impl Worker { impl Worker {
pub(crate) fn start( pub(crate) fn start(
rx: UnboundedReceiver<WorkerCommand>, rx: UnboundedReceiver<WorkerCommand>, factories: Vec<Box<InternalServiceFactory>>,
factories: Vec<Box<InternalServerServiceFactory>>, availability: WorkerAvailability, availability: WorkerAvailability, shutdown_timeout: time::Duration,
) { ) {
availability.set(false); availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker { let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
rx, rx,
availability, availability,
factories, factories,
shutdown_timeout,
services: Vec::new(), services: Vec::new(),
conns: conns.clone(), conns: conns.clone(),
state: WorkerState::Unavailable(Vec::new()), state: WorkerState::Unavailable(Vec::new()),
@ -159,11 +161,12 @@ impl Worker {
fn shutdown(&mut self, force: bool) { fn shutdown(&mut self, force: bool) {
if force { if force {
self.services.iter_mut().for_each(|h| { self.services.iter_mut().for_each(|h| {
h.call(ServerMessage::ForceShutdown); let _ = h.call((None, ServerMessage::ForceShutdown));
}); });
} else { } else {
self.services.iter_mut().for_each(|h| { let timeout = self.shutdown_timeout;
h.call(ServerMessage::Shutdown); self.services.iter_mut().for_each(move |h| {
let _ = h.call((None, ServerMessage::Shutdown(timeout.clone())));
}); });
} }
} }
@ -222,14 +225,8 @@ impl Future for Worker {
match self.check_readiness(false) { match self.check_readiness(false) {
Ok(true) => { Ok(true) => {
let guard = self.conns.get(); let guard = self.conns.get();
spawn( let _ = self.services[msg.handler.0]
self.services[msg.handler.0] .call((Some(guard), ServerMessage::Connect(msg.io)));
.call(ServerMessage::Connect(msg.io))
.map(|val| {
drop(guard);
val
}),
)
} }
Ok(false) => { Ok(false) => {
trace!("Worker is unavailable"); trace!("Worker is unavailable");
@ -324,14 +321,8 @@ impl Future for Worker {
match self.check_readiness(false) { match self.check_readiness(false) {
Ok(true) => { Ok(true) => {
let guard = self.conns.get(); let guard = self.conns.get();
spawn( let _ = self.services[msg.handler.0]
self.services[msg.handler.0] .call((Some(guard), ServerMessage::Connect(msg.io)));
.call(ServerMessage::Connect(msg.io))
.map(|val| {
drop(guard);
val
}),
);
continue; continue;
} }
Ok(false) => { Ok(false) => {
@ -361,14 +352,14 @@ impl Future for Worker {
info!("Shutting down worker, 0 connections"); info!("Shutting down worker, 0 connections");
let _ = tx.send(true); let _ = tx.send(true);
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} else if let Some(dur) = graceful { } else if graceful {
self.shutdown(false); self.shutdown(false);
let num = num_connections(); let num = num_connections();
if num != 0 { if num != 0 {
info!("Graceful worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
break Some(WorkerState::Shutdown( break Some(WorkerState::Shutdown(
sleep(time::Duration::from_secs(1)), sleep(time::Duration::from_secs(1)),
sleep(dur), sleep(self.shutdown_timeout),
tx, tx,
)); ));
} else { } else {