2018-09-08 18:36:38 +02:00
|
|
|
use std::cell::Cell;
|
|
|
|
use std::rc::Rc;
|
|
|
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
2018-09-07 22:06:51 +02:00
|
|
|
use std::sync::Arc;
|
2018-08-19 19:47:04 +02:00
|
|
|
use std::{net, time};
|
|
|
|
|
|
|
|
use futures::sync::mpsc::{SendError, UnboundedSender};
|
|
|
|
use futures::sync::oneshot;
|
2018-09-08 18:36:38 +02:00
|
|
|
use futures::task::AtomicTask;
|
2018-09-07 22:06:51 +02:00
|
|
|
use futures::{future, Async, Future, Poll};
|
2018-09-08 18:36:38 +02:00
|
|
|
use tokio_current_thread::spawn;
|
2018-08-19 19:47:04 +02:00
|
|
|
|
|
|
|
use actix::msgs::StopArbiter;
|
|
|
|
use actix::{
|
|
|
|
fut, Actor, ActorContext, ActorFuture, Arbiter, AsyncContext, Context, Handler, Message,
|
|
|
|
Response, WrapFuture,
|
|
|
|
};
|
|
|
|
|
2018-09-07 22:06:51 +02:00
|
|
|
use super::accept::AcceptNotify;
|
2018-09-11 17:43:23 +02:00
|
|
|
use super::services::{BoxedServerService, InternalServerServiceFactory, ServerMessage};
|
2018-09-07 22:06:51 +02:00
|
|
|
use super::Token;
|
2018-08-19 19:47:04 +02:00
|
|
|
|
|
|
|
#[derive(Message)]
|
|
|
|
pub(crate) struct Conn {
|
|
|
|
pub io: net::TcpStream,
|
|
|
|
pub handler: Token,
|
|
|
|
pub token: Token,
|
|
|
|
pub peer: Option<net::SocketAddr>,
|
|
|
|
}
|
|
|
|
|
2018-09-08 18:36:38 +02:00
|
|
|
const MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
|
|
|
|
|
|
|
|
/// Sets the maximum per-worker number of concurrent connections.
|
|
|
|
///
|
|
|
|
/// All socket listeners will stop accepting connections when this limit is
|
|
|
|
/// reached for each worker.
|
|
|
|
///
|
|
|
|
/// By default max connections is set to a 25k per worker.
|
|
|
|
pub fn max_concurrent_connections(num: usize) {
|
|
|
|
MAX_CONNS.store(num, Ordering::Relaxed);
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn num_connections() -> usize {
|
|
|
|
MAX_CONNS_COUNTER.with(|conns| conns.total())
|
|
|
|
}
|
|
|
|
|
|
|
|
thread_local! {
|
|
|
|
static MAX_CONNS_COUNTER: Connections =
|
|
|
|
Connections::new(MAX_CONNS.load(Ordering::Relaxed));
|
|
|
|
}
|
|
|
|
|
2018-08-19 19:47:04 +02:00
|
|
|
#[derive(Clone)]
|
|
|
|
pub(crate) struct WorkerClient {
|
|
|
|
pub idx: usize,
|
|
|
|
tx: UnboundedSender<Conn>,
|
2018-09-07 22:06:51 +02:00
|
|
|
avail: WorkerAvailability,
|
2018-08-19 19:47:04 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl WorkerClient {
|
2018-09-07 22:06:51 +02:00
|
|
|
pub fn new(idx: usize, tx: UnboundedSender<Conn>, avail: WorkerAvailability) -> Self {
|
|
|
|
WorkerClient { idx, tx, avail }
|
2018-08-19 19:47:04 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn send(&self, msg: Conn) -> Result<(), SendError<Conn>> {
|
|
|
|
self.tx.unbounded_send(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn available(&self) -> bool {
|
2018-09-07 22:06:51 +02:00
|
|
|
self.avail.available()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
pub(crate) struct WorkerAvailability {
|
|
|
|
notify: AcceptNotify,
|
|
|
|
available: Arc<AtomicBool>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl WorkerAvailability {
|
|
|
|
pub fn new(notify: AcceptNotify) -> Self {
|
|
|
|
WorkerAvailability {
|
|
|
|
notify,
|
|
|
|
available: Arc::new(AtomicBool::new(false)),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn available(&self) -> bool {
|
|
|
|
self.available.load(Ordering::Acquire)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn set(&self, val: bool) {
|
|
|
|
let old = self.available.swap(val, Ordering::Release);
|
|
|
|
if !old && val {
|
|
|
|
self.notify.notify()
|
|
|
|
}
|
2018-08-19 19:47:04 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Stop worker message. Returns `true` on successful shutdown
|
|
|
|
/// and `false` if some connections still alive.
|
|
|
|
pub(crate) struct StopWorker {
|
|
|
|
pub graceful: Option<time::Duration>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Message for StopWorker {
|
|
|
|
type Result = Result<bool, ()>;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Http worker
|
|
|
|
///
|
|
|
|
/// Worker accepts Socket objects via unbounded channel and start requests
|
|
|
|
/// processing.
|
|
|
|
pub(crate) struct Worker {
|
|
|
|
services: Vec<BoxedServerService>,
|
2018-09-07 22:06:51 +02:00
|
|
|
availability: WorkerAvailability,
|
2018-09-08 18:36:38 +02:00
|
|
|
conns: Connections,
|
2018-08-19 19:47:04 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Actor for Worker {
|
|
|
|
type Context = Context<Self>;
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Worker {
|
2018-08-24 00:42:34 +02:00
|
|
|
pub(crate) fn new(
|
2018-09-08 23:50:16 +02:00
|
|
|
ctx: &mut Context<Self>, services: Vec<Box<InternalServerServiceFactory>>,
|
2018-09-07 22:06:51 +02:00
|
|
|
availability: WorkerAvailability,
|
2018-08-19 19:47:04 +02:00
|
|
|
) -> Self {
|
2018-09-08 18:36:38 +02:00
|
|
|
let wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
|
2018-09-07 22:06:51 +02:00
|
|
|
availability,
|
2018-08-19 19:47:04 +02:00
|
|
|
services: Vec::new(),
|
2018-09-08 18:36:38 +02:00
|
|
|
conns: conns.clone(),
|
|
|
|
});
|
2018-08-19 19:47:04 +02:00
|
|
|
|
|
|
|
ctx.wait(
|
|
|
|
future::join_all(services.into_iter().map(|s| s.create()))
|
|
|
|
.into_actor(&wrk)
|
|
|
|
.map_err(|e, _, ctx| {
|
|
|
|
error!("Can not start worker: {:?}", e);
|
|
|
|
Arbiter::current().do_send(StopArbiter(0));
|
|
|
|
ctx.stop();
|
2018-09-07 22:06:51 +02:00
|
|
|
}).and_then(|services, act, ctx| {
|
2018-08-19 19:47:04 +02:00
|
|
|
act.services.extend(services);
|
2018-09-07 22:06:51 +02:00
|
|
|
act.availability.set(true);
|
|
|
|
ctx.spawn(CheckReadiness(true));
|
2018-08-19 19:47:04 +02:00
|
|
|
fut::ok(())
|
|
|
|
}),
|
|
|
|
);
|
|
|
|
|
|
|
|
wrk
|
|
|
|
}
|
|
|
|
|
2018-09-07 22:06:51 +02:00
|
|
|
fn shutdown(&mut self, force: bool) {
|
|
|
|
if force {
|
|
|
|
self.services.iter_mut().for_each(|h| {
|
|
|
|
h.call(ServerMessage::ForceShutdown);
|
|
|
|
});
|
|
|
|
} else {
|
|
|
|
self.services.iter_mut().for_each(|h| {
|
|
|
|
h.call(ServerMessage::Shutdown);
|
|
|
|
});
|
|
|
|
}
|
2018-08-19 19:47:04 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
fn shutdown_timeout(
|
2018-09-07 22:06:51 +02:00
|
|
|
&mut self, ctx: &mut Context<Worker>, tx: oneshot::Sender<bool>, dur: time::Duration,
|
2018-08-19 19:47:04 +02:00
|
|
|
) {
|
|
|
|
// sleep for 1 second and then check again
|
2018-08-21 07:21:23 +02:00
|
|
|
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
|
2018-09-08 18:36:38 +02:00
|
|
|
let num = num_connections();
|
2018-08-21 07:21:23 +02:00
|
|
|
if num == 0 {
|
|
|
|
let _ = tx.send(true);
|
|
|
|
Arbiter::current().do_send(StopArbiter(0));
|
|
|
|
} else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
|
|
|
|
slf.shutdown_timeout(ctx, tx, d);
|
|
|
|
} else {
|
|
|
|
info!("Force shutdown http worker, {} connections", num);
|
|
|
|
slf.shutdown(true);
|
|
|
|
let _ = tx.send(false);
|
|
|
|
Arbiter::current().do_send(StopArbiter(0));
|
|
|
|
}
|
|
|
|
});
|
2018-08-19 19:47:04 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Handler<Conn> for Worker {
|
|
|
|
type Result = ();
|
|
|
|
|
|
|
|
fn handle(&mut self, msg: Conn, _: &mut Context<Self>) {
|
2018-09-08 18:36:38 +02:00
|
|
|
let guard = self.conns.get();
|
|
|
|
spawn(
|
|
|
|
self.services[msg.handler.0]
|
|
|
|
.call(ServerMessage::Connect(msg.io))
|
|
|
|
.map(|val| {
|
|
|
|
drop(guard);
|
|
|
|
val
|
|
|
|
}),
|
|
|
|
)
|
2018-08-19 19:47:04 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// `StopWorker` message handler
|
|
|
|
impl Handler<StopWorker> for Worker {
|
|
|
|
type Result = Response<bool, ()>;
|
|
|
|
|
2018-08-21 07:21:23 +02:00
|
|
|
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
|
2018-09-08 18:36:38 +02:00
|
|
|
let num = num_connections();
|
2018-08-21 07:21:23 +02:00
|
|
|
if num == 0 {
|
|
|
|
info!("Shutting down http worker, 0 connections");
|
|
|
|
Response::reply(Ok(true))
|
|
|
|
} else if let Some(dur) = msg.graceful {
|
|
|
|
self.shutdown(false);
|
|
|
|
let (tx, rx) = oneshot::channel();
|
2018-09-08 18:36:38 +02:00
|
|
|
let num = num_connections();
|
2018-08-21 07:21:23 +02:00
|
|
|
if num != 0 {
|
|
|
|
info!("Graceful http worker shutdown, {} connections", num);
|
|
|
|
self.shutdown_timeout(ctx, tx, dur);
|
|
|
|
Response::reply(Ok(true))
|
|
|
|
} else {
|
|
|
|
Response::async(rx.map_err(|_| ()))
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
info!("Force shutdown http worker, {} connections", num);
|
|
|
|
self.shutdown(true);
|
|
|
|
Response::reply(Ok(false))
|
|
|
|
}
|
2018-08-19 19:47:04 +02:00
|
|
|
}
|
|
|
|
}
|
2018-09-07 22:06:51 +02:00
|
|
|
|
|
|
|
struct CheckReadiness(bool);
|
|
|
|
|
|
|
|
impl ActorFuture for CheckReadiness {
|
|
|
|
type Item = ();
|
|
|
|
type Error = ();
|
|
|
|
type Actor = Worker;
|
|
|
|
|
|
|
|
fn poll(&mut self, act: &mut Worker, _: &mut Context<Worker>) -> Poll<(), ()> {
|
2018-09-08 18:36:38 +02:00
|
|
|
let mut val = act.conns.check();
|
|
|
|
if val {
|
|
|
|
for service in &mut act.services {
|
|
|
|
if let Ok(Async::NotReady) = service.poll_ready() {
|
|
|
|
val = false;
|
|
|
|
break;
|
|
|
|
}
|
2018-09-07 22:06:51 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if self.0 != val {
|
|
|
|
self.0 = val;
|
|
|
|
act.availability.set(val);
|
|
|
|
}
|
|
|
|
Ok(Async::NotReady)
|
|
|
|
}
|
|
|
|
}
|
2018-09-08 18:36:38 +02:00
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
pub(crate) struct Connections(Rc<ConnectionsInner>);
|
|
|
|
|
|
|
|
struct ConnectionsInner {
|
|
|
|
count: Cell<usize>,
|
|
|
|
maxconn: usize,
|
|
|
|
task: AtomicTask,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Connections {
|
|
|
|
pub fn new(maxconn: usize) -> Self {
|
|
|
|
Connections(Rc::new(ConnectionsInner {
|
|
|
|
maxconn,
|
|
|
|
count: Cell::new(0),
|
|
|
|
task: AtomicTask::new(),
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn get(&self) -> ConnectionsGuard {
|
|
|
|
ConnectionsGuard::new(self.0.clone())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn check(&self) -> bool {
|
|
|
|
self.0.check()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn total(&self) -> usize {
|
|
|
|
self.0.count.get()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) struct ConnectionsGuard(Rc<ConnectionsInner>);
|
|
|
|
|
|
|
|
impl ConnectionsGuard {
|
|
|
|
fn new(inner: Rc<ConnectionsInner>) -> Self {
|
|
|
|
inner.inc();
|
|
|
|
ConnectionsGuard(inner)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for ConnectionsGuard {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
self.0.dec();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ConnectionsInner {
|
|
|
|
fn inc(&self) {
|
|
|
|
let num = self.count.get() + 1;
|
|
|
|
self.count.set(num);
|
|
|
|
if num == self.maxconn {
|
|
|
|
self.task.register();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn dec(&self) {
|
|
|
|
let num = self.count.get();
|
|
|
|
self.count.set(num - 1);
|
|
|
|
if num == self.maxconn {
|
|
|
|
self.task.notify();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn check(&self) -> bool {
|
|
|
|
self.count.get() < self.maxconn
|
|
|
|
}
|
|
|
|
}
|