From c254bb978c83f93879e85bbe4eb80775e824f124 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 26 Nov 2019 17:03:52 +0600 Subject: [PATCH] allow to wait on Server until server stops; restore signal handling --- actix-server/CHANGES.md | 14 ++++ actix-server/Cargo.toml | 2 + actix-server/src/builder.rs | 66 +++++++++---------- actix-server/src/lib.rs | 2 +- actix-server/src/server.rs | 19 +++--- actix-server/src/signals.rs | 124 ++++++++++-------------------------- 6 files changed, 94 insertions(+), 133 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 8680373f..6e64e99d 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,5 +1,19 @@ # Changes +## [0.8.0-alpha.2] - 2019-11-xx + +### Changed + +* Allow to wait on `Server` until server stops + + +## [0.8.0-alpha.1] - 2019-11-22 + +### Changed + +* Migrate to `std::future` + + ## [0.7.0] - 2019-10-04 ### Changed diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 062dc08e..3330a431 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -40,6 +40,8 @@ futures = "0.3.1" slab = "0.4" tokio-net = { version = "0.2.0-alpha.6", features = ["signal", "tcp", "uds"] } +futures-core-preview = "0.3.0-alpha.19" + # unix domain sockets mio-uds = { version = "0.6.7" } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 061414e0..0004cdcc 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -18,7 +18,7 @@ use crate::accept::{AcceptLoop, AcceptNotify, Command}; use crate::config::{ConfiguredService, ServiceConfig}; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; -// use crate::signals::{Signal, Signals}; +use crate::signals::{Signal, Signals}; use crate::socket::StdListener; use crate::worker::{self, Worker, WorkerAvailability, WorkerClient}; use crate::{ssl, Token}; @@ -305,7 +305,7 @@ impl ServerBuilder { // handle signals if !self.no_signals { - // Signals::start(self.server.clone()); + Signals::start(self.server.clone()).unwrap(); } // start http server actor @@ -344,37 +344,37 @@ impl ServerBuilder { self.accept.send(Command::Resume); let _ = tx.send(()); } - // ServerCommand::Signal(sig) => { - // Signals support - // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system - // match sig { - // Signal::Int => { - // info!("SIGINT received, exiting"); - // self.exit = true; - // self.handle_cmd(ServerCommand::Stop { - // graceful: false, - // completion: None, - // }) - // } - // Signal::Term => { - // info!("SIGTERM received, stopping"); - // self.exit = true; - // self.handle_cmd(ServerCommand::Stop { - // graceful: true, - // completion: None, - // }) - // } - // Signal::Quit => { - // info!("SIGQUIT received, exiting"); - // self.exit = true; - // self.handle_cmd(ServerCommand::Stop { - // graceful: false, - // completion: None, - // }) - // } - // _ => (), - // } - // } + ServerCommand::Signal(sig) => { + // Signals support + // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system + match sig { + Signal::Int => { + info!("SIGINT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + Signal::Term => { + info!("SIGTERM received, stopping"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: true, + completion: None, + }) + } + Signal::Quit => { + info!("SIGQUIT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + _ => (), + } + } ServerCommand::Notify(tx) => { self.notify.push(tx); } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 6aefc010..97687c2f 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -6,7 +6,7 @@ mod config; mod counter; mod server; mod service; -// mod signals; +mod signals; mod socket; pub mod ssl; mod worker; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 350a2b86..8c7202f6 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -1,4 +1,5 @@ use std::future::Future; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -7,14 +8,14 @@ use futures::channel::oneshot; use futures::FutureExt; use crate::builder::ServerBuilder; -// use crate::signals::Signal; +use crate::signals::Signal; #[derive(Debug)] pub(crate) enum ServerCommand { WorkerFaulted(usize), Pause(oneshot::Sender<()>), Resume(oneshot::Sender<()>), - // Signal(Signal), + Signal(Signal), /// Whether to try and shut down gracefully Stop { graceful: bool, @@ -40,9 +41,9 @@ impl Server { ServerBuilder::default() } - // pub(crate) fn signal(&self, sig: Signal) { - // let _ = self.0.unbounded_send(ServerCommand::Signal(sig)); - // } + pub(crate) fn signal(&self, sig: Signal) { + let _ = self.0.unbounded_send(ServerCommand::Signal(sig)); + } pub(crate) fn worker_faulted(&self, idx: usize) { let _ = self.0.unbounded_send(ServerCommand::WorkerFaulted(idx)); @@ -85,7 +86,7 @@ impl Clone for Server { } impl Future for Server { - type Output = (); + type Output = io::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = self.get_mut(); @@ -93,15 +94,15 @@ impl Future for Server { if this.1.is_none() { let (tx, rx) = oneshot::channel(); if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() { - return Poll::Ready(()); + return Poll::Ready(Ok(())); } this.1 = Some(rx); } match Pin::new(this.1.as_mut().unwrap()).poll(cx) { Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(()), - Poll::Ready(Err(_)) => Poll::Ready(()), + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + Poll::Ready(Err(_)) => Poll::Ready(Ok(())), } } } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index f87cf18b..6194f2f1 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -3,11 +3,7 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use actix_rt::spawn; -use futures::future::LocalBoxFuture; -use futures::stream::{futures_unordered, FuturesUnordered, LocalBoxStream}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStream, TryStreamExt}; -use tokio_net::signal::unix::signal; +use futures_core::stream::Stream; use crate::server::Server; @@ -27,126 +23,74 @@ pub(crate) enum Signal { pub(crate) struct Signals { srv: Server, #[cfg(not(unix))] - stream: SigStream, + stream: tokio_net::signal::CtrlC, #[cfg(unix)] - streams: Vec, + streams: Vec<(Signal, tokio_net::signal::unix::Signal)>, } -type SigStream = LocalBoxStream<'static, Result>; - impl Signals { - pub(crate) fn start(srv: Server) { - let fut = { + pub(crate) fn start(srv: Server) -> io::Result<()> { + actix_rt::spawn({ #[cfg(not(unix))] { - tokio_net::signal::ctrl_c() - .map_err(|_| ()) - .and_then(move |stream| Signals { - srv, - stream: Box::new(stream.map(|_| Signal::Int)), - }) + let stream = tokio_net::signal::ctrl_c()?; + Signals { srv, stream } } #[cfg(unix)] { use tokio_net::signal::unix; - let mut sigs: Vec<_> = Vec::new(); + let mut streams = Vec::new(); - let mut SIG_MAP = [ - ( - tokio_net::signal::unix::SignalKind::interrupt(), - Signal::Int, - ), - (tokio_net::signal::unix::SignalKind::hangup(), Signal::Hup), - ( - tokio_net::signal::unix::SignalKind::terminate(), - Signal::Term, - ), - (tokio_net::signal::unix::SignalKind::quit(), Signal::Quit), + let sig_map = [ + (unix::SignalKind::interrupt(), Signal::Int), + (unix::SignalKind::hangup(), Signal::Hup), + (unix::SignalKind::terminate(), Signal::Term), + (unix::SignalKind::quit(), Signal::Quit), ]; - for (kind, sig) in SIG_MAP.into_iter() { + for (kind, sig) in sig_map.into_iter() { let sig = sig.clone(); - let fut = signal(*kind).unwrap(); - sigs.push(fut.map(move |_| Ok(sig)).boxed_local()); + let fut = unix::signal(*kind)?; + streams.push((sig, fut)); } - /* TODO: Finish rewriting this - sigs.push( - tokio_net::signal::unix::signal(tokio_net::signal::si).unwrap() - .map(|stream| { - let s: SigStream = Box::new(stream.map(|_| Signal::Int)); - s - }).boxed() - ); - sigs.push( - tokio_net::signal::unix::signal(tokio_net::signal::unix::SignalKind::hangup()).unwrap() - .map(|stream: unix::Signal| { - let s: SigStream = Box::new(stream.map(|_| Signal::Hup)); - s - }).boxed() - ); - sigs.push( - tokio_net::signal::unix::signal( - tokio_net::signal::unix::SignalKind::terminate() - ).unwrap() - .map(|stream| { - let s: SigStream = Box::new(stream.map(|_| Signal::Term)); - s - }).boxed(), - ); - sigs.push( - tokio_net::signal::unix::signal( - tokio_net::signal::unix::SignalKind::quit() - ).unwrap() - .map(|stream| { - let s: SigStream = Box::new(stream.map(|_| Signal::Quit)); - s - }).boxed() - ); - */ - - Signals { srv, streams: sigs } + Signals { srv, streams } } - }; - spawn(async {}); + }); + + Ok(()) } } impl Future for Signals { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - unimplemented!() - } - - /* - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { #[cfg(not(unix))] loop { - match self.stream.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(sig))) => self.srv.signal(sig), - Ok(Async::NotReady) => return Ok(Async::NotReady), + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(Ok(Some(_))) => self.srv.signal(Signal::Int), + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, } } #[cfg(unix)] { - for s in &mut self.streams { + for idx in 0..self.streams.len() { loop { - match s.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(sig))) => self.srv.signal(sig), + match Pin::new(&mut self.streams[idx].1).poll_next(cx) { + Poll::Ready(None) => return Poll::Ready(()), + Poll::Pending => break, + Poll::Ready(Some(_)) => { + let sig = self.streams[idx].0; + self.srv.signal(sig); + } } } } - Ok(Async::NotReady) + Poll::Pending } } - */ }