diff --git a/src/h2.rs b/src/h2.rs index 3eb2c0bf6..73235e987 100644 --- a/src/h2.rs +++ b/src/h2.rs @@ -2,14 +2,17 @@ use std::{io, cmp, mem}; use std::rc::Rc; use std::io::{Read, Write}; use std::cell::UnsafeCell; +use std::time::Duration; use std::collections::VecDeque; +use actix::Arbiter; use http::request::Parts; use http2::{Reason, RecvStream}; use http2::server::{Server, Handshake, Respond}; use bytes::{Buf, Bytes}; use futures::{Async, Poll, Future, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_core::reactor::Timeout; use task::Task; use channel::HttpHandler; @@ -18,6 +21,8 @@ use httprequest::HttpRequest; use payload::{Payload, PayloadError, PayloadSender}; use h2writer::H2Writer; +const KEEPALIVE_PERIOD: u64 = 15; // seconds + pub(crate) struct Http2 where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: 'static @@ -28,6 +33,7 @@ pub(crate) struct Http2 state: State>, disconnected: bool, tasks: VecDeque, + keepalive_timer: Option, } enum State { @@ -47,12 +53,25 @@ impl Http2 disconnected: false, tasks: VecDeque::new(), state: State::Handshake( - Server::handshake(IoWrapper{unread: Some(buf), inner: stream})) } + Server::handshake(IoWrapper{unread: Some(buf), inner: stream})), + keepalive_timer: None, + } } pub fn poll(&mut self) -> Poll<(), ()> { // server if let State::Server(ref mut server) = self.state { + + // keep-alive timer + if let Some(ref mut timeout) = self.keepalive_timer { + match timeout.poll() { + Ok(Async::Ready(_)) => + return Ok(Async::Ready(())), + Ok(Async::NotReady) => (), + Err(_) => unreachable!(), + } + } + loop { let mut not_ready = true; @@ -105,14 +124,6 @@ impl Http2 // get request if !self.disconnected { match server.poll() { - Ok(Async::NotReady) => { - // Ok(Async::NotReady); - () - } - Err(err) => { - trace!("Connection error: {}", err); - self.disconnected = true; - }, Ok(Async::Ready(None)) => { not_ready = false; self.disconnected = true; @@ -125,7 +136,30 @@ impl Http2 let (parts, body) = req.into_parts(); self.tasks.push_back( Entry::new(parts, body, resp, &self.router)); + self.keepalive_timer.take(); } + Ok(Async::NotReady) => { + // start keep-alive timer + if self.tasks.is_empty() { + if self.keepalive_timer.is_none() { + trace!("Start keep-alive timer"); + let mut timeout = Timeout::new( + Duration::new(KEEPALIVE_PERIOD, 0), + Arbiter::handle()).unwrap(); + // register timeout + let _ = timeout.poll(); + self.keepalive_timer = Some(timeout); + } + } + } + Err(err) => { + trace!("Connection error: {}", err); + self.disconnected = true; + for entry in &mut self.tasks { + entry.task.disconnected() + } + self.keepalive_timer.take(); + }, } } diff --git a/src/server.rs b/src/server.rs index 6fb1f23ef..45b50848a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -12,6 +12,8 @@ use native_tls::TlsAcceptor; #[cfg(feature="tls")] use tokio_tls::{TlsStream, TlsAcceptorExt}; +#[cfg(feature="alpn")] +use futures::Future; #[cfg(feature="alpn")] use openssl::ssl::{SslMethod, SslAcceptorBuilder}; #[cfg(feature="alpn")]