1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-30 18:34:36 +01:00

subscriber to os signals automatically

This commit is contained in:
Nikolay Kim 2018-01-05 16:32:36 -08:00
parent 473ec38439
commit 3ed9e872ad
17 changed files with 81 additions and 214 deletions

View File

@ -106,7 +106,6 @@ members = [
"examples/json", "examples/json",
"examples/hello-world", "examples/hello-world",
"examples/multipart", "examples/multipart",
"examples/signals",
"examples/state", "examples/state",
"examples/template_tera", "examples/template_tera",
"examples/tls", "examples/tls",

View File

@ -3,7 +3,6 @@
Actix web is a small, fast, down-to-earth, open source rust web framework. Actix web is a small, fast, down-to-earth, open source rust web framework.
```rust,ignore ```rust,ignore
extern crate actix;
extern crate actix_web; extern crate actix_web;
use actix_web::*; use actix_web::*;
@ -12,14 +11,11 @@ fn index(req: HttpRequest) -> String {
} }
fn main() { fn main() {
let sys = actix::System::new("readme");
HttpServer::new( HttpServer::new(
|| Application::new() || Application::new()
.resource("/{name}", |r| r.f(index))) .resource("/{name}", |r| r.f(index)))
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .run();
sys.run();
} }
``` ```

View File

