diff --git a/examples/basic.rs b/examples/basic.rs index fd21320c..43e2bf4c 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,4 +1,5 @@ //! simple composite service +//! build: cargo run --example basic --features "ssl" //! to test: curl https://127.0.0.1:8443/ -k extern crate actix; extern crate actix_net; diff --git a/src/server.rs b/src/server.rs index 2761a3b5..1224de11 100644 --- a/src/server.rs +++ b/src/server.rs @@ -432,10 +432,6 @@ impl Connections { self.0.available() } - pub(crate) fn num_connections(&self) -> usize { - self.0.conn.load(Ordering::Relaxed) - } - /// Report opened connection pub fn connection(&self) -> ConnectionTag { ConnectionTag::new(self.0.clone()) diff --git a/src/server_service.rs b/src/server_service.rs index c18cc62f..973a9c3d 100644 --- a/src/server_service.rs +++ b/src/server_service.rs @@ -1,3 +1,7 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; use std::{fmt, io, net}; use futures::{future, Future, Poll}; @@ -16,6 +20,7 @@ pub(crate) type BoxedServerService = Box< pub(crate) struct ServerService { inner: T, + counter: Arc, } impl Service for ServerService @@ -39,7 +44,11 @@ where }); if let Ok(stream) = stream { - Box::new(self.inner.call(stream).map_err(|_| ())) + let counter = self.counter.clone(); + let _ = counter.fetch_add(1, Ordering::Relaxed); + Box::new(self.inner.call(stream).map_err(|_| ()).map(move |_| { + let _ = counter.fetch_sub(1, Ordering::Relaxed); + })) } else { Box::new(future::err(())) } @@ -48,6 +57,7 @@ where pub(crate) struct ServerNewService { inner: T, + counter: Arc, } impl ServerNewService @@ -61,11 +71,16 @@ where T::Error: fmt::Display, { pub(crate) fn create(inner: T) -> Box { - Box::new(Self { inner }) + Box::new(Self { + inner, + counter: Arc::new(AtomicUsize::new(0)), + }) } } pub trait ServerServiceFactory { + fn counter(&self) -> Arc; + fn clone_factory(&self) -> Box; fn create(&self) -> Box>; @@ -81,21 +96,31 @@ where T::Future: 'static, T::Error: fmt::Display, { + fn counter(&self) -> Arc { + self.counter.clone() + } + fn clone_factory(&self) -> Box { Box::new(Self { inner: self.inner.clone(), + counter: Arc::new(AtomicUsize::new(0)), }) } fn create(&self) -> Box> { - Box::new(self.inner.new_service().map_err(|_| ()).map(|inner| { - let service: BoxedServerService = Box::new(ServerService { inner }); + let counter = self.counter.clone(); + Box::new(self.inner.new_service().map_err(|_| ()).map(move |inner| { + let service: BoxedServerService = Box::new(ServerService { inner, counter }); service })) } } impl ServerServiceFactory for Box { + fn counter(&self) -> Arc { + self.as_ref().counter() + } + fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } diff --git a/src/service.rs b/src/service.rs index 5af6b3d4..b854765a 100644 --- a/src/service.rs +++ b/src/service.rs @@ -583,7 +583,7 @@ where type Future = MapErrFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.a.poll_ready().map_err(|e| (self.f)(e)) + self.a.poll_ready().map_err(&self.f) } fn call(&mut self, req: Self::Request) -> Self::Future { @@ -619,7 +619,7 @@ where type Error = E; fn poll(&mut self) -> Poll { - self.fut.poll().map_err(|e| (self.f)(e)) + self.fut.poll().map_err(&self.f) } } @@ -795,6 +795,6 @@ where type Error = E; fn poll(&mut self) -> Poll { - self.fut.poll().map_err(|e| (self.f)(e)) + self.fut.poll().map_err(&self.f) } } diff --git a/src/ssl/openssl.rs b/src/ssl/openssl.rs index bf9c47fd..512dcfb4 100644 --- a/src/ssl/openssl.rs +++ b/src/ssl/openssl.rs @@ -6,7 +6,6 @@ use futures::{future, future::FutureResult, Async, Future, Poll}; use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; -use tokio_tcp::TcpStream; use tower_service::{NewService, Service}; use {IntoNewService, IoStream}; diff --git a/src/worker.rs b/src/worker.rs index 32b97887..80bf1356 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,8 +1,10 @@ +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::{net, time}; -use futures::future; use futures::sync::mpsc::{SendError, UnboundedSender}; use futures::sync::oneshot; +use futures::{future, Future}; use actix::msgs::StopArbiter; use actix::{ @@ -59,6 +61,7 @@ impl Message for StopWorker { pub(crate) struct Worker { // conns: Connections, services: Vec, + counters: Vec>, } impl Actor for Worker { @@ -71,6 +74,7 @@ impl Worker { ) -> Self { let wrk = Worker { services: Vec::new(), + counters: services.iter().map(|i| i.counter()).collect(), }; ctx.wait( @@ -94,23 +98,26 @@ impl Worker { } fn shutdown_timeout( - &self, _ctx: &mut Context, _tx: oneshot::Sender, _dur: time::Duration, + &self, ctx: &mut Context, tx: oneshot::Sender, dur: time::Duration, ) { // sleep for 1 second and then check again - // ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { - // let num = slf.conns.num_connections(); - // if num == 0 { - // let _ = tx.send(true); - // Arbiter::current().do_send(StopArbiter(0)); - // } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { - // slf.shutdown_timeout(ctx, tx, d); - // } else { - // info!("Force shutdown http worker, {} connections", num); - // slf.shutdown(true); - // let _ = tx.send(false); - // Arbiter::current().do_send(StopArbiter(0)); - // } - // }); + ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { + let num = slf + .counters + .iter() + .fold(0, |i, v| i + v.load(Ordering::Relaxed)); + if num == 0 { + let _ = tx.send(true); + Arbiter::current().do_send(StopArbiter(0)); + } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { + slf.shutdown_timeout(ctx, tx, d); + } else { + info!("Force shutdown http worker, {} connections", num); + slf.shutdown(true); + let _ = tx.send(false); + Arbiter::current().do_send(StopArbiter(0)); + } + }); } } @@ -126,27 +133,32 @@ impl Handler for Worker { impl Handler for Worker { type Result = Response; - fn handle(&mut self, _msg: StopWorker, _ctx: &mut Context) -> Self::Result { - unimplemented!() - // let num = self.conns.num_connections(); - // if num == 0 { - // info!("Shutting down http worker, 0 connections"); - // Response::reply(Ok(true)) - // } else if let Some(dur) = msg.graceful { - // self.shutdown(false); - // let (tx, rx) = oneshot::channel(); - // let num = self.conns.num_connections(); - // if num != 0 { - // info!("Graceful http worker shutdown, {} connections", num); - // self.shutdown_timeout(ctx, tx, dur); - // Response::reply(Ok(true)) - // } else { - // Response::async(rx.map_err(|_| ())) - // } - // } else { - // info!("Force shutdown http worker, {} connections", num); - // self.shutdown(true); - // Response::reply(Ok(false)) - // } + fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { + let num = self + .counters + .iter() + .fold(0, |i, v| i + v.load(Ordering::Relaxed)); + if num == 0 { + info!("Shutting down http worker, 0 connections"); + Response::reply(Ok(true)) + } else if let Some(dur) = msg.graceful { + self.shutdown(false); + let (tx, rx) = oneshot::channel(); + let num = self + .counters + .iter() + .fold(0, |i, v| i + v.load(Ordering::Relaxed)); + if num != 0 { + info!("Graceful http worker shutdown, {} connections", num); + self.shutdown_timeout(ctx, tx, dur); + Response::reply(Ok(true)) + } else { + Response::async(rx.map_err(|_| ())) + } + } else { + info!("Force shutdown http worker, {} connections", num); + self.shutdown(true); + Response::reply(Ok(false)) + } } }