1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-01-18 05:41:50 +01:00

refactor keep-alive

This commit is contained in:
Nikolay Kim 2018-04-22 15:28:04 -07:00
parent f89b7a9bb8
commit f8af3ef7f4
4 changed files with 67 additions and 51 deletions

View File

@ -37,6 +37,7 @@ pub struct HttpInnerMessage {
pub addr: Option<SocketAddr>, pub addr: Option<SocketAddr>,
pub payload: Option<Payload>, pub payload: Option<Payload>,
pub info: Option<ConnectionInfo<'static>>, pub info: Option<ConnectionInfo<'static>>,
pub keep_alive: bool,
resource: RouterResource, resource: RouterResource,
} }
@ -56,11 +57,12 @@ impl Default for HttpInnerMessage {
params: Params::new(), params: Params::new(),
query: Params::new(), query: Params::new(),
query_loaded: false, query_loaded: false,
cookies: None,
addr: None, addr: None,
cookies: None,
payload: None, payload: None,
extensions: Extensions::new(), extensions: Extensions::new(),
info: None, info: None,
keep_alive: true,
resource: RouterResource::Notset, resource: RouterResource::Notset,
} }
} }
@ -70,20 +72,7 @@ impl HttpInnerMessage {
/// Checks if a connection should be kept alive. /// Checks if a connection should be kept alive.
#[inline] #[inline]
pub fn keep_alive(&self) -> bool { pub fn keep_alive(&self) -> bool {
if let Some(conn) = self.headers.get(header::CONNECTION) { self.keep_alive
if let Ok(conn) = conn.to_str() {
if self.version == Version::HTTP_10 && conn.contains("keep-alive") {
true
} else {
self.version == Version::HTTP_11
&& !(conn.contains("close") || conn.contains("upgrade"))
}
} else {
false
}
} else {
self.version != Version::HTTP_10
}
} }
#[inline] #[inline]
@ -91,12 +80,12 @@ impl HttpInnerMessage {
self.headers.clear(); self.headers.clear();
self.extensions.clear(); self.extensions.clear();
self.params.clear(); self.params.clear();
self.query.clear();
self.query_loaded = false;
self.cookies = None;
self.addr = None; self.addr = None;
self.info = None; self.info = None;
self.query_loaded = false;
self.cookies = None;
self.payload = None; self.payload = None;
self.keep_alive = true;
self.resource = RouterResource::Notset; self.resource = RouterResource::Notset;
} }
} }
@ -126,10 +115,11 @@ impl HttpRequest<()> {
params: Params::new(), params: Params::new(),
query: Params::new(), query: Params::new(),
query_loaded: false, query_loaded: false,
extensions: Extensions::new(),
cookies: None, cookies: None,
addr: None, addr: None,
extensions: Extensions::new(),
info: None, info: None,
keep_alive: true,
resource: RouterResource::Notset, resource: RouterResource::Notset,
}), }),
None, None,
@ -377,13 +367,13 @@ impl<S> HttpRequest<S> {
/// To get client connection information `connection_info()` method should /// To get client connection information `connection_info()` method should
/// be used. /// be used.
#[inline] #[inline]
pub fn peer_addr(&self) -> Option<&SocketAddr> { pub fn peer_addr(&self) -> Option<SocketAddr> {
self.as_ref().addr.as_ref() self.as_ref().addr
} }
#[inline] #[inline]
pub(crate) fn set_peer_addr(&mut self, addr: Option<SocketAddr>) { pub(crate) fn set_peer_addr(&mut self, addr: Option<SocketAddr>) {
self.as_mut().addr = addr self.as_mut().addr = addr;
} }
/// Get a reference to the Params object. /// Get a reference to the Params object.
@ -392,6 +382,7 @@ impl<S> HttpRequest<S> {
if !self.as_ref().query_loaded { if !self.as_ref().query_loaded {
let params: &mut Params = let params: &mut Params =
unsafe { mem::transmute(&mut self.as_mut().query) }; unsafe { mem::transmute(&mut self.as_mut().query) };
params.clear();
self.as_mut().query_loaded = true; self.as_mut().query_loaded = true;
for (key, val) in form_urlencoded::parse(self.query_string().as_ref()) { for (key, val) in form_urlencoded::parse(self.query_string().as_ref()) {
params.add(key, val); params.add(key, val);
@ -425,9 +416,9 @@ impl<S> HttpRequest<S> {
} }
} }
} }
msg.cookies = Some(cookies) msg.cookies = Some(cookies);
} }
Ok(self.as_ref().cookies.as_ref().unwrap()) Ok(&self.as_ref().cookies.as_ref().unwrap())
} }
/// Return request cookie. /// Return request cookie.

View File

