1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-03 20:52:13 +01:00
actix-net/src/server/worker.rs

362 lines
9.7 KiB
Rust
Raw Normal View History

2018-09-08 18:36:38 +02:00
use std::cell::Cell;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2018-09-07 22:06:51 +02:00
use std::sync::Arc;
2018-08-19 19:47:04 +02:00
use std::{net, time};
use futures::sync::mpsc::{SendError, UnboundedSender};
use futures::sync::oneshot;
2018-09-08 18:36:38 +02:00
use futures::task::AtomicTask;
2018-09-07 22:06:51 +02:00
use futures::{future, Async, Future, Poll};
2018-09-08 18:36:38 +02:00
use tokio_current_thread::spawn;
2018-08-19 19:47:04 +02:00
use actix::msgs::StopArbiter;
use actix::{
fut, Actor, ActorContext, ActorFuture, Arbiter, AsyncContext, Context, Handler, Message,
Response, WrapFuture,
};
2018-09-07 22:06:51 +02:00
use super::accept::AcceptNotify;
2018-09-11 17:43:23 +02:00
use super::services::{BoxedServerService, InternalServerServiceFactory, ServerMessage};
2018-09-07 22:06:51 +02:00
use super::Token;
2018-08-19 19:47:04 +02:00
2018-09-13 22:32:51 +02:00
#[derive(Debug, Message)]
2018-08-19 19:47:04 +02:00
/// A TCP connection handed to a worker over its channel (see `WorkerClient::send`).
pub(crate) struct Conn {
/// The TCP stream; it is passed to the service as `ServerMessage::Connect`.
pub io: net::TcpStream,
/// Selects the service that handles this connection: `handler.0` indexes
/// the worker's `services` vector.
pub handler: Token,
/// NOTE(review): not read in this file; presumably identifies the listening
/// socket the connection came from — confirm against the accept loop.
pub token: Token,
/// NOTE(review): presumably the remote peer address when known — confirm
/// against the code that builds `Conn`.
pub peer: Option<net::SocketAddr>,
}
2018-09-08 18:36:38 +02:00
/// Per-worker connection limit, shared by every thread in the process.
///
/// This has to be a `static`: a `const` atomic is instantiated afresh at
/// every use site, so a `store` on it would be silently thrown away and the
/// limit could never be changed.
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);

/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// Each thread reads the limit once, when its connection counter is first
/// used, so this must be called before the workers start handling
/// connections.
///
/// By default the limit is 25600 (25k) connections per worker.
pub fn max_concurrent_connections(num: usize) {
    MAX_CONNS.store(num, Ordering::Relaxed);
}
pub(crate) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|conns| conns.total())
}
// Per-thread connection counter. Its limit is read from `MAX_CONNS` when the
// thread first accesses the counter; later changes to `MAX_CONNS` do not
// affect an already-initialised counter.
thread_local! {
static MAX_CONNS_COUNTER: Connections =
Connections::new(MAX_CONNS.load(Ordering::Relaxed));
}
2018-08-19 19:47:04 +02:00
#[derive(Clone)]
pub(crate) struct WorkerClient {
pub idx: usize,
tx: UnboundedSender<Conn>,
2018-09-07 22:06:51 +02:00
avail: WorkerAvailability,
2018-08-19 19:47:04 +02:00
}
impl WorkerClient {
2018-09-07 22:06:51 +02:00
pub fn new(idx: usize, tx: UnboundedSender<Conn>, avail: WorkerAvailability) -> Self {
WorkerClient { idx, tx, avail }
2018-08-19 19:47:04 +02:00
}
pub fn send(&self, msg: Conn) -> Result<(), SendError<Conn>> {
self.tx.unbounded_send(msg)
}
pub fn available(&self) -> bool {
2018-09-07 22:06:51 +02:00
self.avail.available()
}
}
/// Shared "can this worker take connections" flag, plus the hook used to
/// signal when the flag flips from unavailable to available.
#[derive(Clone)]
pub(crate) struct WorkerAvailability {
// Notified by `set` when the flag changes from `false` to `true`.
notify: AcceptNotify,
// The flag itself; shared between all clones of this value.
available: Arc<AtomicBool>,
}
impl WorkerAvailability {
    /// Creates a flag that starts out as "not available".
    pub fn new(notify: AcceptNotify) -> Self {
        let available = Arc::new(AtomicBool::new(false));
        WorkerAvailability { notify, available }
    }

