1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-25 08:52:42 +01:00
actix-web/src/server/worker.rs

219 lines
7.0 KiB
Rust
Raw Normal View History

2017-12-28 21:38:37 +01:00
use std::{net, time};
use std::rc::Rc;
2017-12-29 01:25:47 +01:00
use futures::Future;
use futures::unsync::oneshot;
2017-12-28 21:38:37 +01:00
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
use net2::TcpStreamExt;
2018-02-10 20:39:12 +01:00
#[cfg(any(feature="tls", feature="alpn"))]
2017-12-29 01:25:47 +01:00
use futures::future;
2018-02-10 20:39:12 +01:00
2017-12-28 22:07:29 +01:00
#[cfg(feature="tls")]
use native_tls::TlsAcceptor;
#[cfg(feature="tls")]
use tokio_tls::TlsAcceptorExt;
#[cfg(feature="alpn")]
use openssl::ssl::SslAcceptor;
#[cfg(feature="alpn")]
use tokio_openssl::SslAcceptorExt;
2017-12-29 01:25:47 +01:00
use actix::*;
2017-12-28 22:07:29 +01:00
use actix::msgs::StopArbiter;
2017-12-28 21:38:37 +01:00
use server::{HttpHandler, KeepAlive};
2018-01-12 03:35:05 +01:00
use server::channel::HttpChannel;
use server::settings::WorkerSettings;
2017-12-28 21:38:37 +01:00
/// Message that carries a single connection to a worker.
///
/// `Worker` implements a handler for `Conn<net::TcpStream>`.
#[derive(Message)]
pub(crate) struct Conn<T> {
/// The I/O object for the connection (a `net::TcpStream` for the handler in this file).
pub io: T,
/// Address of the remote peer, if available.
pub peer: Option<net::SocketAddr>,
/// Forwarded to `HttpChannel::new`; presumably selects HTTP/2 handling (TODO confirm).
/// The ALPN stream handler ignores this flag and derives its own value.
pub http2: bool,
}
2017-12-29 01:25:47 +01:00
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections are still alive.
2017-12-28 22:07:29 +01:00
pub(crate) struct StopWorker {
/// `Some(timeout)` requests a graceful shutdown: the worker re-checks its open
/// connections once per second until none remain or `timeout` is used up.
/// `None` requests an immediate, forced shutdown.
pub graceful: Option<time::Duration>,
}
2018-02-12 21:17:30 +01:00
/// Declares the reply type of a `StopWorker` request.
impl Message for StopWorker {
// `Ok(true)`: no connections were left; `Ok(false)`: shutdown was forced while
// connections were still open. The error type carries no information.
type Result = Result<bool, ()>;
}
2017-12-28 21:38:37 +01:00
/// Http worker
///
/// Worker accepts Socket objects via unbounded channel and start requests processing.
pub(crate)
struct Worker<H> where H: HttpHandler + 'static {
2018-01-04 07:43:44 +01:00
settings: Rc<WorkerSettings<H>>,
2017-12-28 21:38:37 +01:00
hnd: Handle,
handler: StreamHandlerType,
tcp_ka: Option<time::Duration>,
2017-12-28 21:38:37 +01:00
}
2018-01-04 07:43:44 +01:00
impl<H: HttpHandler + 'static> Worker<H> {
2017-12-28 21:38:37 +01:00
pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: KeepAlive)
2017-12-28 21:38:37 +01:00
-> Worker<H>
{
let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive {
Some(time::Duration::new(val as u64, 0))
} else {
None
};
2017-12-28 21:38:37 +01:00
Worker {
2018-01-04 07:43:44 +01:00
settings: Rc::new(WorkerSettings::new(h, keep_alive)),
2017-12-28 21:38:37 +01:00
hnd: Arbiter::handle().clone(),
2018-02-26 23:33:56 +01:00
handler,
tcp_ka,
2017-12-28 21:38:37 +01:00
}
}
fn update_time(&self, ctx: &mut Context<Self>) {
2018-03-18 19:05:44 +01:00
self.settings.update_date();
2017-12-28 21:38:37 +01:00
ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));
}
2017-12-29 01:25:47 +01:00
fn shutdown_timeout(&self, ctx: &mut Context<Self>,
tx: oneshot::Sender<bool>, dur: time::Duration) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
2018-01-12 03:35:05 +01:00
let num = slf.settings.num_channels();
2017-12-29 01:25:47 +01:00
if num == 0 {
let _ = tx.send(true);
2018-02-13 07:56:47 +01:00
Arbiter::arbiter().do_send(StopArbiter(0));
2017-12-29 01:25:47 +01:00
} else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
slf.shutdown_timeout(ctx, tx, d);
} else {
info!("Force shutdown http worker, {} connections", num);
slf.settings.head().traverse::<TcpStream, H>();
2017-12-29 01:25:47 +01:00
let _ = tx.send(false);
2018-02-13 07:56:47 +01:00
Arbiter::arbiter().do_send(StopArbiter(0));
2017-12-29 01:25:47 +01:00
}
});
}
2017-12-28 21:38:37 +01:00
}
2018-01-04 07:43:44 +01:00
impl<H: 'static> Actor for Worker<H> where H: HttpHandler + 'static {
2017-12-28 21:38:37 +01:00
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
self.update_time(ctx);
}
}
impl<H> Handler<Conn<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static,
{
2018-01-05 22:30:21 +01:00
type Result = ();
2017-12-28 21:38:37 +01:00
fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>)
{
if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() {
2017-12-28 21:38:37 +01:00
error!("Can not set socket keep-alive option");
}
2018-01-04 07:43:44 +01:00
self.handler.handle(Rc::clone(&self.settings), &self.hnd, msg);
2017-12-28 21:38:37 +01:00
}
}
2017-12-28 22:07:29 +01:00
/// `StopWorker` message handler
impl<H> Handler<StopWorker> for Worker<H>
where H: HttpHandler + 'static,
{
2018-02-12 21:17:30 +01:00
type Result = Response<bool, ()>;
2018-01-05 22:30:21 +01:00
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
2018-01-12 03:35:05 +01:00
let num = self.settings.num_channels();
2017-12-29 01:25:47 +01:00
if num == 0 {
info!("Shutting down http worker, 0 connections");
2018-02-01 10:08:08 +01:00
Response::reply(Ok(true))
2017-12-29 01:25:47 +01:00
} else if let Some(dur) = msg.graceful {
info!("Graceful http worker shutdown, {} connections", num);
let (tx, rx) = oneshot::channel();
self.shutdown_timeout(ctx, tx, dur);
2018-02-12 21:17:30 +01:00
Response::async(rx.map_err(|_| ()))
2017-12-29 01:25:47 +01:00
} else {
info!("Force shutdown http worker, {} connections", num);
self.settings.head().traverse::<TcpStream, H>();
2018-02-01 10:08:08 +01:00
Response::reply(Ok(false))
2017-12-29 01:25:47 +01:00
}
2017-12-28 22:07:29 +01:00
}
}
2017-12-28 21:38:37 +01:00
/// Selects how an accepted TCP stream is turned into an HTTP channel.
#[derive(Clone)]
pub(crate) enum StreamHandlerType {
/// Plain TCP: the stream is handed to the HTTP channel as is.
Normal,
/// TLS via `native_tls`: the stream is passed through the acceptor first.
/// Only available with the `tls` feature.
#[cfg(feature="tls")]
Tls(TlsAcceptor),
/// TLS via `openssl`: the stream is passed through the acceptor first and
/// HTTP/2 is chosen when the negotiated ALPN protocol is `h2`.
/// Only available with the `alpn` feature.
#[cfg(feature="alpn")]
Alpn(SslAcceptor),
}
impl StreamHandlerType {
fn handle<H: HttpHandler>(&mut self,
h: Rc<WorkerSettings<H>>,
hnd: &Handle, msg: Conn<net::TcpStream>) {
match *self {
StreamHandlerType::Normal => {
2018-01-04 18:32:47 +01:00
let _ = msg.io.set_nodelay(true);
2017-12-28 21:38:37 +01:00
let io = TcpStream::from_stream(msg.io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
}
#[cfg(feature="tls")]
StreamHandlerType::Tls(ref acceptor) => {
let Conn { io, peer, http2 } = msg;
2018-01-04 18:32:47 +01:00
let _ = io.set_nodelay(true);
2017-12-28 21:38:37 +01:00
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2)),
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
#[cfg(feature="alpn")]
StreamHandlerType::Alpn(ref acceptor) => {
let Conn { io, peer, .. } = msg;
2018-01-04 18:32:47 +01:00
let _ = io.set_nodelay(true);
2017-12-28 21:38:37 +01:00
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
hnd.spawn(
SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol()
{
p.len() == 2 && &p == b"h2"
} else {
false
};
2018-03-16 16:48:44 +01:00
Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2));
2017-12-28 21:38:37 +01:00
},
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
future::result(Ok(()))
})
);
}
}
}
}