diff --git a/src/framed.rs b/src/framed.rs index a9551a0a..8b81b0ba 100644 --- a/src/framed.rs +++ b/src/framed.rs @@ -233,29 +233,18 @@ where fn poll_service(&mut self) -> bool { match self.service.poll_ready() { Ok(Async::Ready(_)) => { - let mut item = self.request.take(); + if let Some(item) = self.request.take() { + let sender = self.write_tx.clone(); + actix::Arbiter::spawn( + self.service + .call(item) + .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), + ); + } + loop { - if let Some(item) = item { - match self.service.poll_ready() { - Ok(Async::Ready(_)) => { - let sender = self.write_tx.clone(); - actix::Arbiter::spawn(self.service.call(item).then(|item| { - sender.send(item).map(|_| ()).map_err(|_| ()) - })); - } - Ok(Async::NotReady) => { - self.request = Some(item); - return false; - } - Err(err) => { - self.state = - TransportState::Error(FramedTransportError::Service(err)); - return true; - } - } - } - match self.framed.poll() { - Ok(Async::Ready(Some(el))) => item = Some(el), + let item = match self.framed.poll() { + Ok(Async::Ready(Some(el))) => el, Err(err) => { self.state = TransportState::Error(FramedTransportError::Decoder(err)); @@ -266,6 +255,26 @@ where self.state = TransportState::Stopping; return true; } + }; + + match self.service.poll_ready() { + Ok(Async::Ready(_)) => { + let sender = self.write_tx.clone(); + actix::Arbiter::spawn( + self.service + .call(item) + .then(|item| sender.send(item).map(|_| ()).map_err(|_| ())), + ); + } + Ok(Async::NotReady) => { + self.request = Some(item); + return false; + } + Err(err) => { + self.state = + TransportState::Error(FramedTransportError::Service(err)); + return true; + } } } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 0f8eb685..0a6f9885 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -8,7 +8,7 @@ mod services; mod worker; pub use self::server::Server; -pub use self::services::ServerServiceFactory; +pub use self::services::{ServerMessage, ServiceFactory, StreamServiceFactory}; /// Pause accepting incoming connections /// diff --git a/src/server/server.rs b/src/server/server.rs index 6280faa9..484a2abf 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -12,7 +12,8 @@ use actix::{ }; use super::accept::{AcceptLoop, AcceptNotify, Command}; -use super::services::{InternalServerServiceFactory, ServerNewService, ServerServiceFactory}; +use super::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory}; +use super::services::{ServiceFactory, ServiceNewService}; use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; use super::{PauseServer, ResumeServer, StopServer, Token}; @@ -24,11 +25,11 @@ pub(crate) enum ServerCommand { pub struct Server { threads: usize, workers: Vec<(usize, WorkerClient)>, - services: Vec>, + services: Vec>, sockets: Vec<(Token, net::TcpListener)>, accept: AcceptLoop, exit: bool, - shutdown_timeout: u16, + shutdown_timeout: Duration, signals: Option>, no_signals: bool, } @@ -49,7 +50,7 @@ impl Server { sockets: Vec::new(), accept: AcceptLoop::new(), exit: false, - shutdown_timeout: 30, + shutdown_timeout: Duration::from_secs(30), signals: None, no_signals: false, } @@ -96,7 +97,7 @@ impl Server { self } - /// Timeout for graceful workers shutdown. + /// Timeout for graceful workers shutdown in seconds. /// /// After receiving a stop signal, workers have this much time to finish /// serving requests. Workers still alive after the timeout are force @@ -104,7 +105,7 @@ impl Server { /// /// By default shutdown timeout sets to 30 seconds. pub fn shutdown_timeout(mut self, sec: u16) -> Self { - self.shutdown_timeout = sec; + self.shutdown_timeout = Duration::from_secs(u64::from(sec)); self } @@ -123,7 +124,7 @@ impl Server { /// Add new service to server pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where - F: ServerServiceFactory, + F: StreamServiceFactory, U: net::ToSocketAddrs, { let sockets = bind_addr(addr)?; @@ -139,11 +140,27 @@ impl Server { mut self, name: N, lst: net::TcpListener, factory: F, ) -> Self where - F: ServerServiceFactory, + F: StreamServiceFactory, { let token = Token(self.services.len()); self.services - .push(ServerNewService::create(name.as_ref().to_string(), factory)); + .push(StreamNewService::create(name.as_ref().to_string(), factory)); + self.sockets.push((token, lst)); + self + } + + /// Add new service to server + pub fn listen2>( + mut self, name: N, lst: net::TcpListener, factory: F, + ) -> Self + where + F: ServiceFactory, + { + let token = Token(self.services.len()); + self.services.push(ServiceNewService::create( + name.as_ref().to_string(), + factory, + )); self.sockets.push((token, lst)); self } @@ -227,13 +244,14 @@ impl Server { fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { let (tx, rx) = unbounded(); + let timeout = self.shutdown_timeout; let avail = WorkerAvailability::new(notify); let worker = WorkerClient::new(idx, tx, avail.clone()); - let services: Vec> = + let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); - Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(|| { - Worker::start(rx, services, avail); + Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(move || { + Worker::start(rx, services, avail, timeout.clone()); Ok::<_, ()>(()) })); @@ -299,17 +317,12 @@ impl Handler for Server { // stop workers let (tx, rx) = mpsc::channel(1); - let dur = if msg.graceful { - Some(Duration::new(u64::from(self.shutdown_timeout), 0)) - } else { - None - }; for worker in &self.workers { let tx2 = tx.clone(); ctx.spawn( worker .1 - .stop(dur) + .stop(msg.graceful) .into_actor(self) .then(move |_, slf, ctx| { slf.workers.pop(); diff --git a/src/server/services.rs b/src/server/services.rs index 7a986f7c..a2dad1ba 100644 --- a/src/server/services.rs +++ b/src/server/services.rs @@ -1,27 +1,103 @@ use std::net; +use std::time::Duration; -use futures::future::{err, ok}; +use futures::future::{err, ok, FutureResult}; use futures::{Future, Poll}; +use tokio_current_thread::spawn; use tokio_reactor::Handle; use tokio_tcp::TcpStream; +use counter::CounterGuard; use service::{NewService, Service}; +/// Server message pub enum ServerMessage { Connect(net::TcpStream), - Shutdown, + Shutdown(Duration), ForceShutdown, } +pub trait StreamServiceFactory: Send + Clone + 'static { + type NewService: NewService; + + fn create(&self) -> Self::NewService; +} + +pub trait ServiceFactory: Send + Clone + 'static { + type NewService: NewService< + Request = ServerMessage, + Response = (), + Error = (), + InitError = (), + >; + + fn create(&self) -> Self::NewService; +} + +pub(crate) trait InternalServiceFactory: Send { + fn name(&self) -> &str; + + fn clone_factory(&self) -> Box; + + fn create(&self) -> Box>; +} + pub(crate) type BoxedServerService = Box< Service< - Request = ServerMessage, + Request = (Option, ServerMessage), Response = (), Error = (), - Future = Box>, + Future = FutureResult<(), ()>, >, >; +pub(crate) struct StreamService { + service: T, +} + +impl StreamService { + fn new(service: T) -> Self { + StreamService { service } + } +} + +impl Service for StreamService +where + T: Service, + T::Future: 'static, + T::Error: 'static, +{ + type Request = (Option, ServerMessage); + type Response = (); + type Error = (); + type Future = FutureResult<(), ()>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.service.poll_ready().map_err(|_| ()) + } + + fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { + match req { + ServerMessage::Connect(stream) => { + let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { + error!("Can not convert to an async tcp stream: {}", e); + }); + + if let Ok(stream) = stream { + spawn(self.service.call(stream).map(move |val| { + drop(guard); + val + })); + ok(()) + } else { + err(()) + } + } + _ => ok(()), + } + } +} + pub(crate) struct ServerService { service: T, } @@ -34,68 +110,51 @@ impl ServerService { impl Service for ServerService where - T: Service, + T: Service, T::Future: 'static, T::Error: 'static, { - type Request = ServerMessage; + type Request = (Option, ServerMessage); type Response = (); type Error = (); - type Future = Box>; + type Future = FutureResult<(), ()>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.service.poll_ready().map_err(|_| ()) } - fn call(&mut self, req: ServerMessage) -> Self::Future { - match req { - ServerMessage::Connect(stream) => { - let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { - error!("Can not convert to an async tcp stream: {}", e); - }); - - if let Ok(stream) = stream { - Box::new(self.service.call(stream)) - } else { - Box::new(err(())) - } - } - _ => Box::new(ok(())), - } + fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { + spawn(self.service.call(req).map(move |val| { + drop(guard); + val + })); + ok(()) } } -pub(crate) struct ServerNewService { +pub(crate) struct ServiceNewService { name: String, inner: F, } -impl ServerNewService +impl ServiceNewService where - F: ServerServiceFactory, + F: ServiceFactory, { - pub(crate) fn create(name: String, inner: F) -> Box { + pub(crate) fn create(name: String, inner: F) -> Box { Box::new(Self { name, inner }) } } -pub(crate) trait InternalServerServiceFactory: Send { - fn name(&self) -> &str; - - fn clone_factory(&self) -> Box; - - fn create(&self) -> Box>; -} - -impl InternalServerServiceFactory for ServerNewService +impl InternalServiceFactory for ServiceNewService where - F: ServerServiceFactory, + F: ServiceFactory, { fn name(&self) -> &str { &self.name } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), @@ -110,12 +169,49 @@ where } } -impl InternalServerServiceFactory for Box { +pub(crate) struct StreamNewService { + name: String, + inner: F, +} + +impl StreamNewService +where + F: StreamServiceFactory, +{ + pub(crate) fn create(name: String, inner: F) -> Box { + Box::new(Self { name, inner }) + } +} + +impl InternalServiceFactory for StreamNewService +where + F: StreamServiceFactory, +{ + fn name(&self) -> &str { + &self.name + } + + fn clone_factory(&self) -> Box { + Box::new(Self { + name: self.name.clone(), + inner: self.inner.clone(), + }) + } + + fn create(&self) -> Box> { + Box::new(self.inner.create().new_service().map(move |inner| { + let service: BoxedServerService = Box::new(StreamService::new(inner)); + service + })) + } +} + +impl InternalServiceFactory for Box { fn name(&self) -> &str { self.as_ref().name() } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } @@ -124,13 +220,7 @@ impl InternalServerServiceFactory for Box { } } -pub trait ServerServiceFactory: Send + Clone + 'static { - type NewService: NewService; - - fn create(&self) -> Self::NewService; -} - -impl ServerServiceFactory for F +impl StreamServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, T: NewService, diff --git a/src/server/worker.rs b/src/server/worker.rs index b790de90..a89cf31b 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -12,7 +12,7 @@ use actix::msgs::StopArbiter; use actix::{Arbiter, Message}; use super::accept::AcceptNotify; -use super::services::{BoxedServerService, InternalServerServiceFactory, ServerMessage}; +use super::services::{BoxedServerService, InternalServiceFactory, ServerMessage}; use super::Token; use counter::Counter; @@ -20,7 +20,7 @@ pub(crate) enum WorkerCommand { Message(Conn), /// Stop worker message. Returns `true` on successful shutdown /// and `false` if some connections still alive. - Stop(Option, oneshot::Sender), + Stop(bool, oneshot::Sender), } #[derive(Debug, Message)] @@ -79,7 +79,7 @@ impl WorkerClient { self.avail.available() } - pub fn stop(&self, graceful: Option) -> oneshot::Receiver { + pub fn stop(&self, graceful: bool) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send(WorkerCommand::Stop(graceful, tx)); rx @@ -121,20 +121,22 @@ pub(crate) struct Worker { services: Vec, availability: WorkerAvailability, conns: Counter, - factories: Vec>, + factories: Vec>, state: WorkerState, + shutdown_timeout: time::Duration, } impl Worker { pub(crate) fn start( - rx: UnboundedReceiver, - factories: Vec>, availability: WorkerAvailability, + rx: UnboundedReceiver, factories: Vec>, + availability: WorkerAvailability, shutdown_timeout: time::Duration, ) { availability.set(false); let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker { rx, availability, factories, + shutdown_timeout, services: Vec::new(), conns: conns.clone(), state: WorkerState::Unavailable(Vec::new()), @@ -159,11 +161,12 @@ impl Worker { fn shutdown(&mut self, force: bool) { if force { self.services.iter_mut().for_each(|h| { - h.call(ServerMessage::ForceShutdown); + let _ = h.call((None, ServerMessage::ForceShutdown)); }); } else { - self.services.iter_mut().for_each(|h| { - h.call(ServerMessage::Shutdown); + let timeout = self.shutdown_timeout; + self.services.iter_mut().for_each(move |h| { + let _ = h.call((None, ServerMessage::Shutdown(timeout.clone()))); }); } } @@ -222,14 +225,8 @@ impl Future for Worker { match self.check_readiness(false) { Ok(true) => { let guard = self.conns.get(); - spawn( - self.services[msg.handler.0] - .call(ServerMessage::Connect(msg.io)) - .map(|val| { - drop(guard); - val - }), - ) + let _ = self.services[msg.handler.0] + .call((Some(guard), ServerMessage::Connect(msg.io))); } Ok(false) => { trace!("Worker is unavailable"); @@ -324,14 +321,8 @@ impl Future for Worker { match self.check_readiness(false) { Ok(true) => { let guard = self.conns.get(); - spawn( - self.services[msg.handler.0] - .call(ServerMessage::Connect(msg.io)) - .map(|val| { - drop(guard); - val - }), - ); + let _ = self.services[msg.handler.0] + .call((Some(guard), ServerMessage::Connect(msg.io))); continue; } Ok(false) => { @@ -361,14 +352,14 @@ impl Future for Worker { info!("Shutting down worker, 0 connections"); let _ = tx.send(true); return Ok(Async::Ready(())); - } else if let Some(dur) = graceful { + } else if graceful { self.shutdown(false); let num = num_connections(); if num != 0 { info!("Graceful worker shutdown, {} connections", num); break Some(WorkerState::Shutdown( sleep(time::Duration::from_secs(1)), - sleep(dur), + sleep(self.shutdown_timeout), tx, )); } else {