    /// Reads the current availability.
    pub fn available(&self) -> bool {
        self.available.load(Ordering::Acquire)
    }

    /// Stores `val` as the new availability.
    ///
    /// `notify` is triggered only on a transition from unavailable to
    /// available; every other combination just updates the flag.
    pub fn set(&self, val: bool) {
        let previous = self.available.swap(val, Ordering::Release);
        let became_available = val && !previous;
        if became_available {
            self.notify.notify();
        }
    }
}
/// Stop worker message. The reply is `true` on successful shutdown
/// and `false` if some connections are still alive.
pub(crate) struct StopWorker {
/// `Some(duration)` asks for a graceful shutdown bounded by `duration`;
/// `None` asks the worker to force-shutdown its services right away.
pub graceful: Option<time::Duration>,
}
/// Declares the reply type of `StopWorker`: `Ok(bool)` carries the shutdown
/// outcome, `Err(())` means no outcome could be delivered.
impl Message for StopWorker {
type Result = Result<bool, ()>;
}
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests
/// processing.
pub(crate) struct Worker {
services: Vec<BoxedServerService>,
2018-09-07 22:06:51 +02:00
availability: WorkerAvailability,
2018-09-08 18:36:38 +02:00
conns: Connections,
factories: Vec<Box<InternalServerServiceFactory>>,
2018-08-19 19:47:04 +02:00
}
/// `Worker` is an actix actor that runs in a plain `Context`.
impl Actor for Worker {
type Context = Context<Self>;
}
impl Worker {
2018-08-24 00:42:34 +02:00
pub(crate) fn new(
ctx: &mut Context<Self>, factories: Vec<Box<InternalServerServiceFactory>>,
2018-09-07 22:06:51 +02:00
availability: WorkerAvailability,
2018-08-19 19:47:04 +02:00
) -> Self {
availability.set(false);
2018-09-08 18:36:38 +02:00
let wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
2018-09-07 22:06:51 +02:00
availability,
factories,
2018-08-19 19:47:04 +02:00
services: Vec::new(),
2018-09-08 18:36:38 +02:00
conns: conns.clone(),
});
2018-08-19 19:47:04 +02:00
let mut fut = Vec::new();
for factory in &wrk.factories {
fut.push(factory.create());
}
2018-08-19 19:47:04 +02:00
ctx.wait(
future::join_all(fut)
2018-08-19 19:47:04 +02:00
.into_actor(&wrk)
.map_err(|e, _, ctx| {
error!("Can not start worker: {:?}", e);
Arbiter::current().do_send(StopArbiter(0));
ctx.stop();
2018-09-07 22:06:51 +02:00
}).and_then(|services, act, ctx| {
2018-08-19 19:47:04 +02:00
act.services.extend(services);
let mut readiness = CheckReadiness {
2018-09-13 22:32:51 +02:00
avail: false,
idx: 0,
fut: None,
};
let _ = readiness.poll(act, ctx);
ctx.spawn(readiness);
2018-08-19 19:47:04 +02:00
fut::ok(())
}),
);
wrk
}
2018-09-07 22:06:51 +02:00
fn shutdown(&mut self, force: bool) {
if force {
self.services.iter_mut().for_each(|h| {
h.call(ServerMessage::ForceShutdown);
});
} else {
self.services.iter_mut().for_each(|h| {
h.call(ServerMessage::Shutdown);
});
}
2018-08-19 19:47:04 +02:00
}
fn shutdown_timeout(
2018-09-07 22:06:51 +02:00
&mut self, ctx: &mut Context<Worker>, tx: oneshot::Sender<bool>, dur: time::Duration,
2018-08-19 19:47:04 +02:00
) {
// sleep for 1 second and then check again
2018-08-21 07:21:23 +02:00
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
2018-09-08 18:36:38 +02:00
let num = num_connections();
2018-08-21 07:21:23 +02:00
if num == 0 {
let _ = tx.send(true);
Arbiter::current().do_send(StopArbiter(0));
} else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
slf.shutdown_timeout(ctx, tx, d);
} else {
info!("Force shutdown http worker, {} connections", num);
slf.shutdown(true);
let _ = tx.send(false);
Arbiter::current().do_send(StopArbiter(0));
}
});
2018-08-19 19:47:04 +02:00
}
}
impl Handler<Conn> for Worker {
type Result = ();
fn handle(&mut self, msg: Conn, _: &mut Context<Self>) {
2018-09-08 18:36:38 +02:00
let guard = self.conns.get();
spawn(
self.services[msg.handler.0]
.call(ServerMessage::Connect(msg.io))
.map(|val| {
drop(guard);
val
}),
)
2018-08-19 19:47:04 +02:00
}
}
/// `StopWorker` message handler
impl Handler<StopWorker> for Worker {
type Result = Response<bool, ()>;
2018-08-21 07:21:23 +02:00
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
2018-09-08 18:36:38 +02:00
let num = num_connections();
2018-08-21 07:21:23 +02:00
if num == 0 {
info!("Shutting down http worker, 0 connections");
Response::reply(Ok(true))
} else if let Some(dur) = msg.graceful {
self.shutdown(false);
let (tx, rx) = oneshot::channel();
2018-09-08 18:36:38 +02:00
let num = num_connections();
2018-08-21 07:21:23 +02:00
if num != 0 {
info!("Graceful http worker shutdown, {} connections", num);
self.shutdown_timeout(ctx, tx, dur);
Response::reply(Ok(true))
} else {
Response::async(rx.map_err(|_| ()))
}
} else {
info!("Force shutdown http worker, {} connections", num);
self.shutdown(true);
Response::reply(Ok(false))
}
2018-08-19 19:47:04 +02:00
}
}
2018-09-07 22:06:51 +02:00
/// Actor future that keeps the worker's availability flag in sync with the
/// readiness of its services and restarts a service whose readiness check fails.
struct CheckReadiness {
// Last availability value this future pushed to the worker's flag.
avail: bool,
// Index (into the worker's `services`) of the service being restarted.
idx: usize,
// In-flight restart of service `idx`; `None` when no restart is running.
fut: Option<Box<Future<Item = BoxedServerService, Error = ()>>>,
}
2018-09-07 22:06:51 +02:00
impl ActorFuture for CheckReadiness {
    type Item = ();
    type Error = ();
    type Actor = Worker;

