1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

add graceful shutdown system

This commit is contained in:
Nikolay Kim 2017-12-28 16:25:47 -08:00
parent 3f4898a6d1
commit 538fea8027
8 changed files with 208 additions and 33 deletions

View File

@ -11,7 +11,4 @@ path = "src/main.rs"
env_logger = "*"
futures = "0.1"
actix = "^0.3.5"
#actix-web = { git = "https://github.com/actix/actix-web.git" }
actix-web = { path="../../", features=["signal"] }
actix-web = { git = "https://github.com/actix/actix-web.git", features=["signal"] }

View File

@ -3,10 +3,22 @@ extern crate actix_web;
extern crate futures;
extern crate env_logger;
use actix::*;
use actix_web::*;
use actix::Arbiter;
use actix::actors::signal::{ProcessSignals, Subscribe};
struct MyWebSocket;
impl Actor for MyWebSocket {
type Context = HttpContext<Self>;
}
impl StreamHandler<ws::Message> for MyWebSocket {}
impl Handler<ws::Message> for MyWebSocket {
fn handle(&mut self, _: ws::Message, _: &mut Self::Context) -> Response<Self, ws::Message> {
Self::empty()
}
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
@ -17,6 +29,7 @@ fn main() {
Application::new()
// enable logger
.middleware(middleware::Logger::default())
.resource("/ws/", |r| r.f(|req| ws::start(req, MyWebSocket)))
.resource("/", |r| r.h(httpcodes::HTTPOk))})
.bind("127.0.0.1:8080").unwrap()
.start();

View File

