1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-23 21:51:06 +01:00

allow to wait on Server until server stops; restore signal handling

This commit is contained in:
Nikolay Kim 2019-11-26 17:03:52 +06:00
parent 009f8e2e7c
commit c254bb978c
6 changed files with 94 additions and 133 deletions

View File

@ -1,5 +1,19 @@
# Changes
## [0.8.0-alpha.2] - 2019-11-xx
### Changed
* Allow to wait on `Server` until server stops
## [0.8.0-alpha.1] - 2019-11-22
### Changed
* Migrate to `std::future`
## [0.7.0] - 2019-10-04
### Changed

View File

@ -40,6 +40,8 @@ futures = "0.3.1"
slab = "0.4"
tokio-net = { version = "0.2.0-alpha.6", features = ["signal", "tcp", "uds"] }
futures-core-preview = "0.3.0-alpha.19"
# unix domain sockets
mio-uds = { version = "0.6.7" }

View File

@ -18,7 +18,7 @@ use crate::accept::{AcceptLoop, AcceptNotify, Command};
use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
// use crate::signals::{Signal, Signals};
use crate::signals::{Signal, Signals};
use crate::socket::StdListener;
use crate::worker::{self, Worker, WorkerAvailability, WorkerClient};
use crate::{ssl, Token};
@ -305,7 +305,7 @@ impl ServerBuilder {
// handle signals
if !self.no_signals {
// Signals::start(self.server.clone());
Signals::start(self.server.clone()).unwrap();
}
// start http server actor
@ -344,37 +344,37 @@ impl ServerBuilder {
self.accept.send(Command::Resume);
let _ = tx.send(());
}
// ServerCommand::Signal(sig) => {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
// match sig {
// Signal::Int => {
// info!("SIGINT received, exiting");
// self.exit = true;
// self.handle_cmd(ServerCommand::Stop {
// graceful: false,
// completion: None,
// })
// }
// Signal::Term => {
// info!("SIGTERM received, stopping");
// self.exit = true;
// self.handle_cmd(ServerCommand::Stop {
// graceful: true,
// completion: None,
// })
// }
// Signal::Quit => {
// info!("SIGQUIT received, exiting");
// self.exit = true;
// self.handle_cmd(ServerCommand::Stop {
// graceful: false,
// completion: None,
// })
// }
// _ => (),
// }
// }
ServerCommand::Signal(sig) => {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig {
Signal::Int => {
info!("SIGINT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
Signal::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
_ => (),
}
}
ServerCommand::Notify(tx) => {
self.notify.push(tx);
}

View File

@ -6,7 +6,7 @@ mod config;
mod counter;
mod server;
mod service;
// mod signals;
mod signals;
mod socket;
pub mod ssl;
mod worker;

View File

@ -1,4 +1,5 @@
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
@ -7,14 +8,14 @@ use futures::channel::oneshot;
use futures::FutureExt;
use crate::builder::ServerBuilder;
// use crate::signals::Signal;
use crate::signals::Signal;
#[derive(Debug)]
pub(crate) enum ServerCommand {
WorkerFaulted(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
// Signal(Signal),
Signal(Signal),
/// Whether to try and shut down gracefully
Stop {
graceful: bool,
@ -40,9 +41,9 @@ impl Server {
ServerBuilder::default()
}
// pub(crate) fn signal(&self, sig: Signal) {
// let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
// }
pub(crate) fn signal(&self, sig: Signal) {
let _ = self.0.unbounded_send(ServerCommand::Signal(sig));
}
pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx));
@ -85,7 +86,7 @@ impl Clone for Server {
}
impl Future for Server {
type Output = ();
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
@ -93,15 +94,15 @@ impl Future for Server {
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(());
return Poll::Ready(Ok(()));
}
this.1 = Some(rx);
}
match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(()),
Poll::Ready(Err(_)) => Poll::Ready(()),
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
}
}
}

View File

@ -3,11 +3,7 @@ use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_rt::spawn;
use futures::future::LocalBoxFuture;
use futures::stream::{futures_unordered, FuturesUnordered, LocalBoxStream};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStream, TryStreamExt};
use tokio_net::signal::unix::signal;
use futures_core::stream::Stream;
use crate::server::Server;
@ -27,126 +23,74 @@ pub(crate) enum Signal {
pub(crate) struct Signals {
srv: Server,
#[cfg(not(unix))]
stream: SigStream,
stream: tokio_net::signal::CtrlC,
#[cfg(unix)]
streams: Vec<SigStream>,
streams: Vec<(Signal, tokio_net::signal::unix::Signal)>,
}
type SigStream = LocalBoxStream<'static, Result<Signal, io::Error>>;
impl Signals {
pub(crate) fn start(srv: Server) {
let fut = {
pub(crate) fn start(srv: Server) -> io::Result<()> {
actix_rt::spawn({
#[cfg(not(unix))]
{
tokio_net::signal::ctrl_c()
.map_err(|_| ())
.and_then(move |stream| Signals {
srv,
stream: Box::new(stream.map(|_| Signal::Int)),
})
let stream = tokio_net::signal::ctrl_c()?;
Signals { srv, stream }
}
#[cfg(unix)]
{
use tokio_net::signal::unix;
let mut sigs: Vec<_> = Vec::new();
let mut streams = Vec::new();
let mut SIG_MAP = [
(
tokio_net::signal::unix::SignalKind::interrupt(),
Signal::Int,
),
(tokio_net::signal::unix::SignalKind::hangup(), Signal::Hup),
(
tokio_net::signal::unix::SignalKind::terminate(),
Signal::Term,
),
(tokio_net::signal::unix::SignalKind::quit(), Signal::Quit),
let sig_map = [
(unix::SignalKind::interrupt(), Signal::Int),
(unix::SignalKind::hangup(), Signal::Hup),
(unix::SignalKind::terminate(), Signal::Term),
(unix::SignalKind::quit(), Signal::Quit),
];
for (kind, sig) in SIG_MAP.into_iter() {
for (kind, sig) in sig_map.into_iter() {
let sig = sig.clone();
let fut = signal(*kind).unwrap();
sigs.push(fut.map(move |_| Ok(sig)).boxed_local());
let fut = unix::signal(*kind)?;
streams.push((sig, fut));
}
/* TODO: Finish rewriting this
sigs.push(
tokio_net::signal::unix::signal(tokio_net::signal::si).unwrap()
.map(|stream| {
let s: SigStream = Box::new(stream.map(|_| Signal::Int));
s
}).boxed()
);
sigs.push(
tokio_net::signal::unix::signal(tokio_net::signal::unix::SignalKind::hangup()).unwrap()
.map(|stream: unix::Signal| {
let s: SigStream = Box::new(stream.map(|_| Signal::Hup));
s
}).boxed()
);
sigs.push(
tokio_net::signal::unix::signal(
tokio_net::signal::unix::SignalKind::terminate()
).unwrap()
.map(|stream| {
let s: SigStream = Box::new(stream.map(|_| Signal::Term));
s
}).boxed(),
);
sigs.push(
tokio_net::signal::unix::signal(
tokio_net::signal::unix::SignalKind::quit()
).unwrap()
.map(|stream| {
let s: SigStream = Box::new(stream.map(|_| Signal::Quit));
s
}).boxed()
);
*/
Signals { srv, streams: sigs }
Signals { srv, streams }
}
};
spawn(async {});
});
Ok(())
}
}
impl Future for Signals {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unimplemented!()
}
/*
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(unix))]
loop {
match self.stream.poll() {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(sig))) => self.srv.signal(sig),
Ok(Async::NotReady) => return Ok(Async::NotReady),
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Ok(Some(_))) => self.srv.signal(Signal::Int),
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
}
}
#[cfg(unix)]
{
for s in &mut self.streams {
for idx in 0..self.streams.len() {
loop {
match s.poll() {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::NotReady) => break,
Ok(Async::Ready(Some(sig))) => self.srv.signal(sig),
match Pin::new(&mut self.streams[idx].1).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => break,
Poll::Ready(Some(_)) => {
let sig = self.streams[idx].0;
self.srv.signal(sig);
}
}
}
}
Ok(Async::NotReady)
Poll::Pending
}
}
*/
}