From 3f4898a6d18f7d3e7870bebea80e445ffd3253b8 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 28 Dec 2017 13:07:29 -0800 Subject: [PATCH] add StopWorker message --- src/server.rs | 21 ++++++++++++--------- src/worker.rs | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/src/server.rs b/src/server.rs index ffac04b9f..e57855943 100644 --- a/src/server.rs +++ b/src/server.rs @@ -15,28 +15,24 @@ use mio; use num_cpus; use net2::TcpBuilder; -#[cfg(feature="tls")] -use futures::{future, Future}; #[cfg(feature="tls")] use native_tls::TlsAcceptor; #[cfg(feature="tls")] -use tokio_tls::{TlsStream, TlsAcceptorExt}; +use tokio_tls::TlsStream; #[cfg(feature="alpn")] -use futures::{future, Future}; -#[cfg(feature="alpn")] -use openssl::ssl::{SslMethod, SslAcceptor, SslAcceptorBuilder}; +use openssl::ssl::{SslMethod, SslAcceptorBuilder}; #[cfg(feature="alpn")] use openssl::pkcs12::ParsedPkcs12; #[cfg(feature="alpn")] -use tokio_openssl::{SslStream, SslAcceptorExt}; +use tokio_openssl::SslStream; #[cfg(feature="signal")] use actix::actors::signal; use helpers; use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; -use worker::{Conn, Worker, WorkerSettings, StreamHandlerType}; +use worker::{Conn, Worker, WorkerSettings, StreamHandlerType, StopWorker}; /// Various server settings #[derive(Debug, Clone)] @@ -604,14 +600,21 @@ impl Handler for HttpServer U: 'static, A: 'static, { - fn handle(&mut self, _: StopServer, ctx: &mut Context) -> Response + fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Response { + // stop accept threads for item in &self.accept { let _ = item.1.send(Command::Stop); let _ = item.0.set_readiness(mio::Ready::readable()); } ctx.stop(); + // stop workers + let dur = if msg.graceful { Some(Duration::new(30, 0)) } else { None }; + for worker in &self.workers { + worker.send(StopWorker{graceful: dur}) + } + // we need to stop system if server was spawned if self.exit { Arbiter::system().send(msgs::SystemExit(0)) diff --git a/src/worker.rs b/src/worker.rs index 347d02cc6..3072ccac7 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -5,7 +5,22 @@ use tokio_core::net::TcpStream; use tokio_core::reactor::Handle; use net2::TcpStreamExt; +#[cfg(feature="tls")] +use futures::{future, Future}; +#[cfg(feature="tls")] +use native_tls::TlsAcceptor; +#[cfg(feature="tls")] +use tokio_tls::TlsAcceptorExt; + +#[cfg(feature="alpn")] +use futures::{future, Future}; +#[cfg(feature="alpn")] +use openssl::ssl::SslAcceptor; +#[cfg(feature="alpn")] +use tokio_openssl::SslAcceptorExt; + use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Response, StreamHandler}; +use actix::msgs::StopArbiter; use helpers; use channel::{HttpChannel, HttpHandler}; @@ -18,6 +33,12 @@ pub(crate) struct Conn { pub http2: bool, } +/// Stop worker +#[derive(Message)] +pub(crate) struct StopWorker { + pub graceful: Option, +} + pub(crate) struct WorkerSettings { h: RefCell>, enabled: bool, @@ -108,6 +129,17 @@ impl Handler> for Worker } } +/// `StopWorker` message handler +impl Handler for Worker + where H: HttpHandler + 'static, +{ + fn handle(&mut self, _: StopWorker, _: &mut Context) -> Response + { + Arbiter::arbiter().send(StopArbiter(0)); + Self::empty() + } +} + #[derive(Clone)] pub(crate) enum StreamHandlerType { Normal,