    /// Drives service restarts and refreshes the worker's availability.
    ///
    /// Never resolves: every path returns `Async::NotReady`.
    ///
    /// # Panics
    ///
    /// Panics if re-creating a failed service fails.
    fn poll(&mut self, act: &mut Worker, ctx: &mut Context<Worker>) -> Poll<(), ()> {
        // A restart in progress has to finish before readiness is evaluated.
        // The future is taken out and put back if it is still pending.
        if let Some(mut restart) = self.fut.take() {
            match restart.poll() {
                Ok(Async::Ready(service)) => {
                    trace!("Service has been restarted");
                    act.services[self.idx] = service;
                }
                Ok(Async::NotReady) => {
                    self.fut = Some(restart);
                    return Ok(Async::NotReady);
                }
                Err(_) => {
                    panic!("Can not restart service");
                }
            }
        }

        // Available only while below the connection limit and every service
        // reports ready.
        let mut ready = act.conns.check();
        if ready {
            // check if service is restarting
            let mut failed = None;
            for (idx, service) in act.services.iter_mut().enumerate() {
                match service.poll_ready() {
                    Ok(Async::Ready(_)) => (),
                    Ok(Async::NotReady) => ready = false,
                    Err(_) => {
                        error!("Service readiness check returned error, restarting");
                        failed = Some(idx);
                    }
                }
            }

            if let Some(idx) = failed {
                // Mark the worker unavailable while the service is restarted,
                // so the flag does not keep reporting a worker whose service
                // has failed as able to take connections.
                if self.avail {
                    self.avail = false;
                    act.availability.set(false);
                }
                self.idx = idx;
                self.fut = Some(act.factories[idx].create());
                // Poll the new restart future right away.
                return self.poll(act, ctx);
            }
        }

        // Only touch the shared flag when the value actually changes.
        if self.avail != ready {
            self.avail = ready;
            act.availability.set(ready);
        }
        Ok(Async::NotReady)
    }
}
2018-09-08 18:36:38 +02:00
/// Cheaply cloneable, single-threaded handle to a connection counter with an
/// upper limit. Clones share the same counter.
#[derive(Clone)]
pub(crate) struct Connections(Rc<ConnectionsInner>);
/// Shared state behind `Connections` and `ConnectionsGuard`.
struct ConnectionsInner {
// Number of guards currently alive.
count: Cell<usize>,
// Limit that `check` compares `count` against.
maxconn: usize,
// Task registered when `count` reaches `maxconn`; notified when a guard is
// dropped while `count` equals `maxconn`.
task: AtomicTask,
}
impl Connections {
    /// Creates an empty counter limited to `maxconn` connections.
    pub fn new(maxconn: usize) -> Self {
        let inner = ConnectionsInner {
            maxconn,
            count: Cell::new(0),
            task: AtomicTask::new(),
        };
        Connections(Rc::new(inner))
    }