@ -10,7 +10,7 @@ use futures::Stream;
use actix_web::*; use actix_web::*;
use actix_web::middleware::RequestSession; use actix_web::middleware::RequestSession;
use futures::future::{FutureResult, result}; use futures::future::{FutureResult, result};
use actix::actors::signal::{ProcessSignals, Subscribe};
/// simple handler /// simple handler
fn index(mut req: HttpRequest) -> Result<HttpResponse> { fn index(mut req: HttpRequest) -> Result<HttpResponse> {
@ -94,10 +94,6 @@ fn main() {
.bind("0.0.0.0:8080").unwrap() .bind("0.0.0.0:8080").unwrap()
.start(); .start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Starting http server: 127.0.0.1:8080"); println!("Starting http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -18,7 +18,6 @@ extern crate env_logger;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
use diesel::prelude::*; use diesel::prelude::*;
use futures::future::Future; use futures::future::Future;
@ -69,10 +68,6 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(_addr.subscriber()));
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -8,7 +8,6 @@ extern crate serde_json;
#[macro_use] extern crate json; #[macro_use] extern crate json;
use actix_web::*; use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
use bytes::BytesMut; use bytes::BytesMut;
use futures::{Future, Stream}; use futures::{Future, Stream};
@ -95,10 +94,6 @@ fn main() {
.shutdown_timeout(1) .shutdown_timeout(1)
.start(); .start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -6,7 +6,6 @@ extern crate futures;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
use futures::{Future, Stream}; use futures::{Future, Stream};
use futures::future::{result, Either}; use futures::future::{result, Either};
@ -55,10 +54,6 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Starting http server: 127.0.0.1:8080"); println!("Starting http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -1,15 +0,0 @@
[package]
name = "signals"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
workspace = "../.."
[[bin]]
name = "server"
path = "src/main.rs"
[dependencies]
env_logger = "*"
futures = "0.1"
actix = "0.4"
actix-web = { path = "../../" }

View File

@ -1,17 +0,0 @@
# Signals
This example shows how to handle Unix signals and properly stop http server. This example does not work with Windows.
## Usage
```bash
cd actix-web/examples/signal
cargo run (or ``cargo watch -x run``)
# Started http server: 127.0.0.1:8080
# CTRL+C
# INFO:actix_web::server: SIGINT received, exiting
# INFO:actix_web::worker: Shutting down http worker, 0 connections
# INFO:actix_web::worker: Shutting down http worker, 0 connections
# INFO:actix_web::worker: Shutting down http worker, 0 connections
# INFO:actix_web::worker: Shutting down http worker, 0 connections
```

View File

@ -1,45 +0,0 @@
extern crate actix;
extern crate actix_web;
extern crate futures;
extern crate env_logger;
use actix::*;
use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
struct MyWebSocket;
impl Actor for MyWebSocket {
type Context = HttpContext<Self>;
}
impl StreamHandler<ws::Message> for MyWebSocket {}
impl Handler<ws::Message> for MyWebSocket {
type Result = ();
fn handle(&mut self, _: ws::Message, _: &mut Self::Context) {
{}
}
}
fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
let _ = env_logger::init();
let sys = actix::System::new("signals-example");
let addr = HttpServer::new(|| {
Application::new()
// enable logger
.middleware(middleware::Logger::default())
.resource("/ws/", |r| r.f(|req| ws::start(req, MyWebSocket)))
.resource("/", |r| r.h(httpcodes::HTTPOk))})
.bind("127.0.0.1:8080").unwrap()
.start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Started http server: 127.0.0.1:8080");
let _ = sys.run();
}

View File

@ -11,7 +11,6 @@ use std::cell::Cell;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
/// Application state /// Application state
struct AppState { struct AppState {
@ -74,10 +73,6 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -5,7 +5,6 @@ extern crate env_logger;
extern crate tera; extern crate tera;
use actix_web::*; use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
struct State { struct State {
@ -43,10 +42,6 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -7,7 +7,7 @@ use std::fs::File;
use std::io::Read; use std::io::Read;
use actix_web::*; use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
/// somple handle /// somple handle
fn index(req: HttpRequest) -> Result<HttpResponse> { fn index(req: HttpRequest) -> Result<HttpResponse> {
@ -46,10 +46,6 @@ fn main() {
.bind("127.0.0.1:8443").unwrap() .bind("127.0.0.1:8443").unwrap()
.start_ssl(&pkcs12).unwrap(); .start_ssl(&pkcs12).unwrap();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Started http server: 127.0.0.1:8443"); println!("Started http server: 127.0.0.1:8443");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -18,7 +18,6 @@ use std::time::Instant;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
mod codec; mod codec;
mod server; mod server;
@ -213,10 +212,6 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -10,7 +10,6 @@ extern crate env_logger;
use actix::*; use actix::*;
use actix_web::*; use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
/// do websocket handshake and start `MyWebSocket` actor /// do websocket handshake and start `MyWebSocket` actor
fn ws_index(r: HttpRequest) -> Result<HttpResponse> { fn ws_index(r: HttpRequest) -> Result<HttpResponse> {
@ -71,10 +70,6 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(_addr.subscriber()));
println!("Started http server: 127.0.0.1:8080"); println!("Started http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -61,7 +61,7 @@ connections. Server accepts function that should return `HttpHandler` instance:
|| Application::new() || Application::new()
.resource("/", |r| r.f(index))) .resource("/", |r| r.f(index)))
.bind("127.0.0.1:8088")? .bind("127.0.0.1:8088")?
.start(); .run();
``` ```
That's it. Now, compile and run the program with cargo run. That's it. Now, compile and run the program with cargo run.
@ -69,9 +69,8 @@ Head over to ``http://localhost:8088/`` to see the results.
Here is full source of main.rs file: Here is full source of main.rs file:
```rust ```rust,ignore
extern crate actix; # extern crate actix_web;
extern crate actix_web;
use actix_web::*; use actix_web::*;
fn index(req: HttpRequest) -> &'static str { fn index(req: HttpRequest) -> &'static str {
@ -79,17 +78,11 @@ fn index(req: HttpRequest) -> &'static str {
} }
fn main() { fn main() {
let sys = actix::System::new("example");
HttpServer::new( HttpServer::new(
|| Application::new() || Application::new()
.resource("/", |r| r.f(index))) .resource("/", |r| r.f(index)))
.bind("127.0.0.1:8088").expect("Can not bind to 127.0.0.1:8088") .bind("127.0.0.1:8088").expect("Can not bind to 127.0.0.1:8088")
.start(); .run();
println!("Started http server: 127.0.0.1:8088");
# actix::Arbiter::system().send(actix::msgs::SystemExit(0));
let _ = sys.run();
} }
``` ```

View File

@ -47,15 +47,27 @@ address of the started http server. Actix http server accept several messages:
# extern crate actix_web; # extern crate actix_web;
# use futures::Future; # use futures::Future;
use actix_web::*; use actix_web::*;
use std::thread;
use std::sync::mpsc;
fn main() { fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let sys = actix::System::new("http-server");
let addr = HttpServer::new( let addr = HttpServer::new(
|| Application::new() || Application::new()
.resource("/", |r| r.h(httpcodes::HTTPOk))) .resource("/", |r| r.h(httpcodes::HTTPOk)))
.bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
.spawn(); .shutdown_timeout(60) // <- Set shutdown timeout to 60 seconds
.start();
let _ = tx.send(addr);
let _ = sys.run();
});
let _ = addr.call_fut(dev::StopServer{graceful: true}).wait(); // <- Send `StopServer` message to server. let addr = rx.recv().unwrap();
let _ = addr.call_fut(
dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server.
} }
``` ```
@ -173,59 +185,13 @@ timeout are force dropped. By default shutdown timeout sets to 30 seconds.
You can change this parameter with `HttpServer::shutdown_timeout()` method. You can change this parameter with `HttpServer::shutdown_timeout()` method.
You can send stop message to server with server address and specify if you what You can send stop message to server with server address and specify if you what
graceful shutdown or not. `start()` or `spawn()` methods return address of the server. graceful shutdown or not. `start()` methods return address of the server.
```rust Http server handles several OS signals. *CTRL-C* is available on all OSs,
# extern crate futures; other signals are available on unix systems.
# extern crate actix;
# extern crate actix_web;
# use futures::Future;
use actix_web::*;
fn main() {
let addr = HttpServer::new(
|| Application::new()
.resource("/", |r| r.h(httpcodes::HTTPOk)))
.bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
.shutdown_timeout(60) // <- Set shutdown timeout to 60 seconds
.spawn();
let _ = addr.call_fut(
dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server.
}
```
It is possible to use signals. *CTRL-C* is available on all OSs, other signals are
available on unix systems.
Then you can subscribe your server to unix signals. Http server handles three signals:
* *SIGINT* - Force shutdown workers * *SIGINT* - Force shutdown workers
* *SIGTERM* - Graceful shutdown workers * *SIGTERM* - Graceful shutdown workers
* *SIGQUIT* - Force shutdown workers * *SIGQUIT* - Force shutdown workers
```rust,ignore It is possible to disable signals handling with `HttpServer::disable_signals()` method.
# extern crate futures;
# extern crate actix;
# extern crate actix_web;
use actix_web::*;
use actix::actors::signal::{ProcessSignals, Subscribe};
fn main() {
let sys = actix::System::new("signals");
let addr = HttpServer::new(|| {
Application::new()
.resource("/", |r| r.h(httpcodes::HTTPOk))})
.bind("127.0.0.1:8080").unwrap()
.start();
// Subscribe to unix signals
let signals = actix::Arbiter::system_registry().get::<ProcessSignals>();
signals.send(Subscribe(addr.subscriber()));
println!("Started http server: 127.0.0.1:8080");
# actix::Arbiter::system().send(actix::msgs::SystemExit(0));
let _ = sys.run();
}
```

View File

@ -6,11 +6,11 @@ use std::marker::PhantomData;
use std::collections::HashMap; use std::collections::HashMap;
use actix::prelude::*; use actix::prelude::*;
use actix::actors::signal;
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use futures::sync::mpsc; use futures::sync::mpsc;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream; use tokio_core::net::TcpStream;
use actix::actors::signal;
use mio; use mio;
use num_cpus; use num_cpus;
use net2::TcpBuilder; use net2::TcpBuilder;
@ -105,6 +105,8 @@ pub struct HttpServer<T, A, H, U>
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
exit: bool, exit: bool,
shutdown_timeout: u16, shutdown_timeout: u16,
signals: Option<SyncAddress<signal::ProcessSignals>>,
no_signals: bool,
} }
unsafe impl<T, A, H, U> Sync for HttpServer<T, A, H, U> where H: HttpHandler + 'static {} unsafe impl<T, A, H, U> Sync for HttpServer<T, A, H, U> where H: HttpHandler + 'static {}
@ -150,6 +152,8 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
accept: Vec::new(), accept: Vec::new(),
exit: false, exit: false,
shutdown_timeout: 30, shutdown_timeout: 30,
signals: None,
no_signals: false,
} }
} }
@ -208,6 +212,18 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
self self
} }
/// Set alternative address for `ProcessSignals` actor.
pub fn signals(mut self, addr: SyncAddress<signal::ProcessSignals>) -> Self {
self.signals = Some(addr);
self
}
/// Disable signal handling
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
self
}
/// Timeout for graceful workers shutdown. /// Timeout for graceful workers shutdown.
/// ///
/// After receiving a stop signal, workers have this much time to finish serving requests. /// After receiving a stop signal, workers have this much time to finish serving requests.
@ -276,6 +292,18 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
info!("Starting {} http workers", self.threads); info!("Starting {} http workers", self.threads);
workers workers
} }
// subscribe to os signals
fn subscribe_to_signals(&self, addr: &SyncAddress<HttpServer<T, A, H, U>>) {
if self.no_signals {
let msg = signal::Subscribe(addr.subscriber());
if let Some(ref signals) = self.signals {
signals.send(msg);
} else {
Arbiter::system_registry().get::<signal::ProcessSignals>().send(msg);
}
}
}
} }
impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U> impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
@ -327,18 +355,21 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
} }
// start http server actor // start http server actor
HttpServer::create(|_| {self}) HttpServer::create(|ctx| {
self.subscribe_to_signals(&ctx.address());
self
})
} }
} }
/// Spawn new thread and start listening for incomming connections. /// Spawn new thread and start listening for incomming connections.
/// ///
/// This method spawns new thread and starts new actix system. Other than that it is /// This method spawns new thread and starts new actix system. Other than that it is
/// similar to `start()` method. This method does not block. /// similar to `start()` method. This method blocks.
/// ///
/// This methods panics if no socket addresses get bound. /// This methods panics if no socket addresses get bound.
/// ///
/// ```rust /// ```rust,ignore
/// # extern crate futures; /// # extern crate futures;
/// # extern crate actix; /// # extern crate actix;
/// # extern crate actix_web; /// # extern crate actix_web;
@ -346,27 +377,22 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
/// use actix_web::*; /// use actix_web::*;
/// ///
/// fn main() { /// fn main() {
/// let addr = HttpServer::new( /// HttpServer::new(
/// || Application::new() /// || Application::new()
/// .resource("/", |r| r.h(httpcodes::HTTPOk))) /// .resource("/", |r| r.h(httpcodes::HTTPOk)))
/// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0")
/// .spawn(); /// .run();
///
/// let _ = addr.call_fut(
/// dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server.
/// } /// }
/// ``` /// ```
pub fn spawn(mut self) -> SyncAddress<Self> { pub fn run(mut self) {
self.exit = true; self.exit = true;
self.no_signals = false;
let (tx, rx) = sync_mpsc::channel(); let _ = thread::spawn(move || {
thread::spawn(move || {
let sys = System::new("http-server"); let sys = System::new("http-server");
let addr = self.start(); self.start();
let _ = tx.send(addr); let _ = sys.run();
sys.run(); }).join();
});
rx.recv().unwrap()
} }
} }
@ -401,7 +427,10 @@ impl<H: HttpHandler, U, V> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H,
} }
// start http server actor // start http server actor
Ok(HttpServer::create(|_| {self})) Ok(HttpServer::create(|ctx| {
self.subscribe_to_signals(&ctx.address());
self
}))
} }
} }
} }
@ -441,7 +470,10 @@ impl<H: HttpHandler, U, V> HttpServer<SslStream<TcpStream>, net::SocketAddr, H,
} }
// start http server actor // start http server actor
Ok(HttpServer::create(|_| {self})) Ok(HttpServer::create(|ctx| {
self.subscribe_to_signals(&ctx.address());
self
}))
} }
} }
} }
@ -485,6 +517,7 @@ impl<T, A, H, U, V> HttpServer<WrapperStream<T>, A, H, U>
HttpServer::create(move |ctx| { HttpServer::create(move |ctx| {
ctx.add_stream(stream.map( ctx.add_stream(stream.map(
move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false})); move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false}));
self.subscribe_to_signals(&ctx.address());
self self
}) })
} }