@ -164,3 +164,73 @@ fn index(req: HttpRequest) -> HttpResponse {
}
# fn main() {}
```
## Graceful shutdown
Actix http server support graceful shutdown. After receiving a stop signal, workers
have specific amount of time to finish serving requests. Workers still alive after the
timeout are force dropped. By default shutdown timeout sets to 30 seconds.
You can change this parameter with `HttpServer::shutdown_timeout()` method.
You can send stop message to server with server address and specify if you what
graceful shutdown or not. `start()` or `spawn()` methods return address of the server.
```rust
# extern crate futures;
# extern crate actix;
# extern crate actix_web;
# use futures::Future;
use actix_web::*;
fn main() {
let addr = HttpServer::new(
|| Application::new()
.resource("/", |r| r.h(httpcodes::HTTPOk)))
.bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
.shutdown_timeout(60) // <- Set shutdown timeout to 60 seconds
.spawn();
let _ = addr.call_fut(
dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server.
}
```
It is possible to use unix signals on compatible OSs. "signal" feature needs to be enabled
in *Cargo.toml* for *actix-web* dependency.
```toml
[dependencies]
actix-web = { git = "https://github.com/actix/actix-web", features=["signal"] }
```
Then you can subscribe your server to unix signals. Http server handles three signals:
* *SIGINT* - Force shutdown workers
* *SIGTERM* - Graceful shutdown workers
* *SIGQUIT* - Force shutdown workers
```rust,ignore
# extern crate futures;
# extern crate actix;
# extern crate actix_web;
use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
fn main() {
let sys = actix::System::new("signals");
let addr = HttpServer::new(|| {
Application::new()
.resource("/", |r| r.h(httpcodes::HTTPOk))})
.bind("127.0.0.1:8080").unwrap()
.start();
// Subscribe to unix signals
let signals = Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Started http server: 127.0.0.1:8080");
# actix::Arbiter::system().send(actix::msgs::SystemExit(0));
let _ = sys.run();
}
```

View File

@ -1,7 +1,6 @@
use std::rc::Rc;
use std::net::SocketAddr;
use actix::dev::*;
use bytes::Bytes;
use futures::{Future, Poll, Async};
use tokio_io::{AsyncRead, AsyncWrite};
@ -71,6 +70,7 @@ impl<T, H> HttpChannel<T, H>
pub(crate) fn new(h: Rc<WorkerSettings<H>>,
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
{
h.add_channel();
if http2 {
HttpChannel {
proto: Some(HttpProtocol::H2(
@ -89,12 +89,6 @@ impl<T, H> HttpChannel<T, H>
}
}*/
impl<T, H> Actor for HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
type Context = Context<Self>;
}
impl<T, H> Future for HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
@ -105,16 +99,27 @@ impl<T, H> Future for HttpChannel<T, H>
match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
match h1.poll() {
Ok(Async::Ready(h1::Http1Result::Done)) =>
return Ok(Async::Ready(())),
Ok(Async::Ready(h1::Http1Result::Done)) => {
h1.settings().remove_channel();
return Ok(Async::Ready(()))
}
Ok(Async::Ready(h1::Http1Result::Switch)) => (),
Ok(Async::NotReady) =>
return Ok(Async::NotReady),
Err(_) =>
return Err(()),
Err(_) => {
h1.settings().remove_channel();
return Err(())
}
}
Some(HttpProtocol::H2(ref mut h2)) => return h2.poll(),
}
Some(HttpProtocol::H2(ref mut h2)) => {
let result = h2.poll();
match result {
Ok(Async::Ready(())) | Err(_) => h2.settings().remove_channel(),
_ => (),
}
return result
}
None => unreachable!(),
}

View File

@ -89,6 +89,10 @@ impl<T, H> Http1<T, H>
keepalive_timer: None }
}
pub fn settings(&self) -> &WorkerSettings<H> {
self.settings.as_ref()
}
pub fn into_inner(self) -> (Rc<WorkerSettings<H>>, T, Option<SocketAddr>, Bytes) {
(self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze())
}

View File

@ -64,6 +64,10 @@ impl<T, H> Http2<T, H>
}
}
pub fn settings(&self) -> &WorkerSettings<H> {
self.settings.as_ref()
}
pub fn poll(&mut self) -> Poll<(), ()> {
// server
if let State::Server(ref mut server) = self.state {

View File

@ -7,7 +7,7 @@ use std::collections::HashMap;
use actix::dev::*;
use actix::System;
use futures::Stream;
use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream;
@ -107,6 +107,7 @@ pub struct HttpServer<T, A, H, U>
sockets: HashMap<net::SocketAddr, net::TcpListener>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
exit: bool,
shutdown_timeout: u16,
}
unsafe impl<T, A, H, U> Sync for HttpServer<T, A, H, U> where H: 'static {}
@ -151,6 +152,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
sockets: HashMap::new(),
accept: Vec::new(),
exit: false,
shutdown_timeout: 30,
}
}
@ -210,6 +212,17 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
self
}
/// Timeout for graceful workers shutdown.
///
/// After receiving a stop signal, workers have this much time to finish serving requests.
/// Workers still alive after the timeout are force dropped.
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u16) -> Self {
self.shutdown_timeout = sec;
self
}
/// Get addresses of bound sockets.
pub fn addrs(&self) -> Vec<net::SocketAddr> {
self.sockets.keys().cloned().collect()
@ -607,14 +620,36 @@ impl<T, A, H, U> Handler<StopServer> for HttpServer<T, A, H, U>
let _ = item.1.send(Command::Stop);
let _ = item.0.set_readiness(mio::Ready::readable());
}
ctx.stop();
// stop workers
let dur = if msg.graceful { Some(Duration::new(30, 0)) } else { None };
let (tx, rx) = mpsc::channel(1);
let dur = if msg.graceful {
Some(Duration::new(u64::from(self.shutdown_timeout), 0))
} else {
None
};
for worker in &self.workers {
worker.send(StopWorker{graceful: dur})
let tx2 = tx.clone();
let fut = worker.call(self, StopWorker{graceful: dur});
ActorFuture::then(fut, move |_, slf, _| {
slf.workers.pop();
if slf.workers.is_empty() {
let _ = tx2.send(());
// we need to stop system if server was spawned
if slf.exit {
Arbiter::system().send(msgs::SystemExit(0))
}
}
fut::ok(())
}).spawn(ctx);
}
if !self.workers.is_empty() {
Self::async_reply(
rx.into_future().map(|_| ()).map_err(|_| ()).actfuture())
} else {
// we need to stop system if server was spawned
if self.exit {
Arbiter::system().send(msgs::SystemExit(0))
@ -622,6 +657,7 @@ impl<T, A, H, U> Handler<StopServer> for HttpServer<T, A, H, U>
Self::empty()
}
}
}
enum Command {
Pause,

View File

@ -1,25 +1,27 @@
use std::{net, time};
use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use std::cell::{Cell, RefCell, RefMut};
use futures::Future;
use futures::unsync::oneshot;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
use net2::TcpStreamExt;
#[cfg(feature="tls")]
use futures::{future, Future};
use futures::future;
#[cfg(feature="tls")]
use native_tls::TlsAcceptor;
#[cfg(feature="tls")]
use tokio_tls::TlsAcceptorExt;
#[cfg(feature="alpn")]
use futures::{future, Future};
use futures::future;
#[cfg(feature="alpn")]
use openssl::ssl::SslAcceptor;
#[cfg(feature="alpn")]
use tokio_openssl::SslAcceptorExt;
use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Response, StreamHandler};
use actix::*;
use actix::msgs::StopArbiter;
use helpers;
@ -33,8 +35,10 @@ pub(crate) struct Conn<T> {
pub http2: bool,
}
/// Stop worker
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive.
#[derive(Message)]
#[rtype(bool)]
pub(crate) struct StopWorker {
pub graceful: Option<time::Duration>,
}
@ -45,6 +49,7 @@ pub(crate) struct WorkerSettings<H> {
keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>,
}
impl<H> WorkerSettings<H> {
@ -55,6 +60,7 @@ impl<H> WorkerSettings<H> {
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0),
}
}
@ -73,6 +79,17 @@ impl<H> WorkerSettings<H> {
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {
helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages))
}
pub fn add_channel(&self) {
self.channels.set(self.channels.get()+1);
}
pub fn remove_channel(&self) {
let num = self.channels.get();
if num > 0 {
self.channels.set(num-1);
} else {
error!("Number of removed channels is bigger than added channel. Bug in actix-web");
}
}
}
/// Http worker
@ -100,6 +117,24 @@ impl<H: 'static> Worker<H> {
helpers::update_date();
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
fn shutdown_timeout(&self, ctx: &mut Context<Self>,
tx: oneshot::Sender<bool>, dur: time::Duration) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
let num = slf.h.channels.get();
if num == 0 {
let _ = tx.send(true);
Arbiter::arbiter().send(StopArbiter(0));
} else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
slf.shutdown_timeout(ctx, tx, d);
} else {
info!("Force shutdown http worker, {} connections", num);
let _ = tx.send(false);
Arbiter::arbiter().send(StopArbiter(0));
}
});
}
}
impl<H: 'static> Actor for Worker<H> {
@ -133,10 +168,21 @@ impl<H> Handler<Conn<net::TcpStream>> for Worker<H>
impl<H> Handler<StopWorker> for Worker<H>
where H: HttpHandler + 'static,
{
fn handle(&mut self, _: StopWorker, _: &mut Context<Self>) -> Response<Self, StopWorker>
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Response<Self, StopWorker>
{
Arbiter::arbiter().send(StopArbiter(0));
Self::empty()
let num = self.h.channels.get();
if num == 0 {
info!("Shutting down http worker, 0 connections");
Self::reply(true)
} else if let Some(dur) = msg.graceful {
info!("Graceful http worker shutdown, {} connections", num);
let (tx, rx) = oneshot::channel();
self.shutdown_timeout(ctx, tx, dur);
Self::async_reply(rx.map_err(|_| ()).actfuture())
} else {
info!("Force shutdown http worker, {} connections", num);
Self::reply(false)
}
}
}