mirror of
https://github.com/actix/actix-extras.git
synced 2024-11-30 18:34:36 +01:00
handle keep-alive setting more efficient
This commit is contained in:
parent
c98d320f8c
commit
b61c2a0cf0
@ -94,8 +94,8 @@ impl<T, H> Http1<T, H>
|
|||||||
|
|
||||||
pub fn poll(&mut self) -> Poll<Http1Result, ()> {
|
pub fn poll(&mut self) -> Poll<Http1Result, ()> {
|
||||||
// keep-alive timer
|
// keep-alive timer
|
||||||
if let Some(ref mut timeout) = self.keepalive_timer {
|
if self.keepalive_timer.is_some() {
|
||||||
match timeout.poll() {
|
match self.keepalive_timer.as_mut().unwrap().poll() {
|
||||||
Ok(Async::Ready(_)) => {
|
Ok(Async::Ready(_)) => {
|
||||||
trace!("Keep-alive timeout, close connection");
|
trace!("Keep-alive timeout, close connection");
|
||||||
return Ok(Async::Ready(Http1Result::Done))
|
return Ok(Async::Ready(Http1Result::Done))
|
||||||
@ -124,11 +124,13 @@ impl<T, H> Http1<T, H>
|
|||||||
not_ready = false;
|
not_ready = false;
|
||||||
|
|
||||||
// overide keep-alive state
|
// overide keep-alive state
|
||||||
|
if self.settings.keep_alive_enabled() {
|
||||||
if self.stream.keepalive() {
|
if self.stream.keepalive() {
|
||||||
self.flags.insert(Flags::KEEPALIVE);
|
self.flags.insert(Flags::KEEPALIVE);
|
||||||
} else {
|
} else {
|
||||||
self.flags.remove(Flags::KEEPALIVE);
|
self.flags.remove(Flags::KEEPALIVE);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
self.stream.reset();
|
self.stream.reset();
|
||||||
|
|
||||||
item.flags.insert(EntryFlags::EOF);
|
item.flags.insert(EntryFlags::EOF);
|
||||||
@ -249,7 +251,8 @@ impl<T, H> Http1<T, H>
|
|||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
// start keep-alive timer, this is also slow request timeout
|
// start keep-alive timer, this is also slow request timeout
|
||||||
if self.tasks.is_empty() {
|
if self.tasks.is_empty() {
|
||||||
if let Some(keep_alive) = self.settings.keep_alive() {
|
if self.settings.keep_alive_enabled() {
|
||||||
|
let keep_alive = self.settings.keep_alive();
|
||||||
if keep_alive > 0 && self.flags.contains(Flags::KEEPALIVE) {
|
if keep_alive > 0 && self.flags.contains(Flags::KEEPALIVE) {
|
||||||
if self.keepalive_timer.is_none() {
|
if self.keepalive_timer.is_none() {
|
||||||
trace!("Start keep-alive timer");
|
trace!("Start keep-alive timer");
|
||||||
|
@ -2,7 +2,7 @@ use std::io;
|
|||||||
use futures::{Async, Poll};
|
use futures::{Async, Poll};
|
||||||
use tokio_io::AsyncWrite;
|
use tokio_io::AsyncWrite;
|
||||||
use http::Version;
|
use http::Version;
|
||||||
use http::header::{HeaderValue, CONNECTION, CONTENT_TYPE, DATE};
|
use http::header::{HeaderValue, CONNECTION, DATE};
|
||||||
|
|
||||||
use helpers;
|
use helpers;
|
||||||
use body::Body;
|
use body::Body;
|
||||||
@ -180,11 +180,6 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
|||||||
buffer.extend_from_slice(b"\r\n");
|
buffer.extend_from_slice(b"\r\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
// default content-type
|
|
||||||
if !msg.headers().contains_key(CONTENT_TYPE) {
|
|
||||||
buffer.extend_from_slice(b"ContentType: application/octet-stream\r\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
// msg eof
|
// msg eof
|
||||||
buffer.extend_from_slice(b"\r\n");
|
buffer.extend_from_slice(b"\r\n");
|
||||||
self.headers_size = buffer.len() as u32;
|
self.headers_size = buffer.len() as u32;
|
||||||
|
@ -155,7 +155,8 @@ impl<T, H> Http2<T, H>
|
|||||||
Ok(Async::NotReady) => {
|
Ok(Async::NotReady) => {
|
||||||
// start keep-alive timer
|
// start keep-alive timer
|
||||||
if self.tasks.is_empty() {
|
if self.tasks.is_empty() {
|
||||||
if let Some(keep_alive) = self.settings.keep_alive() {
|
if self.settings.keep_alive_enabled() {
|
||||||
|
let keep_alive = self.settings.keep_alive();
|
||||||
if keep_alive > 0 && self.keepalive_timer.is_none() {
|
if keep_alive > 0 && self.keepalive_timer.is_none() {
|
||||||
trace!("Start keep-alive timer");
|
trace!("Start keep-alive timer");
|
||||||
let mut timeout = Timeout::new(
|
let mut timeout = Timeout::new(
|
||||||
|
@ -4,7 +4,7 @@ use futures::{Async, Poll};
|
|||||||
use http2::{Reason, SendStream};
|
use http2::{Reason, SendStream};
|
||||||
use http2::server::Respond;
|
use http2::server::Respond;
|
||||||
use http::{Version, HttpTryFrom, Response};
|
use http::{Version, HttpTryFrom, Response};
|
||||||
use http::header::{HeaderValue, CONNECTION, CONTENT_TYPE, TRANSFER_ENCODING, DATE};
|
use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE};
|
||||||
|
|
||||||
use helpers;
|
use helpers;
|
||||||
use body::Body;
|
use body::Body;
|
||||||
@ -131,12 +131,6 @@ impl Writer for H2Writer {
|
|||||||
msg.headers_mut().insert(DATE, HeaderValue::try_from(&bytes[..]).unwrap());
|
msg.headers_mut().insert(DATE, HeaderValue::try_from(&bytes[..]).unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
// default content-type
|
|
||||||
if !msg.headers().contains_key(CONTENT_TYPE) {
|
|
||||||
msg.headers_mut().insert(
|
|
||||||
CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut resp = Response::new(());
|
let mut resp = Response::new(());
|
||||||
*resp.status_mut() = msg.status();
|
*resp.status_mut() = msg.status();
|
||||||
*resp.version_mut() = Version::HTTP_2;
|
*resp.version_mut() = Version::HTTP_2;
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
//! Default response headers
|
//! Default response headers
|
||||||
use http::{HeaderMap, HttpTryFrom};
|
use http::{HeaderMap, HttpTryFrom};
|
||||||
use http::header::{HeaderName, HeaderValue};
|
use http::header::{HeaderName, HeaderValue, CONTENT_TYPE};
|
||||||
|
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use httpresponse::HttpResponse;
|
use httpresponse::HttpResponse;
|
||||||
@ -27,22 +27,30 @@ use middlewares::{Response, Middleware};
|
|||||||
/// .finish();
|
/// .finish();
|
||||||
/// }
|
/// }
|
||||||
/// ```
|
/// ```
|
||||||
pub struct DefaultHeaders(HeaderMap);
|
pub struct DefaultHeaders{
|
||||||
|
ct: bool,
|
||||||
|
headers: HeaderMap,
|
||||||
|
}
|
||||||
|
|
||||||
impl DefaultHeaders {
|
impl DefaultHeaders {
|
||||||
pub fn build() -> DefaultHeadersBuilder {
|
pub fn build() -> DefaultHeadersBuilder {
|
||||||
DefaultHeadersBuilder{headers: Some(HeaderMap::new())}
|
DefaultHeadersBuilder{ct: false, headers: Some(HeaderMap::new())}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> Middleware<S> for DefaultHeaders {
|
impl<S> Middleware<S> for DefaultHeaders {
|
||||||
|
|
||||||
fn response(&self, _: &mut HttpRequest<S>, mut resp: Box<HttpResponse>) -> Response {
|
fn response(&self, _: &mut HttpRequest<S>, mut resp: Box<HttpResponse>) -> Response {
|
||||||
for (key, value) in self.0.iter() {
|
for (key, value) in self.headers.iter() {
|
||||||
if !resp.headers().contains_key(key) {
|
if !resp.headers().contains_key(key) {
|
||||||
resp.headers_mut().insert(key, value.clone());
|
resp.headers_mut().insert(key, value.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// default content-type
|
||||||
|
if self.ct && !resp.headers().contains_key(CONTENT_TYPE) {
|
||||||
|
resp.headers_mut().insert(
|
||||||
|
CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"));
|
||||||
|
}
|
||||||
Response::Done(resp)
|
Response::Done(resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -50,6 +58,7 @@ impl<S> Middleware<S> for DefaultHeaders {
|
|||||||
/// Structure that follows the builder pattern for building `DefaultHeaders` middleware.
|
/// Structure that follows the builder pattern for building `DefaultHeaders` middleware.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct DefaultHeadersBuilder {
|
pub struct DefaultHeadersBuilder {
|
||||||
|
ct: bool,
|
||||||
headers: Option<HeaderMap>,
|
headers: Option<HeaderMap>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,10 +85,16 @@ impl DefaultHeadersBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set *CONTENT-TYPE* header if response does not contain this header.
|
||||||
|
pub fn content_type(&mut self) -> &mut Self {
|
||||||
|
self.ct = true;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Finishes building and returns the built `DefaultHeaders` middleware.
|
/// Finishes building and returns the built `DefaultHeaders` middleware.
|
||||||
pub fn finish(&mut self) -> DefaultHeaders {
|
pub fn finish(&mut self) -> DefaultHeaders {
|
||||||
let headers = self.headers.take().expect("cannot reuse middleware builder");
|
let headers = self.headers.take().expect("cannot reuse middleware builder");
|
||||||
DefaultHeaders(headers)
|
DefaultHeaders{ ct: self.ct, headers: headers }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -171,7 +171,7 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
|
|||||||
for app in &mut apps {
|
for app in &mut apps {
|
||||||
app.server_settings(settings.clone());
|
app.server_settings(settings.clone());
|
||||||
}
|
}
|
||||||
self.h = Some(Rc::new(WorkerSettings{h: apps, keep_alive: self.keep_alive}));
|
self.h = Some(Rc::new(WorkerSettings::new(apps, self.keep_alive)));
|
||||||
|
|
||||||
// start server
|
// start server
|
||||||
Ok(HttpServer::create(move |ctx| {
|
Ok(HttpServer::create(move |ctx| {
|
||||||
@ -411,23 +411,35 @@ struct Worker<H> {
|
|||||||
|
|
||||||
pub(crate) struct WorkerSettings<H> {
|
pub(crate) struct WorkerSettings<H> {
|
||||||
h: Vec<H>,
|
h: Vec<H>,
|
||||||
keep_alive: Option<u64>,
|
enabled: bool,
|
||||||
|
keep_alive: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<H> WorkerSettings<H> {
|
impl<H> WorkerSettings<H> {
|
||||||
|
fn new(h: Vec<H>, keep_alive: Option<u64>) -> WorkerSettings<H> {
|
||||||
|
WorkerSettings {
|
||||||
|
h: h,
|
||||||
|
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
|
||||||
|
keep_alive: keep_alive.unwrap_or(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn handlers(&self) -> &Vec<H> {
|
pub fn handlers(&self) -> &Vec<H> {
|
||||||
&self.h
|
&self.h
|
||||||
}
|
}
|
||||||
pub fn keep_alive(&self) -> Option<u64> {
|
pub fn keep_alive(&self) -> u64 {
|
||||||
self.keep_alive
|
self.keep_alive
|
||||||
}
|
}
|
||||||
|
pub fn keep_alive_enabled(&self) -> bool {
|
||||||
|
self.enabled
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<H: 'static> Worker<H> {
|
impl<H: 'static> Worker<H> {
|
||||||
|
|
||||||
fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>) -> Worker<H> {
|
fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>) -> Worker<H> {
|
||||||
Worker {
|
Worker {
|
||||||
h: Rc::new(WorkerSettings{h: h, keep_alive: keep_alive}),
|
h: Rc::new(WorkerSettings::new(h, keep_alive)),
|
||||||
handler: handler,
|
handler: handler,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -455,7 +467,7 @@ impl<H> Handler<IoStream<Socket>> for Worker<H>
|
|||||||
fn handle(&mut self, msg: IoStream<Socket>, _: &mut Context<Self>)
|
fn handle(&mut self, msg: IoStream<Socket>, _: &mut Context<Self>)
|
||||||
-> Response<Self, IoStream<Socket>>
|
-> Response<Self, IoStream<Socket>>
|
||||||
{
|
{
|
||||||
if self.h.keep_alive.is_none() &&
|
if !self.h.keep_alive_enabled() &&
|
||||||
msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err()
|
msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err()
|
||||||
{
|
{
|
||||||
error!("Can not set socket keep-alive option");
|
error!("Can not set socket keep-alive option");
|
||||||
|
Loading…
Reference in New Issue
Block a user