    /// Counts one more connection and returns a guard that un-counts it when
    /// dropped.
    pub fn get(&self) -> ConnectionsGuard {
        let shared = Rc::clone(&self.0);
        ConnectionsGuard::new(shared)
    }

    /// Returns `true` while the count is below the limit.
    pub fn check(&self) -> bool {
        let inner = &self.0;
        inner.check()
    }

    /// Returns the number of connections currently counted.
    pub fn total(&self) -> usize {
        let inner = &self.0;
        inner.count.get()
    }
}
/// Token for one counted connection: creating it increments the shared
/// counter, dropping it decrements the counter again.
pub(crate) struct ConnectionsGuard(Rc<ConnectionsInner>);

impl ConnectionsGuard {
    /// Increments the counter behind `inner` and wraps it in a guard.
    fn new(inner: Rc<ConnectionsInner>) -> Self {
        inner.inc();
        ConnectionsGuard(inner)
    }
}

impl Drop for ConnectionsGuard {
    /// Gives the counted connection back.
    fn drop(&mut self) {
        let shared = &self.0;
        shared.dec();
    }
}
impl ConnectionsInner {
    /// Adds one connection. When the new count equals the limit, the current
    /// task is registered so `dec` can notify it later.
    fn inc(&self) {
        let updated = self.count.get() + 1;
        self.count.set(updated);
        if updated == self.maxconn {
            self.task.register();
        }
    }

    /// Removes one connection. If the count was exactly at the limit before
    /// the removal, the registered task is notified.
    fn dec(&self) {
        let before = self.count.get();
        self.count.set(before - 1);
        if before == self.maxconn {
            self.task.notify();
        }
    }

    /// Returns `true` while the count is strictly below the limit.
    fn check(&self) -> bool {
        let current = self.count.get();
        current < self.maxconn
    }
}