// actix-net/src/server/worker.rs (mirror of https://github.com/fafhrd91/actix-net)

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::{mem, net, time};

use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::sync::oneshot;
use futures::{future, Async, Future, Poll, Stream};
use tokio_current_thread::spawn;
use tokio_timer::{sleep, Delay};

use actix::msgs::StopArbiter;
use actix::{Arbiter, Message};

use super::accept::AcceptNotify;
use super::services::{BoxedServerService, InternalServerServiceFactory, ServerMessage};
use super::Token;
use counter::Counter;
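
/// Commands sent from a `WorkerClient` to its `Worker` over the unbounded
/// channel: a new connection to handle, or a request to stop.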
pub(crate) enum WorkerCommand {
    Message(Conn),
    /// Stop worker message. Returns `true` on successful shutdown
    /// and `false` if some connections are still alive.
    Stop(Option<time::Duration>, oneshot::Sender<bool>),
}

#[derive(Debug, Message)]
pub(crate) struct Conn {
    pub io: net::TcpStream,
    pub handler: Token,
    pub token: Token,
    pub peer: Option<net::SocketAddr>,
}
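
// Per-worker connection limit. Each worker thread seeds its
// `MAX_CONNS_COUNTER` from this value when the thread-local is first used.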
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);

/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default max connections is set to 25,600 per worker.
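///
/// Illustrative only (call path not shown in this file): the limit is read
/// when each worker thread initializes its counter, so a call such as
/// `max_concurrent_connections(51_200)` should happen before workers start.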
pub fn max_concurrent_connections(num: usize) {
    MAX_CONNS.store(num, Ordering::Relaxed);
}

pub(crate) fn num_connections() -> usize {
    MAX_CONNS_COUNTER.with(|conns| conns.total())
}

thread_local! {
    static MAX_CONNS_COUNTER: Counter =
        Counter::new(MAX_CONNS.load(Ordering::Relaxed));
}
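
/// Handle to a worker: lets the server side forward accepted connections
/// (`send`), check availability, and request a graceful or forced stop.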
#[derive(Clone)]
pub(crate) struct WorkerClient {
    pub idx: usize,
    tx: UnboundedSender<WorkerCommand>,
    avail: WorkerAvailability,
}

impl WorkerClient {
    pub fn new(
        idx: usize, tx: UnboundedSender<WorkerCommand>, avail: WorkerAvailability,
    ) -> Self {
        WorkerClient { idx, tx, avail }
    }

    pub fn send(&self, msg: Conn) -> Result<(), Conn> {
        self.tx
            .unbounded_send(WorkerCommand::Message(msg))
            .map_err(|e| match e.into_inner() {
                WorkerCommand::Message(msg) => msg,
                _ => panic!(),
            })
    }

    pub fn available(&self) -> bool {
        self.avail.available()
    }

    pub fn stop(&self, graceful: Option<time::Duration>) -> oneshot::Receiver<bool> {
        let (tx, rx) = oneshot::channel();
        let _ = self.tx.unbounded_send(WorkerCommand::Stop(graceful, tx));
        rx
    }
}
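
/// Shared flag tracking whether a worker can accept more connections;
/// flipping it from `false` to `true` wakes the accept loop via `AcceptNotify`.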
#[derive(Clone)]
pub(crate) struct WorkerAvailability {
    notify: AcceptNotify,
    available: Arc<AtomicBool>,
}

impl WorkerAvailability {
    pub fn new(notify: AcceptNotify) -> Self {
        WorkerAvailability {
            notify,
            available: Arc::new(AtomicBool::new(false)),
        }
    }

    pub fn available(&self) -> bool {
        self.available.load(Ordering::Acquire)
    }

    pub fn set(&self, val: bool) {
        let old = self.available.swap(val, Ordering::Release);
        if !old && val {
            self.notify.notify()
        }
    }
}

/// Http worker
///
/// Worker accepts connections (`Conn`) via an unbounded channel and starts
/// request processing.
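///
/// The worker runs as a `Future` on the arbiter thread that created it (see
/// `Worker::start`); the sending side of its channel is a `WorkerClient`.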
pub(crate) struct Worker {
    rx: UnboundedReceiver<WorkerCommand>,
    services: Vec<BoxedServerService>,
    availability: WorkerAvailability,
    conns: Counter,
    factories: Vec<Box<InternalServerServiceFactory>>,
    state: WorkerState,
}

impl Worker {
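    /// Builds the worker on the current arbiter thread and spawns a task that
    /// constructs every service before the worker future starts polling.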
    pub(crate) fn start(
        rx: UnboundedReceiver<WorkerCommand>,
        factories: Vec<Box<InternalServerServiceFactory>>, availability: WorkerAvailability,
    ) {
        availability.set(false);
        let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
            rx,
            availability,
            factories,
            services: Vec::new(),
            conns: conns.clone(),
            state: WorkerState::Unavailable(Vec::new()),
        });

        let mut fut = Vec::new();
        for factory in &wrk.factories {
            fut.push(factory.create());
        }

        spawn(
            future::join_all(fut)
                .map_err(|e| {
                    error!("Cannot start worker: {:?}", e);
                    Arbiter::current().do_send(StopArbiter(0));
                }).and_then(move |services| {
                    wrk.services.extend(services);
                    wrk
                }),
        );
    }
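
    /// Sends a shutdown message to every service; `force` selects
    /// `ServerMessage::ForceShutdown` instead of the graceful `Shutdown`.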
    fn shutdown(&mut self, force: bool) {
        if force {
            self.services.iter_mut().for_each(|h| {
                h.call(ServerMessage::ForceShutdown);
            });
        } else {
            self.services.iter_mut().for_each(|h| {
                h.call(ServerMessage::Shutdown);
            });
        }
    }
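
    /// Polls readiness of every service. Returns `Ok(true)` when the
    /// connection limit still has capacity and all services are ready,
    /// `Ok(false)` when something is not ready yet, and `Err(idx)` with the
    /// index of a service whose readiness check failed.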
    fn check_readiness(&mut self) -> Result<bool, usize> {
        let mut ready = self.conns.available();
        let mut failed = None;
        for (idx, service) in self.services.iter_mut().enumerate() {
            match service.poll_ready() {
                Ok(Async::Ready(_)) => (),
                Ok(Async::NotReady) => ready = false,
                Err(_) => {
                    error!("Service readiness check returned error, restarting");
                    failed = Some(idx);
                }
            }
        }
        if let Some(idx) = failed {
            Err(idx)
        } else {
            Ok(ready)
        }
    }
}
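
/// Worker state machine driven by `Worker::poll`: `Available` drains the
/// command channel, `Unavailable` buffers connections until services are
/// ready again, `Restarting` rebuilds a failed service, `Shutdown` waits for
/// connections to drain until the graceful deadline, and `None` is the
/// transient placeholder used while switching states.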
enum WorkerState {
    None,
    Available,
    Unavailable(Vec<Conn>),
    Restarting(usize, Box<Future<Item = BoxedServerService, Error = ()>>),
    Shutdown(Delay, Delay, oneshot::Sender<bool>),
}

impl Future for Worker {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let state = mem::replace(&mut self.state, WorkerState::None);

        match state {
            WorkerState::Unavailable(mut conns) => {
                match self.check_readiness() {
                    Ok(true) => {
                        trace!("Service is available");
                        self.state = WorkerState::Available;

                        // process requests from wait queue
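                        // Each spawned connection holds a guard from the `Counter`;
                        // dropping the guard when the service future resolves frees
                        // a slot under the per-worker connection limit.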
                        while let Some(msg) = conns.pop() {
                            match self.check_readiness() {
                                Ok(true) => {
                                    let guard = self.conns.get();
                                    spawn(
                                        self.services[msg.handler.0]
                                            .call(ServerMessage::Connect(msg.io))
                                            .map(|val| {
                                                drop(guard);
                                                val
                                            }),
                                    )
                                }
                                Ok(false) => {
                                    trace!("Service is unavailable");
                                    self.state = WorkerState::Unavailable(conns);
                                    return self.poll();
                                }
                                Err(idx) => {
                                    trace!("Service failed, restarting");
                                    self.state = WorkerState::Restarting(
                                        idx,
                                        self.factories[idx].create(),
                                    );
                                    return self.poll();
                                }
                            }
                        }
                        self.availability.set(true);
                        return self.poll();
                    }
                    Ok(false) => {
                        self.state = WorkerState::Unavailable(conns);
                        return Ok(Async::NotReady);
                    }
                    Err(idx) => {
                        trace!("Service failed, restarting");
                        self.state = WorkerState::Restarting(idx, self.factories[idx].create());
                        return self.poll();
                    }
                }
            }
            WorkerState::Restarting(idx, mut fut) => {
                match fut.poll() {
                    Ok(Async::Ready(service)) => {
                        trace!("Service has been restarted");
                        self.services[idx] = service;
                        self.state = WorkerState::Unavailable(Vec::new());
                    }
                    Ok(Async::NotReady) => {
                        self.state = WorkerState::Restarting(idx, fut);
                        return Ok(Async::NotReady);
                    }
                    Err(_) => {
                        panic!("Cannot restart service");
                    }
                }
                return self.poll();
            }
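            // `t1` re-arms every second to re-check the connection count, while
            // `t2` is the overall graceful-shutdown deadline.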
            WorkerState::Shutdown(mut t1, mut t2, tx) => {
                let num = num_connections();
                if num == 0 {
                    let _ = tx.send(true);
                    Arbiter::current().do_send(StopArbiter(0));
                    return Ok(Async::Ready(()));
                }

                // check graceful timeout
                match t2.poll().unwrap() {
                    Async::NotReady => (),
                    Async::Ready(_) => {
                        self.shutdown(true);
                        let _ = tx.send(false);
                        Arbiter::current().do_send(StopArbiter(0));
                        return Ok(Async::Ready(()));
                    }
                }

                // sleep for 1 second and then check again
                match t1.poll().unwrap() {
                    Async::NotReady => (),
                    Async::Ready(_) => {
                        t1 = sleep(time::Duration::from_secs(1));
                        let _ = t1.poll();
                    }
                }

                self.state = WorkerState::Shutdown(t1, t2, tx);
                return Ok(Async::NotReady);
            }
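            // Hot path: the worker is ready, so keep draining the command channel
            // until it is empty or a command changes the worker state.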
            WorkerState::Available => {
                loop {
                    match self.rx.poll() {
                        // handle incoming tcp stream
                        Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => {
                            match self.check_readiness() {
                                Ok(true) => {
                                    let guard = self.conns.get();
                                    spawn(
                                        self.services[msg.handler.0]
                                            .call(ServerMessage::Connect(msg.io))
                                            .map(|val| {
                                                drop(guard);
                                                val
                                            }),
                                    );
                                    continue;
                                }
                                Ok(false) => {
                                    trace!("Service is unavailable");
                                    self.availability.set(false);
                                    self.state = WorkerState::Unavailable(vec![msg]);
                                }
                                Err(idx) => {
                                    trace!("Service failed, restarting");
                                    self.availability.set(false);
                                    self.state = WorkerState::Restarting(
                                        idx,
                                        self.factories[idx].create(),
                                    );
                                }
                            }
                            return self.poll();
                        }
                        // `StopWorker` message handler
                        Ok(Async::Ready(Some(WorkerCommand::Stop(graceful, tx)))) => {
                            self.availability.set(false);
                            let num = num_connections();
                            if num == 0 {
                                info!("Shutting down http worker, 0 connections");
                                let _ = tx.send(true);
                                return Ok(Async::Ready(()));
                            } else if let Some(dur) = graceful {
                                self.shutdown(false);
                                let num = num_connections();
                                if num != 0 {
                                    info!("Graceful http worker shutdown, {} connections", num);
                                    self.state = WorkerState::Shutdown(
                                        sleep(time::Duration::from_secs(1)),
                                        sleep(dur),
                                        tx,
                                    );
                                    return self.poll();
                                } else {
                                    let _ = tx.send(true);
                                    return Ok(Async::Ready(()));
                                }
                            } else {
                                info!("Force shutdown http worker, {} connections", num);
                                self.shutdown(true);
                                let _ = tx.send(false);
                                return Ok(Async::Ready(()));
                            }
                        }
                        Ok(Async::NotReady) => {
                            self.state = WorkerState::Available;
                            return Ok(Async::NotReady);
                        }
                        Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
                    }
                }
            }
            WorkerState::None => panic!(),
        };

        Ok(Async::NotReady)
    }
}