1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 00:01:11 +01:00

allow to wait server exit

This commit is contained in:
Nikolay Kim 2019-11-26 16:33:45 +06:00
parent f5aecdee8f
commit 009f8e2e7c
2 changed files with 60 additions and 10 deletions

View File

@ -5,6 +5,7 @@ use std::{io, mem, net};
use actix_rt::{spawn, time::delay, Arbiter, System};
use futures::channel::mpsc::{unbounded, UnboundedReceiver};
use futures::channel::oneshot;
use futures::future::ready;
use futures::stream::FuturesUnordered;
use futures::{ready, Future, FutureExt, Stream, StreamExt};
@ -36,6 +37,7 @@ pub struct ServerBuilder {
no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>,
server: Server,
notify: Vec<oneshot::Sender<()>>,
}
impl Default for ServerBuilder {
@ -62,6 +64,7 @@ impl ServerBuilder {
shutdown_timeout: Duration::from_secs(30),
no_signals: false,
cmd: rx,
notify: Vec::new(),
server,
}
}
@ -372,6 +375,9 @@ impl ServerBuilder {
// _ => (),
// }
// }
ServerCommand::Notify(tx) => {
self.notify.push(tx);
}
ServerCommand::Stop {
graceful,
completion,
@ -380,6 +386,7 @@ impl ServerBuilder {
// stop accept thread
self.accept.send(Command::Stop);
let notify = std::mem::replace(&mut self.notify, Vec::new());
// stop workers
if !self.workers.is_empty() && graceful {
@ -393,6 +400,9 @@ impl ServerBuilder {
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
if exit {
spawn(
async {
@ -419,6 +429,9 @@ impl ServerBuilder {
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
}
}
ServerCommand::WorkerFaulted(idx) => {

View File

@ -1,6 +1,10 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::channel::mpsc::UnboundedSender;
use futures::channel::oneshot;
use futures::{Future, TryFutureExt};
use futures::FutureExt;
use crate::builder::ServerBuilder;
// use crate::signals::Signal;
@ -16,14 +20,19 @@ pub(crate) enum ServerCommand {
graceful: bool,
completion: Option<oneshot::Sender<()>>,
},
/// Notify of server stop
Notify(oneshot::Sender<()>),
}
#[derive(Debug, Clone)]
pub struct Server(UnboundedSender<ServerCommand>);
#[derive(Debug)]
pub struct Server(
UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>,
);
impl Server {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
Server(tx)
Server(tx, None)
}
/// Start server building process
@ -43,28 +52,56 @@ impl Server {
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
pub fn pause(&self) -> impl Future<Output = Result<(), ()>> {
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
rx.map_err(|_| ())
rx.map(|_| ())
}
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = Result<(), ()>> {
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
rx.map_err(|_| ())
rx.map(|_| ())
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = Result<(), ()>> {
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.unbounded_send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
rx.map_err(|_| ())
rx.map(|_| ())
}
}
impl Clone for Server {
fn clone(&self) -> Self {
Self(self.0.clone(), None)
}
}
impl Future for Server {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.unbounded_send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(());
}
this.1 = Some(rx);
}
match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(()),
Poll::Ready(Err(_)) => Poll::Ready(()),
}
}
}