1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-12-12 15:20:24 +01:00

add keep-alive for h2 connection

This commit is contained in:
Nikolay Kim 2017-11-04 13:24:57 -07:00
parent 28652a3ba8
commit 53868a88fa
2 changed files with 45 additions and 9 deletions

View File

@ -2,14 +2,17 @@ use std::{io, cmp, mem};
use std::rc::Rc; use std::rc::Rc;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::time::Duration;
use std::collections::VecDeque; use std::collections::VecDeque;
use actix::Arbiter;
use http::request::Parts; use http::request::Parts;
use http2::{Reason, RecvStream}; use http2::{Reason, RecvStream};
use http2::server::{Server, Handshake, Respond}; use http2::server::{Server, Handshake, Respond};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use futures::{Async, Poll, Future, Stream}; use futures::{Async, Poll, Future, Stream};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::Timeout;
use task::Task; use task::Task;
use channel::HttpHandler; use channel::HttpHandler;
@ -18,6 +21,8 @@ use httprequest::HttpRequest;
use payload::{Payload, PayloadError, PayloadSender}; use payload::{Payload, PayloadError, PayloadSender};
use h2writer::H2Writer; use h2writer::H2Writer;
const KEEPALIVE_PERIOD: u64 = 15; // seconds
pub(crate) struct Http2<T, A, H> pub(crate) struct Http2<T, A, H>
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: 'static where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: 'static
@ -28,6 +33,7 @@ pub(crate) struct Http2<T, A, H>
state: State<IoWrapper<T>>, state: State<IoWrapper<T>>,
disconnected: bool, disconnected: bool,
tasks: VecDeque<Entry>, tasks: VecDeque<Entry>,
keepalive_timer: Option<Timeout>,
} }
enum State<T: AsyncRead + AsyncWrite> { enum State<T: AsyncRead + AsyncWrite> {
@ -47,12 +53,25 @@ impl<T, A, H> Http2<T, A, H>
disconnected: false, disconnected: false,
tasks: VecDeque::new(), tasks: VecDeque::new(),
state: State::Handshake( state: State::Handshake(
Server::handshake(IoWrapper{unread: Some(buf), inner: stream})) } Server::handshake(IoWrapper{unread: Some(buf), inner: stream})),
keepalive_timer: None,
}
} }
pub fn poll(&mut self) -> Poll<(), ()> { pub fn poll(&mut self) -> Poll<(), ()> {
// server // server
if let State::Server(ref mut server) = self.state { if let State::Server(ref mut server) = self.state {
// keep-alive timer
if let Some(ref mut timeout) = self.keepalive_timer {
match timeout.poll() {
Ok(Async::Ready(_)) =>
return Ok(Async::Ready(())),
Ok(Async::NotReady) => (),
Err(_) => unreachable!(),
}
}
loop { loop {
let mut not_ready = true; let mut not_ready = true;
@ -105,14 +124,6 @@ impl<T, A, H> Http2<T, A, H>
// get request // get request
if !self.disconnected { if !self.disconnected {
match server.poll() { match server.poll() {
Ok(Async::NotReady) => {
// Ok(Async::NotReady);
()
}
Err(err) => {
trace!("Connection error: {}", err);
self.disconnected = true;
},
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
not_ready = false; not_ready = false;
self.disconnected = true; self.disconnected = true;
@ -125,7 +136,30 @@ impl<T, A, H> Http2<T, A, H>
let (parts, body) = req.into_parts(); let (parts, body) = req.into_parts();
self.tasks.push_back( self.tasks.push_back(
Entry::new(parts, body, resp, &self.router)); Entry::new(parts, body, resp, &self.router));
self.keepalive_timer.take();
} }
Ok(Async::NotReady) => {
// start keep-alive timer
if self.tasks.is_empty() {
if self.keepalive_timer.is_none() {
trace!("Start keep-alive timer");
let mut timeout = Timeout::new(
Duration::new(KEEPALIVE_PERIOD, 0),
Arbiter::handle()).unwrap();
// register timeout
let _ = timeout.poll();
self.keepalive_timer = Some(timeout);
}
}
}
Err(err) => {
trace!("Connection error: {}", err);
self.disconnected = true;
for entry in &mut self.tasks {
entry.task.disconnected()
}
self.keepalive_timer.take();
},
} }
} }

View File

@ -12,6 +12,8 @@ use native_tls::TlsAcceptor;
#[cfg(feature="tls")] #[cfg(feature="tls")]
use tokio_tls::{TlsStream, TlsAcceptorExt}; use tokio_tls::{TlsStream, TlsAcceptorExt};
#[cfg(feature="alpn")]
use futures::Future;
#[cfg(feature="alpn")] #[cfg(feature="alpn")]
use openssl::ssl::{SslMethod, SslAcceptorBuilder}; use openssl::ssl::{SslMethod, SslAcceptorBuilder};
#[cfg(feature="alpn")] #[cfg(feature="alpn")]