1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-06-30 20:04:26 +02:00

sync with latest actix

This commit is contained in:
Nikolay Kim
2018-02-12 12:17:30 -08:00
parent 30bdf9cb5e
commit 8c1b5fa945
16 changed files with 65 additions and 103 deletions

View File

@ -2,6 +2,7 @@
use std::{time, io};
use std::net::Shutdown;
use actix;
use futures::Poll;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream;
@ -43,11 +44,14 @@ pub struct ResumeServer;
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
#[derive(Message)]
pub struct StopServer {
pub graceful: bool
}
impl actix::Message for StopServer {
type Result = Result<(), ()>;
}
/// Low level http request handler
#[allow(unused_variables)]
pub trait HttpHandler: 'static {

View File

@ -352,7 +352,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
let signals = self.subscribe_to_signals();
let addr: SyncAddress<_> = Actor::start(self);
signals.map(|signals| signals.send(
signal::Subscribe(addr.clone().into())));
signal::Subscribe(addr.clone().subscriber())));
Ok(addr)
}
}
@ -396,7 +396,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
let signals = self.subscribe_to_signals();
let addr: SyncAddress<_> = Actor::start(self);
signals.map(|signals| signals.send(
signal::Subscribe(addr.clone().into())));
signal::Subscribe(addr.clone().subscriber())));
Ok(addr)
}
}
@ -477,24 +477,6 @@ impl<H: IntoHttpHandler> Handler<signal::Signal> for HttpServer<H>
}
}
impl<T, H> Handler<io::Result<Conn<T>>> for HttpServer<H>
where T: IoStream,
H: IntoHttpHandler,
{
type Result = ();
fn handle(&mut self, msg: io::Result<Conn<T>>, _: &mut Context<Self>) -> Self::Result {
match msg {
Ok(msg) =>
Arbiter::handle().spawn(
HttpChannel::new(
Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)),
Err(err) =>
debug!("Error handling request: {}", err),
}
}
}
impl<T, H> Handler<Conn<T>> for HttpServer<H>
where T: IoStream,
H: IntoHttpHandler,
@ -535,7 +517,7 @@ impl<H: IntoHttpHandler> Handler<ResumeServer> for HttpServer<H>
impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H>
{
type Result = actix::Response<Self, StopServer>;
type Result = actix::Response<(), ()>;
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
// stop accept threads
@ -570,8 +552,8 @@ impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H>
}
if !self.workers.is_empty() {
Response::async_reply(
rx.into_future().map(|_| ()).map_err(|_| ()).actfuture())
Response::async(
rx.into_future().map(|_| ()).map_err(|_| ()))
} else {
// we need to stop system if server was spawned
if self.exit {

View File

@ -37,12 +37,14 @@ pub(crate) struct Conn<T> {
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive.
#[derive(Message)]
#[rtype(bool)]
pub(crate) struct StopWorker {
pub graceful: Option<time::Duration>,
}
impl Message for StopWorker {
type Result = Result<bool, ()>;
}
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
@ -117,7 +119,7 @@ impl<H> Handler<Conn<net::TcpStream>> for Worker<H>
impl<H> Handler<StopWorker> for Worker<H>
where H: HttpHandler + 'static,
{
type Result = Response<Self, StopWorker>;
type Result = Response<bool, ()>;
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
let num = self.settings.num_channels();
@ -128,7 +130,7 @@ impl<H> Handler<StopWorker> for Worker<H>
info!("Graceful http worker shutdown, {} connections", num);
let (tx, rx) = oneshot::channel();
self.shutdown_timeout(ctx, tx, dur);
Response::async_reply(rx.map_err(|_| ()).actfuture())
Response::async(rx.map_err(|_| ()))
} else {
info!("Force shutdown http worker, {} connections", num);
self.settings.head().traverse::<TcpStream, H>();