From 42ec3454d922e20275c68a3c759197fb2a51b309 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 10 Dec 2018 21:06:54 -0800 Subject: [PATCH] add signals support --- actix-server/src/builder.rs | 251 +++++++++++++++++++----------------- actix-server/src/lib.rs | 1 + actix-server/src/server.rs | 12 +- actix-server/src/signals.rs | 116 +++++++++++++++++ 4 files changed, 257 insertions(+), 123 deletions(-) create mode 100644 actix-server/src/signals.rs diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 88516c5e..c16de554 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -11,15 +11,14 @@ use net2::TcpBuilder; use num_cpus; use tokio_timer::sleep; -// use actix::{actors::signal}; - -use super::accept::{AcceptLoop, AcceptNotify, Command}; -use super::config::{ConfiguredService, ServiceConfig}; -use super::server::{Server, ServerCommand}; -use super::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory}; -use super::services::{ServiceFactory, ServiceNewService}; -use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; -use super::Token; +use crate::accept::{AcceptLoop, AcceptNotify, Command}; +use crate::config::{ConfiguredService, ServiceConfig}; +use crate::server::{Server, ServerCommand}; +use crate::services::{InternalServiceFactory, StreamNewService, StreamServiceFactory}; +use crate::services::{ServiceFactory, ServiceNewService}; +use crate::signals::{Signal, Signals}; +use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; +use crate::Token; /// Server builder pub struct ServerBuilder { @@ -244,11 +243,12 @@ impl ServerBuilder { self.accept .start(mem::replace(&mut self.sockets, Vec::new()), workers); + // handle signals + if !self.no_signals { + Signals::start(self.server.clone()); + } + // start http server actor - // let signals = self.subscribe_to_signals(); - // if let Some(signals) = signals { - // signals.do_send(signal::Subscribe(addr.clone().recipient())) - // } let server = self.server.clone(); spawn(self); server @@ -271,36 +271,125 @@ impl ServerBuilder { worker } + + fn handle_cmd(&mut self, item: ServerCommand) { + match item { + ServerCommand::Pause(tx) => { + self.accept.send(Command::Pause); + let _ = tx.send(()); + } + ServerCommand::Resume(tx) => { + self.accept.send(Command::Resume); + let _ = tx.send(()); + } + ServerCommand::Signal(sig) => { + // Signals support + // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system + match sig { + Signal::Int => { + info!("SIGINT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + Signal::Term => { + info!("SIGTERM received, stopping"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: true, + completion: None, + }) + } + Signal::Quit => { + info!("SIGQUIT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + _ => (), + } + } + ServerCommand::Stop { + graceful, + completion, + } => { + let exit = self.exit; + + // stop accept thread + self.accept.send(Command::Stop); + + // stop workers + if !self.workers.is_empty() { + spawn( + futures_unordered( + self.workers + .iter() + .map(move |worker| worker.1.stop(graceful)), + ) + .collect() + .then(move |_| { + if let Some(tx) = completion { + let _ = tx.send(()); + } + if exit { + spawn(sleep(Duration::from_millis(300)).then(|_| { + System::current().stop(); + ok(()) + })); + } + ok(()) + }), + ) + } else { + // we need to stop system if server was spawned + if self.exit { + spawn(sleep(Duration::from_millis(300)).then(|_| { + System::current().stop(); + ok(()) + })); + } + if let Some(tx) = completion { + let _ = tx.send(()); + } + } + } + ServerCommand::WorkerDied(idx) => { + let mut found = false; + for i in 0..self.workers.len() { + if self.workers[i].0 == idx { + self.workers.swap_remove(i); + found = true; + break; + } + } + + if found { + error!("Worker has died {:?}, restarting", idx); + + let mut new_idx = self.workers.len(); + 'found: loop { + for i in 0..self.workers.len() { + if self.workers[i].0 == new_idx { + new_idx += 1; + continue 'found; + } + } + break; + } + + let worker = self.start_worker(new_idx, self.accept.get_notify()); + self.workers.push((new_idx, worker.clone())); + self.accept.send(Command::Worker(worker)); + } + } + } + } } -// /// Signals support -// /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system -// /// message to `System` actor. -// impl Handler for Server { -// type Result = (); - -// fn handle(&mut self, msg: signal::Signal, ctx: &mut Context) { -// match msg.0 { -// signal::SignalType::Int => { -// info!("SIGINT received, exiting"); -// self.exit = true; -// Handler::::handle(self, StopServer { graceful: false }, ctx); -// } -// signal::SignalType::Term => { -// info!("SIGTERM received, stopping"); -// self.exit = true; -// Handler::::handle(self, StopServer { graceful: true }, ctx); -// } -// signal::SignalType::Quit => { -// info!("SIGQUIT received, exiting"); -// self.exit = true; -// Handler::::handle(self, StopServer { graceful: false }, ctx); -// } -// _ => (), -// } -// } -// } - impl Future for ServerBuilder { type Item = (); type Error = (); @@ -310,85 +399,7 @@ impl Future for ServerBuilder { match self.cmd.poll() { Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(Some(item))) => match item { - ServerCommand::Pause(tx) => { - self.accept.send(Command::Pause); - let _ = tx.send(()); - } - ServerCommand::Resume(tx) => { - self.accept.send(Command::Resume); - let _ = tx.send(()); - } - ServerCommand::Stop { - graceful, - completion, - } => { - let exit = self.exit; - - // stop accept thread - self.accept.send(Command::Stop); - - // stop workers - if !self.workers.is_empty() { - spawn( - futures_unordered( - self.workers - .iter() - .map(move |worker| worker.1.stop(graceful)), - ) - .collect() - .then(move |_| { - let _ = completion.send(()); - if exit { - spawn(sleep(Duration::from_millis(300)).then(|_| { - System::current().stop(); - ok(()) - })); - } - ok(()) - }), - ) - } else { - // we need to stop system if server was spawned - if self.exit { - spawn(sleep(Duration::from_millis(300)).then(|_| { - System::current().stop(); - ok(()) - })); - } - let _ = completion.send(()); - } - } - ServerCommand::WorkerDied(idx) => { - let mut found = false; - for i in 0..self.workers.len() { - if self.workers[i].0 == idx { - self.workers.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker has died {:?}, restarting", idx); - - let mut new_idx = self.workers.len(); - 'found: loop { - for i in 0..self.workers.len() { - if self.workers[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let worker = self.start_worker(new_idx, self.accept.get_notify()); - self.workers.push((new_idx, worker.clone())); - self.accept.send(Command::Worker(worker)); - } - } - }, + Ok(Async::Ready(Some(item))) => self.handle_cmd(item), } } } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index eb506e3c..bfcdb0d4 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -6,6 +6,7 @@ mod config; mod counter; mod server; mod services; +mod signals; pub mod ssl; mod worker; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index cb1ce451..4bc59992 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -2,16 +2,18 @@ use futures::sync::mpsc::UnboundedSender; use futures::sync::oneshot; use futures::Future; -use super::builder::ServerBuilder; +use crate::builder::ServerBuilder; +use crate::signals::Signal; pub(crate) enum ServerCommand { WorkerDied(usize), Pause(oneshot::Sender<()>), Resume(oneshot::Sender<()>), + Signal(Signal), /// Whether to try and shut down gracefully Stop { graceful: bool, - completion: oneshot::Sender<()>, + completion: Option>, }, } @@ -28,6 +30,10 @@ impl Server { ServerBuilder::default() } + pub(crate) fn signal(&self, sig: Signal) { + let _ = self.0.unbounded_send(ServerCommand::Signal(sig)); + } + pub(crate) fn worker_died(&self, idx: usize) { let _ = self.0.unbounded_send(ServerCommand::WorkerDied(idx)); } @@ -56,7 +62,7 @@ impl Server { let (tx, rx) = oneshot::channel(); let _ = self.0.unbounded_send(ServerCommand::Stop { graceful, - completion: tx, + completion: Some(tx), }); rx.map_err(|_| ()) } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs new file mode 100644 index 00000000..599628a5 --- /dev/null +++ b/actix-server/src/signals.rs @@ -0,0 +1,116 @@ +use std::io; + +use actix_rt::spawn; +use futures::stream::futures_unordered; +use futures::{Async, Future, Poll, Stream}; + +use crate::server::Server; + +/// Different types of process signals +#[derive(PartialEq, Clone, Copy, Debug)] +pub(crate) enum Signal { + /// SIGHUP + Hup, + /// SIGINT + Int, + /// SIGTERM + Term, + /// SIGQUIT + Quit, +} + +pub(crate) struct Signals { + srv: Server, + #[cfg(not(unix))] + stream: SigStream, + #[cfg(unix)] + streams: Vec, +} + +type SigStream = Box>; + +impl Signals { + pub(crate) fn start(srv: Server) { + let fut = { + #[cfg(not(unix))] + { + tokio_signal::ctrl_c().and_then(move |stream| Signals { + srv, + stream: Box::new(stream.map(|_| Signal::Int)), + }) + } + + #[cfg(unix)] + { + use tokio_signal::unix; + + let mut sigs: Vec>> = + Vec::new(); + sigs.push(Box::new( + tokio_signal::unix::Signal::new(tokio_signal::unix::SIGINT).map(|stream| { + let s: SigStream = Box::new(stream.map(|_| Signal::Int)); + s + }), + )); + sigs.push(Box::new( + tokio_signal::unix::Signal::new(tokio_signal::unix::SIGHUP).map( + |stream: unix::Signal| { + let s: SigStream = Box::new(stream.map(|_| Signal::Hup)); + s + }, + ), + )); + sigs.push(Box::new( + tokio_signal::unix::Signal::new(tokio_signal::unix::SIGTERM).map( + |stream| { + let s: SigStream = Box::new(stream.map(|_| Signal::Term)); + s + }, + ), + )); + sigs.push(Box::new( + tokio_signal::unix::Signal::new(tokio_signal::unix::SIGQUIT).map( + |stream| { + let s: SigStream = Box::new(stream.map(|_| Signal::Quit)); + s + }, + ), + )); + futures_unordered(sigs) + .collect() + .map_err(|_| ()) + .and_then(move |streams| Signals { srv, streams }) + } + }; + spawn(fut); + } +} + +impl Future for Signals { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + #[cfg(not(unix))] + loop { + match self.stream.poll() { + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(sig))) => self.srv.signal(sig), + Ok(Async::NotReady) => return Ok(Async::NotReady), + } + } + #[cfg(unix)] + { + for s in &mut self.streams { + loop { + match s.poll() { + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => break, + Ok(Async::Ready(Some(sig))) => self.srv.signal(sig), + } + } + } + Ok(Async::NotReady) + } + } +}