@ -9,7 +9,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use flate2::Compression; use flate2::Compression;
use flate2::read::GzDecoder; use flate2::read::GzDecoder;
use flate2::write::{DeflateDecoder, DeflateEncoder, GzEncoder}; use flate2::write::{DeflateDecoder, DeflateEncoder, GzEncoder};
use http::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION, use http::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING,
CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use http::{HttpTryFrom, Method, Version}; use http::{HttpTryFrom, Method, Version};
@ -459,9 +459,6 @@ impl ContentEncoder {
if resp.upgrade() { if resp.upgrade() {
if version == Version::HTTP_2 { if version == Version::HTTP_2 {
error!("Connection upgrade is forbidden for HTTP/2"); error!("Connection upgrade is forbidden for HTTP/2");
} else {
resp.headers_mut()
.insert(CONNECTION, HeaderValue::from_static("upgrade"));
} }
if encoding != ContentEncoding::Identity { if encoding != ContentEncoding::Identity {
encoding = ContentEncoding::Identity; encoding = ContentEncoding::Identity;

View File

@ -510,9 +510,10 @@ impl Reader {
buf: &mut BytesMut, settings: &WorkerSettings<H> buf: &mut BytesMut, settings: &WorkerSettings<H>
) -> Poll<(HttpRequest, Option<PayloadInfo>), ParseError> { ) -> Poll<(HttpRequest, Option<PayloadInfo>), ParseError> {
// Parse http message // Parse http message
let mut has_te = false;
let mut has_upgrade = false; let mut has_upgrade = false;
let mut has_length = false; let mut chunked = false;
let mut content_length = None;
let msg = { let msg = {
let bytes_ptr = buf.as_ref().as_ptr() as usize; let bytes_ptr = buf.as_ref().as_ptr() as usize;
let mut headers: [httparse::Header; MAX_HEADERS] = let mut headers: [httparse::Header; MAX_HEADERS] =
@ -546,10 +547,10 @@ impl Reader {
let msg = settings.get_http_message(); let msg = settings.get_http_message();
{ {
let msg_mut = msg.get_mut(); let msg_mut = msg.get_mut();
msg_mut.keep_alive = version != Version::HTTP_10;
for header in headers[..headers_len].iter() { for header in headers[..headers_len].iter() {
if let Ok(name) = HeaderName::from_bytes(header.name.as_bytes()) { if let Ok(name) = HeaderName::from_bytes(header.name.as_bytes()) {
has_te = has_te || name == header::TRANSFER_ENCODING;
has_length = has_length || name == header::CONTENT_LENGTH;
has_upgrade = has_upgrade || name == header::UPGRADE; has_upgrade = has_upgrade || name == header::UPGRADE;
let v_start = header.value.as_ptr() as usize - bytes_ptr; let v_start = header.value.as_ptr() as usize - bytes_ptr;
let v_end = v_start + header.value.len(); let v_end = v_start + header.value.len();
@ -558,6 +559,47 @@ impl Reader {
slice.slice(v_start, v_end), slice.slice(v_start, v_end),
) )
}; };
match name {
header::CONTENT_LENGTH => {
if let Ok(s) = value.to_str() {
if let Ok(len) = s.parse::<u64>() {
content_length = Some(len)
} else {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header);
}
} else {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header);
}
},
// transfer-encoding
header::TRANSFER_ENCODING => {
if let Ok(s) = value.to_str() {
chunked = s.to_lowercase().contains("chunked");
} else {
return Err(ParseError::Header)
}
},
// connection keep-alive state
header::CONNECTION => {
msg_mut.keep_alive = if let Ok(conn) = value.to_str() {
if version == Version::HTTP_10
&& conn.contains("keep-alive")
{
true
} else {
version == Version::HTTP_11
&& !(conn.contains("close")
|| conn.contains("upgrade"))
}
} else {
false
};
},
_ => (),
}
msg_mut.headers.append(name, value); msg_mut.headers.append(name, value);
} else { } else {
return Err(ParseError::Header); return Err(ParseError::Header);
@ -572,26 +614,12 @@ impl Reader {
}; };
// https://tools.ietf.org/html/rfc7230#section-3.3.3 // https://tools.ietf.org/html/rfc7230#section-3.3.3
let decoder = if has_te && chunked(&msg.get_mut().headers)? { let decoder = if chunked {
// Chunked encoding // Chunked encoding
Some(Decoder::chunked()) Some(Decoder::chunked())
} else if has_length { } else if let Some(len) = content_length {
// Content-Length // Content-Length
let len = msg.get_ref() Some(Decoder::length(len))
.headers
.get(header::CONTENT_LENGTH)
.unwrap();
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
Some(Decoder::length(len))
} else {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header);
}
} else {
debug!("illegal Content-Length: {:?}", len);
return Err(ParseError::Header);
}
} else if has_upgrade || msg.get_ref().method == Method::CONNECT { } else if has_upgrade || msg.get_ref().method == Method::CONNECT {
// upgrade(websocket) or connect // upgrade(websocket) or connect
Some(Decoder::eof()) Some(Decoder::eof())

View File

@ -2,8 +2,6 @@
use bytes::BufMut; use bytes::BufMut;
use futures::{Async, Poll}; use futures::{Async, Poll};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
use http::{Method, Version};
use std::rc::Rc; use std::rc::Rc;
use std::{io, mem}; use std::{io, mem};
use tokio_io::AsyncWrite; use tokio_io::AsyncWrite;
@ -17,6 +15,8 @@ use body::{Binary, Body};
use header::ContentEncoding; use header::ContentEncoding;
use httprequest::HttpInnerMessage; use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use http::{Method, Version};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE};
const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific const AVERAGE_HEADER_SIZE: usize = 30; // totally scientific