diff --git a/examples/signals/Cargo.toml b/examples/signals/Cargo.toml index 9352ef5e7..984cf8e3a 100644 --- a/examples/signals/Cargo.toml +++ b/examples/signals/Cargo.toml @@ -12,4 +12,4 @@ path = "src/main.rs" env_logger = "*" futures = "0.1" actix = "^0.3.5" -actix-web = { git = "https://github.com/actix/actix-web.git", features=["signal"] } +actix-web = { path = "../../", features=["signal"] } diff --git a/src/channel.rs b/src/channel.rs index d736fd202..963ef1065 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,9 +1,11 @@ +use std::{ptr, mem, time}; use std::rc::Rc; -use std::net::SocketAddr; +use std::net::{SocketAddr, Shutdown}; use bytes::Bytes; use futures::{Future, Poll, Async}; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_core::net::TcpStream; use {h1, h2}; use error::Error; @@ -58,25 +60,57 @@ pub struct HttpChannel where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { proto: Option>, + node: Option>>, +} + +impl Drop for HttpChannel + where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static +{ + fn drop(&mut self) { + self.shutdown() + } } impl HttpChannel where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { pub(crate) fn new(h: Rc>, - io: T, peer: Option, http2: bool) -> HttpChannel + io: T, peer: Option, http2: bool) -> HttpChannel { h.add_channel(); if http2 { HttpChannel { + node: None, proto: Some(HttpProtocol::H2( h2::Http2::new(h, io, peer, Bytes::new()))) } } else { HttpChannel { + node: None, proto: Some(HttpProtocol::H1( h1::Http1::new(h, io, peer))) } } } + + fn io(&mut self) -> Option<&mut T> { + match self.proto { + Some(HttpProtocol::H1(ref mut h1)) => { + Some(h1.io()) + } + _ => None, + } + } + + fn shutdown(&mut self) { + match self.proto { + Some(HttpProtocol::H1(ref mut h1)) => { + let _ = h1.io().shutdown(); + } + Some(HttpProtocol::H2(ref mut h2)) => { + h2.shutdown() + } + _ => unreachable!(), + } + } } /*impl Drop for HttpChannel @@ -94,11 +128,25 @@ impl Future for HttpChannel type Error = (); fn poll(&mut self) -> Poll { + if self.node.is_none() { + self.node = Some(Node::new(self)); + match self.proto { + Some(HttpProtocol::H1(ref mut h1)) => { + h1.settings().head().insert(self.node.as_ref().unwrap()); + } + Some(HttpProtocol::H2(ref mut h2)) => { + h2.settings().head().insert(self.node.as_ref().unwrap()); + } + _ => unreachable!(), + } + } + match self.proto { Some(HttpProtocol::H1(ref mut h1)) => { match h1.poll() { Ok(Async::Ready(h1::Http1Result::Done)) => { h1.settings().remove_channel(); + self.node.as_ref().unwrap().remove(); return Ok(Async::Ready(())) } Ok(Async::Ready(h1::Http1Result::Switch)) => (), @@ -106,6 +154,7 @@ impl Future for HttpChannel return Ok(Async::NotReady), Err(_) => { h1.settings().remove_channel(); + self.node.as_ref().unwrap().remove(); return Err(()) } } @@ -113,7 +162,10 @@ impl Future for HttpChannel Some(HttpProtocol::H2(ref mut h2)) => { let result = h2.poll(); match result { - Ok(Async::Ready(())) | Err(_) => h2.settings().remove_channel(), + Ok(Async::Ready(())) | Err(_) => { + h2.settings().remove_channel(); + self.node.as_ref().unwrap().remove(); + } _ => (), } return result @@ -134,3 +186,84 @@ impl Future for HttpChannel } } } + +pub(crate) struct Node +{ + next: Option<*mut Node<()>>, + prev: Option<*mut Node<()>>, + element: *mut T, +} + +impl Node +{ + fn new(el: &mut T) -> Self { + Node { + next: None, + prev: None, + element: el as *mut _, + } + } + + fn insert(&self, next: &Node) { + #[allow(mutable_transmutes)] + unsafe { + if let Some(ref next2) = self.next { + let n: &mut Node<()> = mem::transmute(next2.as_ref().unwrap()); + n.prev = Some(next as *const _ as *mut _); + } + let slf: &mut Node = mem::transmute(self); + slf.next = Some(next as *const _ as *mut _); + + let next: &mut Node = mem::transmute(next); + next.prev = Some(slf as *const _ as *mut _); + } + } + + fn remove(&self) { + #[allow(mutable_transmutes)] + unsafe { + if let Some(ref prev) = self.prev { + let p: &mut Node<()> = mem::transmute(prev.as_ref().unwrap()); + let slf: &mut Node = mem::transmute(self); + p.next = slf.next.take(); + } + } + } +} + + +impl Node<()> { + + pub(crate) fn head() -> Self { + Node { + next: None, + prev: None, + element: ptr::null_mut(), + } + } + + pub(crate) fn traverse(&self) where H: HttpHandler + 'static { + let mut next = self.next.as_ref(); + loop { + if let Some(n) = next { + unsafe { + let n: &Node<()> = mem::transmute(n.as_ref().unwrap()); + next = n.next.as_ref(); + + if !n.element.is_null() { + let ch: &mut HttpChannel = mem::transmute( + &mut *(n.element as *mut _)); + if let Some(io) = ch.io() { + let _ = TcpStream::set_linger(io, Some(time::Duration::new(0, 0))); + let _ = TcpStream::shutdown(io, Shutdown::Both); + continue; + } + ch.shutdown(); + } + } + } else { + return + } + } + } +} diff --git a/src/h1.rs b/src/h1.rs index e4d2930b0..e0358a159 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -97,6 +97,10 @@ impl Http1 (self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze()) } + pub(crate) fn io(&mut self) -> &mut T { + self.stream.get_mut() + } + fn poll_completed(&mut self, shutdown: bool) -> Result { // check stream state match self.stream.poll_completed(shutdown) { diff --git a/src/h2.rs b/src/h2.rs index 89998afc9..446219727 100644 --- a/src/h2.rs +++ b/src/h2.rs @@ -64,6 +64,12 @@ impl Http2 } } + pub(crate) fn shutdown(&mut self) { + self.state = State::Empty; + self.tasks.clear(); + self.keepalive_timer.take(); + } + pub fn settings(&self) -> &WorkerSettings { self.settings.as_ref() } diff --git a/src/server.rs b/src/server.rs index 7394585ac..1833e8ae2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -93,7 +93,7 @@ impl ServerSettings { /// /// `H` - request handler pub struct HttpServer - where H: 'static + where H: HttpHandler + 'static { h: Option>>, io: PhantomData, @@ -110,11 +110,11 @@ pub struct HttpServer shutdown_timeout: u16, } -unsafe impl Sync for HttpServer where H: 'static {} -unsafe impl Send for HttpServer where H: 'static {} +unsafe impl Sync for HttpServer where H: HttpHandler + 'static {} +unsafe impl Send for HttpServer where H: HttpHandler + 'static {} -impl Actor for HttpServer { +impl Actor for HttpServer { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { @@ -122,7 +122,7 @@ impl Actor for HttpServer { } } -impl HttpServer { +impl HttpServer { fn update_time(&self, ctx: &mut Context) { helpers::update_date(); ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); diff --git a/src/worker.rs b/src/worker.rs index 29158924f..c6127d2a0 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -25,7 +25,7 @@ use actix::*; use actix::msgs::StopArbiter; use helpers; -use channel::{HttpChannel, HttpHandler}; +use channel::{HttpChannel, HttpHandler, Node}; #[derive(Message)] @@ -50,6 +50,7 @@ pub(crate) struct WorkerSettings { bytes: Rc, messages: Rc, channels: Cell, + node: Node<()>, } impl WorkerSettings { @@ -61,9 +62,13 @@ impl WorkerSettings { bytes: Rc::new(helpers::SharedBytesPool::new()), messages: Rc::new(helpers::SharedMessagePool::new()), channels: Cell::new(0), + node: Node::head(), } } + pub fn head(&self) -> &Node<()> { + &self.node + } pub fn handlers(&self) -> RefMut> { self.h.borrow_mut() } @@ -95,19 +100,19 @@ impl WorkerSettings { /// Http worker /// /// Worker accepts Socket objects via unbounded channel and start requests processing. -pub(crate) struct Worker { - h: Rc>, +pub(crate) struct Worker where H: HttpHandler + 'static { + settings: Rc>, hnd: Handle, handler: StreamHandlerType, } -impl Worker { +impl Worker { pub(crate) fn new(h: Vec, handler: StreamHandlerType, keep_alive: Option) -> Worker { Worker { - h: Rc::new(WorkerSettings::new(h, keep_alive)), + settings: Rc::new(WorkerSettings::new(h, keep_alive)), hnd: Arbiter::handle().clone(), handler: handler, } @@ -122,7 +127,7 @@ impl Worker { tx: oneshot::Sender, dur: time::Duration) { // sleep for 1 second and then check again ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { - let num = slf.h.channels.get(); + let num = slf.settings.channels.get(); if num == 0 { let _ = tx.send(true); Arbiter::arbiter().send(StopArbiter(0)); @@ -130,6 +135,7 @@ impl Worker { slf.shutdown_timeout(ctx, tx, d); } else { info!("Force shutdown http worker, {} connections", num); + slf.settings.head().traverse::(); let _ = tx.send(false); Arbiter::arbiter().send(StopArbiter(0)); } @@ -137,7 +143,7 @@ impl Worker { } } -impl Actor for Worker { +impl Actor for Worker where H: HttpHandler + 'static { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { @@ -154,12 +160,12 @@ impl Handler> for Worker fn handle(&mut self, msg: Conn, _: &mut Context) -> Response> { - if !self.h.keep_alive_enabled() && + if !self.settings.keep_alive_enabled() && msg.io.set_keepalive(Some(time::Duration::new(75, 0))).is_err() { error!("Can not set socket keep-alive option"); } - self.handler.handle(Rc::clone(&self.h), &self.hnd, msg); + self.handler.handle(Rc::clone(&self.settings), &self.hnd, msg); Self::empty() } } @@ -170,7 +176,7 @@ impl Handler for Worker { fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Response { - let num = self.h.channels.get(); + let num = self.settings.channels.get(); if num == 0 { info!("Shutting down http worker, 0 connections"); Self::reply(true) @@ -181,6 +187,7 @@ impl Handler for Worker Self::async_reply(rx.map_err(|_| ()).actfuture()) } else { info!("Force shutdown http worker, {} connections", num); + self.settings.head().traverse::(); Self::reply(false) } }