1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-01 08:45:10 +02:00

fix connection keepalive support

This commit is contained in:
Nikolay Kim
2018-10-05 10:03:10 -07:00
parent fbf67544e5
commit 2e27d77740
9 changed files with 890 additions and 105 deletions

View File

@ -6,7 +6,7 @@ use tokio_codec::{Decoder, Encoder};
use super::decoder::H1Decoder;
pub use super::decoder::InMessage;
use super::response::{ResponseInfo, ResponseLength};
use super::encoder::{ResponseEncoder, ResponseLength};
use body::Body;
use error::ParseError;
use helpers;
@ -15,6 +15,17 @@ use http::{Method, Version};
use httpresponse::HttpResponse;
use request::RequestPool;
bitflags! {
struct Flags: u8 {
const HEAD = 0b0000_0001;
const UPGRADE = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const KEEPALIVE_ENABLED = 0b0001_0000;
}
}
const AVERAGE_HEADER_SIZE: usize = 30;
/// Http response
pub enum OutMessage {
/// Http response message
@ -26,91 +37,40 @@ pub enum OutMessage {
/// HTTP/1 Codec
pub struct Codec {
decoder: H1Decoder,
encoder: H1Writer,
head: bool,
version: Version,
}
impl Codec {
/// Create HTTP/1 codec
pub fn new() -> Self {
Codec::with_pool(RequestPool::pool())
}
/// Create HTTP/1 codec with request's pool
pub(crate) fn with_pool(pool: &'static RequestPool) -> Self {
Codec {
decoder: H1Decoder::with_pool(pool),
encoder: H1Writer::new(),
head: false,
version: Version::HTTP_11,
}
}
}
impl Decoder for Codec {
type Item = InMessage;
type Error = ParseError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let res = self.decoder.decode(src);
match res {
Ok(Some(InMessage::Message(ref req)))
| Ok(Some(InMessage::MessageWithPayload(ref req))) => {
self.head = req.inner.method == Method::HEAD;
self.version = req.inner.version;
}
_ => (),
}
res
}
}
impl Encoder for Codec {
type Item = OutMessage;
type Error = io::Error;
fn encode(
&mut self, item: Self::Item, dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match item {
OutMessage::Response(res) => {
self.encoder.encode(res, dst, self.head, self.version)?;
}
OutMessage::Payload(bytes) => {
dst.extend_from_slice(&bytes);
}
}
Ok(())
}
}
bitflags! {
struct Flags: u8 {
const STARTED = 0b0000_0001;
const UPGRADE = 0b0000_0010;
const KEEPALIVE = 0b0000_0100;
const DISCONNECTED = 0b0000_1000;
}
}
const AVERAGE_HEADER_SIZE: usize = 30;
struct H1Writer {
// encoder part
flags: Flags,
written: u64,
headers_size: u32,
info: ResponseInfo,
te: ResponseEncoder,
}
impl H1Writer {
fn new() -> H1Writer {
H1Writer {
flags: Flags::empty(),
impl Codec {
/// Create HTTP/1 codec.
///
/// `keepalive_enabled` how response `connection` header get generated.
pub fn new(keepalive_enabled: bool) -> Self {
Codec::with_pool(RequestPool::pool(), keepalive_enabled)
}
/// Create HTTP/1 codec with request's pool
pub(crate) fn with_pool(
pool: &'static RequestPool, keepalive_enabled: bool,
) -> Self {
let flags = if keepalive_enabled {
Flags::KEEPALIVE_ENABLED
} else {
Flags::empty()
};
Codec {
decoder: H1Decoder::with_pool(pool),
version: Version::HTTP_11,
flags,
written: 0,
headers_size: 0,
info: ResponseInfo::default(),
te: ResponseEncoder::default(),
}
}
@ -118,46 +78,42 @@ impl H1Writer {
self.written
}
pub fn reset(&mut self) {
self.written = 0;
self.flags = Flags::KEEPALIVE;
}
pub fn upgrade(&self) -> bool {
self.flags.contains(Flags::UPGRADE)
}
pub fn keepalive(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE) && !self.flags.contains(Flags::UPGRADE)
self.flags.contains(Flags::KEEPALIVE)
}
fn encode(
&mut self, mut msg: HttpResponse, buffer: &mut BytesMut, head: bool,
version: Version,
fn encode_response(
&mut self, mut msg: HttpResponse, buffer: &mut BytesMut,
) -> io::Result<()> {
// prepare task
self.info.update(&mut msg, head, version);
// prepare transfer encoding
self.te
.update(&mut msg, self.flags.contains(Flags::HEAD), self.version);
//if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) {
//self.flags = Flags::STARTED | Flags::KEEPALIVE;
//} else {
self.flags = Flags::STARTED;
//}
let ka = self.flags.contains(Flags::KEEPALIVE_ENABLED) && msg
.keep_alive()
.unwrap_or_else(|| self.flags.contains(Flags::KEEPALIVE));
// Connection upgrade
let version = msg.version().unwrap_or_else(|| Version::HTTP_11); //req.inner.version);
let version = msg.version().unwrap_or_else(|| self.version);
if msg.upgrade() {
self.flags.insert(Flags::UPGRADE);
self.flags.remove(Flags::KEEPALIVE);
msg.headers_mut()
.insert(CONNECTION, HeaderValue::from_static("upgrade"));
}
// keep-alive
else if self.flags.contains(Flags::KEEPALIVE) {
else if ka {
self.flags.insert(Flags::KEEPALIVE);
if version < Version::HTTP_11 {
msg.headers_mut()
.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
}
} else if version >= Version::HTTP_11 {
self.flags.remove(Flags::KEEPALIVE);
msg.headers_mut()
.insert(CONNECTION, HeaderValue::from_static("close"));
}
@ -183,7 +139,7 @@ impl H1Writer {
buffer.extend_from_slice(reason);
// content length
match self.info.length {
match self.te.length {
ResponseLength::Chunked => {
buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n")
}
@ -209,7 +165,7 @@ impl H1Writer {
for (key, value) in msg.headers() {
match *key {
TRANSFER_ENCODING => continue,
CONTENT_LENGTH => match self.info.length {
CONTENT_LENGTH => match self.te.length {
ResponseLength::None => (),
_ => continue,
},
@ -272,3 +228,46 @@ impl H1Writer {
Ok(())
}
}
impl Decoder for Codec {
type Item = InMessage;
type Error = ParseError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let res = self.decoder.decode(src);
match res {
Ok(Some(InMessage::Message(ref req)))
| Ok(Some(InMessage::MessageWithPayload(ref req))) => {
self.flags
.set(Flags::HEAD, req.inner.method == Method::HEAD);
self.version = req.inner.version;
if self.flags.contains(Flags::KEEPALIVE_ENABLED) {
self.flags.set(Flags::KEEPALIVE, req.keep_alive());
}
}
_ => (),
}
res
}
}
impl Encoder for Codec {
type Item = OutMessage;
type Error = io::Error;
fn encode(
&mut self, item: Self::Item, dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match item {
OutMessage::Response(res) => {
self.written = 0;
self.encode_response(res, dst)?;
}
OutMessage::Payload(bytes) => {
dst.extend_from_slice(&bytes);
}
}
Ok(())
}
}

View File

@ -6,7 +6,6 @@ use std::time::Instant;
use actix_net::service::Service;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use tokio_codec::Framed;
// use tokio_current_thread::spawn;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
@ -17,6 +16,7 @@ use payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter};
use body::Body;
use config::ServiceConfig;
use error::DispatchError;
use framed::Framed;
use httpresponse::HttpResponse;
use request::Request;
@ -89,12 +89,13 @@ where
pub fn with_timeout(
stream: T, config: ServiceConfig, timeout: Option<Delay>, service: S,
) -> Self {
let flags = if config.keep_alive_enabled() {
let keepalive = config.keep_alive_enabled();
let flags = if keepalive {
Flags::KEEPALIVE | Flags::KEEPALIVE_ENABLED | Flags::FLUSHED
} else {
Flags::FLUSHED
};
let framed = Framed::new(stream, Codec::new());
let framed = Framed::new(stream, Codec::new(keepalive));
let (ka_expire, ka_timer) = if let Some(delay) = timeout {
(delay.deadline(), Some(delay))
@ -235,6 +236,10 @@ where
let msg = item.take().expect("SendResponse is empty");
match self.framed.start_send(msg) {
Ok(AsyncSink::Ready) => {
self.flags.set(
Flags::KEEPALIVE,
self.framed.get_codec().keepalive(),
);
self.flags.remove(Flags::FLUSHED);
Some(Ok(State::None))
}
@ -249,6 +254,10 @@ where
let (msg, body) = item.take().expect("SendResponse is empty");
match self.framed.start_send(msg) {
Ok(AsyncSink::Ready) => {
self.flags.set(
Flags::KEEPALIVE,
self.framed.get_codec().keepalive(),
);
self.flags.remove(Flags::FLUSHED);
Some(Ok(State::Payload(body)))
}

View File

@ -24,15 +24,15 @@ pub(crate) enum ResponseLength {
}
#[derive(Debug)]
pub(crate) struct ResponseInfo {
pub(crate) struct ResponseEncoder {
head: bool,
pub length: ResponseLength,
pub te: TransferEncoding,
}
impl Default for ResponseInfo {
impl Default for ResponseEncoder {
fn default() -> Self {
ResponseInfo {
ResponseEncoder {
head: false,
length: ResponseLength::None,
te: TransferEncoding::empty(),
@ -40,7 +40,7 @@ impl Default for ResponseInfo {
}
}
impl ResponseInfo {
impl ResponseEncoder {
pub fn update(&mut self, resp: &mut HttpResponse, head: bool, version: Version) {
self.head = head;

View File

@ -2,7 +2,7 @@
mod codec;
mod decoder;
mod dispatcher;
mod response;
mod encoder;
mod service;
pub use self::codec::{Codec, InMessage, OutMessage};