1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

shutdown io streams before exit

This commit is contained in:
Nikolay Kim 2018-01-03 22:43:44 -08:00
parent bf11bfed8e
commit 1f7aee23df
6 changed files with 169 additions and 19 deletions

View File

@ -12,4 +12,4 @@ path = "src/main.rs"
env_logger = "*" env_logger = "*"
futures = "0.1" futures = "0.1"
actix = "^0.3.5" actix = "^0.3.5"
actix-web = { git = "https://github.com/actix/actix-web.git", features=["signal"] } actix-web = { path = "../../", features=["signal"] }

View File

@ -1,9 +1,11 @@
use std::{ptr, mem, time};
use std::rc::Rc; use std::rc::Rc;
use std::net::SocketAddr; use std::net::{SocketAddr, Shutdown};
use bytes::Bytes; use bytes::Bytes;
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::net::TcpStream;
use {h1, h2}; use {h1, h2};
use error::Error; use error::Error;
@ -58,25 +60,57 @@ pub struct HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{ {
proto: Option<HttpProtocol<T, H>>, proto: Option<HttpProtocol<T, H>>,
node: Option<Node<HttpChannel<T, H>>>,
}
impl<T, H> Drop for HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{
fn drop(&mut self) {
self.shutdown()
}
} }
impl<T, H> HttpChannel<T, H> impl<T, H> HttpChannel<T, H>
where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static
{ {
pub(crate) fn new(h: Rc<WorkerSettings<H>>, pub(crate) fn new(h: Rc<WorkerSettings<H>>,
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H> io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
{ {
h.add_channel(); h.add_channel();
if http2 { if http2 {
HttpChannel { HttpChannel {
node: None,
proto: Some(HttpProtocol::H2( proto: Some(HttpProtocol::H2(
h2::Http2::new(h, io, peer, Bytes::new()))) } h2::Http2::new(h, io, peer, Bytes::new()))) }
} else { } else {
HttpChannel { HttpChannel {
node: None,
proto: Some(HttpProtocol::H1( proto: Some(HttpProtocol::H1(
h1::Http1::new(h, io, peer))) } h1::Http1::new(h, io, peer))) }
} }
} }
fn io(&mut self) -> Option<&mut T> {
match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
Some(h1.io())
}
_ => None,
}
}
fn shutdown(&mut self) {
match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
let _ = h1.io().shutdown();
}
Some(HttpProtocol::H2(ref mut h2)) => {
h2.shutdown()
}
_ => unreachable!(),
}
}
} }
/*impl<T, H> Drop for HttpChannel<T, H> /*impl<T, H> Drop for HttpChannel<T, H>
@ -94,11 +128,25 @@ impl<T, H> Future for HttpChannel<T, H>
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.node.is_none() {
self.node = Some(Node::new(self));
match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => {
h1.settings().head().insert(self.node.as_ref().unwrap());
}
Some(HttpProtocol::H2(ref mut h2)) => {
h2.settings().head().insert(self.node.as_ref().unwrap());
}
_ => unreachable!(),
}
}
match self.proto { match self.proto {
Some(HttpProtocol::H1(ref mut h1)) => { Some(HttpProtocol::H1(ref mut h1)) => {
match h1.poll() { match h1.poll() {
Ok(Async::Ready(h1::Http1Result::Done)) => { Ok(Async::Ready(h1::Http1Result::Done)) => {
h1.settings().remove_channel(); h1.settings().remove_channel();
self.node.as_ref().unwrap().remove();
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
} }
Ok(Async::Ready(h1::Http1Result::Switch)) => (), Ok(Async::Ready(h1::Http1Result::Switch)) => (),
@ -106,6 +154,7 @@ impl<T, H> Future for HttpChannel<T, H>
return Ok(Async::NotReady), return Ok(Async::NotReady),
Err(_) => { Err(_) => {
h1.settings().remove_channel(); h1.settings().remove_channel();
self.node.as_ref().unwrap().remove();
return Err(()) return Err(())
} }
} }
@ -113,7 +162,10 @@ impl<T, H> Future for HttpChannel<T, H>
Some(HttpProtocol::H2(ref mut h2)) => { Some(HttpProtocol::H2(ref mut h2)) => {
let result = h2.poll(); let result = h2.poll();
match result { match result {
Ok(Async::Ready(())) | Err(_) => h2.settings().remove_channel(), Ok(Async::Ready(())) | Err(_) => {
h2.settings().remove_channel();
self.node.as_ref().unwrap().remove();
}
_ => (), _ => (),
} }
return result return result
@ -134,3 +186,84 @@ impl<T, H> Future for HttpChannel<T, H>
} }
} }
} }
pub(crate) struct Node<T>
{
next: Option<*mut Node<()>>,
prev: Option<*mut Node<()>>,
element: *mut T,
}
impl<T> Node<T>
{
fn new(el: &mut T) -> Self {
Node {
next: None,
prev: None,
element: el as *mut _,
}
}
fn insert<I>(&self, next: &Node<I>) {
#[allow(mutable_transmutes)]
unsafe {
if let Some(ref next2) = self.next {
let n: &mut Node<()> = mem::transmute(next2.as_ref().unwrap());
n.prev = Some(next as *const _ as *mut _);
}
let slf: &mut Node<T> = mem::transmute(self);
slf.next = Some(next as *const _ as *mut _);
let next: &mut Node<T> = mem::transmute(next);
next.prev = Some(slf as *const _ as *mut _);
}
}
fn remove(&self) {
#[allow(mutable_transmutes)]
unsafe {
if let Some(ref prev) = self.prev {
let p: &mut Node<()> = mem::transmute(prev.as_ref().unwrap());
let slf: &mut Node<T> = mem::transmute(self);
p.next = slf.next.take();
}
}
}
}
impl Node<()> {
pub(crate) fn head() -> Self {
Node {
next: None,
prev: None,
element: ptr::null_mut(),
}
}
pub(crate) fn traverse<H>(&self) where H: HttpHandler + 'static {
let mut next = self.next.as_ref();
loop {
if let Some(n) = next {
unsafe {
let n: &Node<()> = mem::transmute(n.as_ref().unwrap());
next = n.next.as_ref();
if !n.element.is_null() {
let ch: &mut HttpChannel<TcpStream, H> = mem::transmute(
&mut *(n.element as *mut _));
if let Some(io) = ch.io() {
let _ = TcpStream::set_linger(io, Some(time::Duration::new(0, 0)));
let _ = TcpStream::shutdown(io, Shutdown::Both);
continue;
}
ch.shutdown();
}
}
} else {
return
}
}
}
}

View File

@ -97,6 +97,10 @@ impl<T, H> Http1<T, H>
(self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze()) (self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze())
} }
pub(crate) fn io(&mut self) -> &mut T {
self.stream.get_mut()
}
fn poll_completed(&mut self, shutdown: bool) -> Result<bool, ()> { fn poll_completed(&mut self, shutdown: bool) -> Result<bool, ()> {
// check stream state // check stream state
match self.stream.poll_completed(shutdown) { match self.stream.poll_completed(shutdown) {

View File

@ -64,6 +64,12 @@ impl<T, H> Http2<T, H>
} }
} }
pub(crate) fn shutdown(&mut self) {
self.state = State::Empty;
self.tasks.clear();
self.keepalive_timer.take();
}
pub fn settings(&self) -> &WorkerSettings<H> { pub fn settings(&self) -> &WorkerSettings<H> {
self.settings.as_ref() self.settings.as_ref()
} }

View File

@ -93,7 +93,7 @@ impl ServerSettings {
/// ///
/// `H` - request handler /// `H` - request handler
pub struct HttpServer<T, A, H, U> pub struct HttpServer<T, A, H, U>
where H: 'static where H: HttpHandler + 'static
{ {
h: Option<Rc<WorkerSettings<H>>>, h: Option<Rc<WorkerSettings<H>>>,
io: PhantomData<T>, io: PhantomData<T>,
@ -110,11 +110,11 @@ pub struct HttpServer<T, A, H, U>
shutdown_timeout: u16, shutdown_timeout: u16,
} }
unsafe impl<T, A, H, U> Sync for HttpServer<T, A, H, U> where H: 'static {} unsafe impl<T, A, H, U> Sync for HttpServer<T, A, H, U> where H: HttpHandler + 'static {}
unsafe impl<T, A, H, U> Send for HttpServer<T, A, H, U> where H: 'static {} unsafe impl<T, A, H, U> Send for HttpServer<T, A, H, U> where H: HttpHandler + 'static {}
impl<T: 'static, A: 'static, H, U: 'static> Actor for HttpServer<T, A, H, U> { impl<T: 'static, A: 'static, H: HttpHandler + 'static, U: 'static> Actor for HttpServer<T, A, H, U> {
type Context = Context<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
@ -122,7 +122,7 @@ impl<T: 'static, A: 'static, H, U: 'static> Actor for HttpServer<T, A, H, U> {
} }
} }
impl<T: 'static, A: 'static, H, U: 'static> HttpServer<T, A, H, U> { impl<T: 'static, A: 'static, H: HttpHandler + 'static, U: 'static> HttpServer<T, A, H, U> {
fn update_time(&self, ctx: &mut Context<Self>) { fn update_time(&self, ctx: &mut Context<Self>) {
helpers::update_date(); helpers::update_date();
ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));

View File

@ -25,7 +25,7 @@ use actix::*;
use actix::msgs::StopArbiter; use actix::msgs::StopArbiter;
use helpers; use helpers;
use channel::{HttpChannel, HttpHandler}; use channel::{HttpChannel, HttpHandler, Node};
#[derive(Message)] #[derive(Message)]
@ -50,6 +50,7 @@ pub(crate) struct WorkerSettings<H> {
bytes: Rc<helpers::SharedBytesPool>, bytes: Rc<helpers::SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>, messages: Rc<helpers::SharedMessagePool>,
channels: Cell<usize>, channels: Cell<usize>,
node: Node<()>,
} }
impl<H> WorkerSettings<H> { impl<H> WorkerSettings<H> {
@ -61,9 +62,13 @@ impl<H> WorkerSettings<H> {
bytes: Rc::new(helpers::SharedBytesPool::new()), bytes: Rc::new(helpers::SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()), messages: Rc::new(helpers::SharedMessagePool::new()),
channels: Cell::new(0), channels: Cell::new(0),
node: Node::head(),
} }
} }
pub fn head(&self) -> &Node<()> {
&self.node
}
pub fn handlers(&self) -> RefMut<Vec<H>> { pub fn handlers(&self) -> RefMut<Vec<H>> {
self.h.borrow_mut() self.h.borrow_mut()
} }
@ -95,19 +100,19 @@ impl<H> WorkerSettings<H> {
/// Http worker /// Http worker
/// ///
/// Worker accepts Socket objects via unbounded channel and start requests processing. /// Worker accepts Socket objects via unbounded channel and start requests processing.
pub(crate) struct Worker<H> { pub(crate) struct Worker<H> where H: HttpHandler + 'static {
h: Rc<WorkerSettings<H>>, settings: Rc<WorkerSettings<H>>,
hnd: Handle, hnd: Handle,
handler: StreamHandlerType, handler: StreamHandlerType,
} }
impl<H: 'static> Worker<H> { impl<H: HttpHandler + 'static> Worker<H> {
pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>) pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>)
-> Worker<H> -> Worker<H>
{ {
Worker { Worker {
h: Rc::new(WorkerSettings::new(h, keep_alive)), settings: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(), hnd: Arbiter::handle().clone(),
handler: handler, handler: handler,
} }
@ -122,7 +127,7 @@ impl<H: 'static> Worker<H> {
tx: oneshot::Sender<bool>, dur: time::Duration) { tx: oneshot::Sender<bool>, dur: time::Duration) {
// sleep for 1 second and then check again // sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
let num = slf.h.channels.get(); let num = slf.settings.channels.get();
if num == 0 { if num == 0 {
let _ = tx.send(true); let _ = tx.send(true);
Arbiter::arbiter().send(StopArbiter(0)); Arbiter::arbiter().send(StopArbiter(0));
@ -130,6 +135,7 @@ impl<H: 'static> Worker<H> {
slf.shutdown_timeout(ctx, tx, d); slf.shutdown_timeout(ctx, tx, d);
} else { } else {
info!("Force shutdown http worker, {} connections", num); info!("Force shutdown http worker, {} connections", num);
slf.settings.head().traverse::<H>();
let _ = tx.send(false); let _ = tx.send(false);
Arbiter::arbiter().send(StopArbiter(0)); Arbiter::arbiter().send(StopArbiter(0));
} }
@ -137,7 +143,7 @@ impl<H: 'static> Worker<H> {
} }
} }
impl<H: 'static> Actor for Worker<H> { impl<H: 'static> Actor for Worker<H> where H: HttpHandler + 'static {
type Context = Context<Self>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
@ -154,12 +160,12 @@ impl<H> Handler<Conn<net::TcpStream>> for Worker<H>
fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>) fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>)
-> Response<Self, Conn<net::TcpStream>> -> Response<Self, Conn<net::TcpStream>>
{ {
if !self.h.keep_alive_enabled() && if !self.settings.keep_alive_enabled() &&
msg.io.set_keepalive(Some(time::Duration::new(75, 0))).is_err() msg.io.set_keepalive(Some(time::Duration::new(75, 0))).is_err()
{ {
error!("Can not set socket keep-alive option"); error!("Can not set socket keep-alive option");
} }
self.handler.handle(Rc::clone(&self.h), &self.hnd, msg); self.handler.handle(Rc::clone(&self.settings), &self.hnd, msg);
Self::empty() Self::empty()
} }
} }
@ -170,7 +176,7 @@ impl<H> Handler<StopWorker> for Worker<H>
{ {
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Response<Self, StopWorker> fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Response<Self, StopWorker>
{ {
let num = self.h.channels.get(); let num = self.settings.channels.get();
if num == 0 { if num == 0 {
info!("Shutting down http worker, 0 connections"); info!("Shutting down http worker, 0 connections");
Self::reply(true) Self::reply(true)
@ -181,6 +187,7 @@ impl<H> Handler<StopWorker> for Worker<H>
Self::async_reply(rx.map_err(|_| ()).actfuture()) Self::async_reply(rx.map_err(|_| ()).actfuture())
} else { } else {
info!("Force shutdown http worker, {} connections", num); info!("Force shutdown http worker, {} connections", num);
self.settings.head().traverse::<H>();
Self::reply(false) Self::reply(false)
} }
} }