From 009f8e2e7caed2baf23f34f9c849df7b6aa4ab34 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 26 Nov 2019 16:33:45 +0600 Subject: [PATCH] allow to wait server exit --- actix-server/src/builder.rs | 13 +++++++++ actix-server/src/server.rs | 57 ++++++++++++++++++++++++++++++------- 2 files changed, 60 insertions(+), 10 deletions(-) diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 67629a31..061414e0 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -5,6 +5,7 @@ use std::{io, mem, net}; use actix_rt::{spawn, time::delay, Arbiter, System}; use futures::channel::mpsc::{unbounded, UnboundedReceiver}; +use futures::channel::oneshot; use futures::future::ready; use futures::stream::FuturesUnordered; use futures::{ready, Future, FutureExt, Stream, StreamExt}; @@ -36,6 +37,7 @@ pub struct ServerBuilder { no_signals: bool, cmd: UnboundedReceiver, server: Server, + notify: Vec>, } impl Default for ServerBuilder { @@ -62,6 +64,7 @@ impl ServerBuilder { shutdown_timeout: Duration::from_secs(30), no_signals: false, cmd: rx, + notify: Vec::new(), server, } } @@ -372,6 +375,9 @@ impl ServerBuilder { // _ => (), // } // } + ServerCommand::Notify(tx) => { + self.notify.push(tx); + } ServerCommand::Stop { graceful, completion, @@ -380,6 +386,7 @@ impl ServerBuilder { // stop accept thread self.accept.send(Command::Stop); + let notify = std::mem::replace(&mut self.notify, Vec::new()); // stop workers if !self.workers.is_empty() && graceful { @@ -393,6 +400,9 @@ impl ServerBuilder { if let Some(tx) = completion { let _ = tx.send(()); } + for tx in notify { + let _ = tx.send(()); + } if exit { spawn( async { @@ -419,6 +429,9 @@ impl ServerBuilder { if let Some(tx) = completion { let _ = tx.send(()); } + for tx in notify { + let _ = tx.send(()); + } } } ServerCommand::WorkerFaulted(idx) => { diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 67f9ca5d..350a2b86 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -1,6 +1,10 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + use futures::channel::mpsc::UnboundedSender; use futures::channel::oneshot; -use futures::{Future, TryFutureExt}; +use futures::FutureExt; use crate::builder::ServerBuilder; // use crate::signals::Signal; @@ -16,14 +20,19 @@ pub(crate) enum ServerCommand { graceful: bool, completion: Option>, }, + /// Notify of server stop + Notify(oneshot::Sender<()>), } -#[derive(Debug, Clone)] -pub struct Server(UnboundedSender); +#[derive(Debug)] +pub struct Server( + UnboundedSender, + Option>, +); impl Server { pub(crate) fn new(tx: UnboundedSender) -> Self { - Server(tx) + Server(tx, None) } /// Start server building process @@ -43,28 +52,56 @@ impl Server { /// /// If socket contains some pending connection, they might be dropped. /// All opened connection remains active. - pub fn pause(&self) -> impl Future> { + pub fn pause(&self) -> impl Future { let (tx, rx) = oneshot::channel(); let _ = self.0.unbounded_send(ServerCommand::Pause(tx)); - rx.map_err(|_| ()) + rx.map(|_| ()) } /// Resume accepting incoming connections - pub fn resume(&self) -> impl Future> { + pub fn resume(&self) -> impl Future { let (tx, rx) = oneshot::channel(); let _ = self.0.unbounded_send(ServerCommand::Resume(tx)); - rx.map_err(|_| ()) + rx.map(|_| ()) } /// Stop incoming connection processing, stop all workers and exit. /// /// If server starts with `spawn()` method, then spawned thread get terminated. - pub fn stop(&self, graceful: bool) -> impl Future> { + pub fn stop(&self, graceful: bool) -> impl Future { let (tx, rx) = oneshot::channel(); let _ = self.0.unbounded_send(ServerCommand::Stop { graceful, completion: Some(tx), }); - rx.map_err(|_| ()) + rx.map(|_| ()) + } +} + +impl Clone for Server { + fn clone(&self) -> Self { + Self(self.0.clone(), None) + } +} + +impl Future for Server { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + if this.1.is_none() { + let (tx, rx) = oneshot::channel(); + if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() { + return Poll::Ready(()); + } + this.1 = Some(rx); + } + + match Pin::new(this.1.as_mut().unwrap()).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(_)) => Poll::Ready(()), + Poll::Ready(Err(_)) => Poll::Ready(()), + } } }