1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

various optimizations

This commit is contained in:
Nikolay Kim 2018-05-15 16:41:46 -07:00
parent d6787e6c56
commit f82fa08d72
6 changed files with 161 additions and 115 deletions

View File

@ -62,10 +62,28 @@ impl Body {
} }
} }
/// Is this binary empy.
#[inline]
pub fn is_empty(&self) -> bool {
match *self {
Body::Empty => true,
_ => false,
}
}
/// Create body from slice (copy) /// Create body from slice (copy)
pub fn from_slice(s: &[u8]) -> Body { pub fn from_slice(s: &[u8]) -> Body {
Body::Binary(Binary::Bytes(Bytes::from(s))) Body::Binary(Binary::Bytes(Bytes::from(s)))
} }
/// Is this binary body.
#[inline]
pub(crate) fn binary(self) -> Binary {
match self {
Body::Binary(b) => b,
_ => panic!(),
}
}
} }
impl PartialEq for Body { impl PartialEq for Body {

View File

@ -466,10 +466,7 @@ impl HttpResponseBuilder {
jar.add(cookie.into_owned()); jar.add(cookie.into_owned());
self.cookies = Some(jar) self.cookies = Some(jar)
} else { } else {
self.cookies self.cookies.as_mut().unwrap().add(cookie.into_owned());
.as_mut()
.unwrap()
.add(cookie.into_owned());
} }
self self
} }
@ -534,9 +531,7 @@ impl HttpResponseBuilder {
if let Some(e) = self.err.take() { if let Some(e) = self.err.take() {
return Error::from(e).into(); return Error::from(e).into();
} }
let mut response = self.response let mut response = self.response.take().expect("cannot reuse response builder");
.take()
.expect("cannot reuse response builder");
if let Some(ref jar) = self.cookies { if let Some(ref jar) = self.cookies {
for cookie in jar.delta() { for cookie in jar.delta() {
match HeaderValue::from_str(&cookie.to_string()) { match HeaderValue::from_str(&cookie.to_string()) {
@ -558,9 +553,7 @@ impl HttpResponseBuilder {
S: Stream<Item = Bytes, Error = E> + 'static, S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error>, E: Into<Error>,
{ {
self.body(Body::Streaming(Box::new( self.body(Body::Streaming(Box::new(stream.map_err(|e| e.into()))))
stream.map_err(|e| e.into()),
)))
} }
/// Set a json body and generate `HttpResponse` /// Set a json body and generate `HttpResponse`
@ -607,7 +600,8 @@ impl HttpResponseBuilder {
#[inline] #[inline]
#[cfg_attr(feature = "cargo-clippy", allow(borrowed_box))] #[cfg_attr(feature = "cargo-clippy", allow(borrowed_box))]
fn parts<'a>( fn parts<'a>(
parts: &'a mut Option<Box<InnerHttpResponse>>, err: &Option<HttpError>, parts: &'a mut Option<Box<InnerHttpResponse>>,
err: &Option<HttpError>,
) -> Option<&'a mut Box<InnerHttpResponse>> { ) -> Option<&'a mut Box<InnerHttpResponse>> {
if err.is_some() { if err.is_some() {
return None; return None;
@ -822,14 +816,15 @@ thread_local!(static POOL: Rc<UnsafeCell<HttpResponsePool>> = HttpResponsePool::
impl HttpResponsePool { impl HttpResponsePool {
pub fn pool() -> Rc<UnsafeCell<HttpResponsePool>> { pub fn pool() -> Rc<UnsafeCell<HttpResponsePool>> {
Rc::new(UnsafeCell::new(HttpResponsePool( Rc::new(UnsafeCell::new(HttpResponsePool(VecDeque::with_capacity(
VecDeque::with_capacity(128), 128,
))) ))))
} }
#[inline] #[inline]
pub fn get_builder( pub fn get_builder(
pool: &Rc<UnsafeCell<HttpResponsePool>>, status: StatusCode, pool: &Rc<UnsafeCell<HttpResponsePool>>,
status: StatusCode,
) -> HttpResponseBuilder { ) -> HttpResponseBuilder {
let p = unsafe { &mut *pool.as_ref().get() }; let p = unsafe { &mut *pool.as_ref().get() };
if let Some(mut msg) = p.0.pop_front() { if let Some(mut msg) = p.0.pop_front() {
@ -853,7 +848,9 @@ impl HttpResponsePool {
#[inline] #[inline]
pub fn get_response( pub fn get_response(
pool: &Rc<UnsafeCell<HttpResponsePool>>, status: StatusCode, body: Body, pool: &Rc<UnsafeCell<HttpResponsePool>>,
status: StatusCode,
body: Body,
) -> HttpResponse { ) -> HttpResponse {
let p = unsafe { &mut *pool.as_ref().get() }; let p = unsafe { &mut *pool.as_ref().get() };
if let Some(mut msg) = p.0.pop_front() { if let Some(mut msg) = p.0.pop_front() {
@ -879,7 +876,8 @@ impl HttpResponsePool {
#[inline(always)] #[inline(always)]
#[cfg_attr(feature = "cargo-clippy", allow(boxed_local, inline_always))] #[cfg_attr(feature = "cargo-clippy", allow(boxed_local, inline_always))]
fn release( fn release(
pool: &Rc<UnsafeCell<HttpResponsePool>>, mut inner: Box<InnerHttpResponse>, pool: &Rc<UnsafeCell<HttpResponsePool>>,
mut inner: Box<InnerHttpResponse>,
) { ) {
let pool = unsafe { &mut *pool.as_ref().get() }; let pool = unsafe { &mut *pool.as_ref().get() };
if pool.0.len() < 128 { if pool.0.len() < 128 {
@ -975,9 +973,7 @@ mod tests {
#[test] #[test]
fn test_force_close() { fn test_force_close() {
let resp = HttpResponse::build(StatusCode::OK) let resp = HttpResponse::build(StatusCode::OK).force_close().finish();
.force_close()
.finish();
assert!(!resp.keep_alive().unwrap()) assert!(!resp.keep_alive().unwrap())
} }
@ -986,10 +982,7 @@ mod tests {
let resp = HttpResponse::build(StatusCode::OK) let resp = HttpResponse::build(StatusCode::OK)
.content_type("text/plain") .content_type("text/plain")
.body(Body::Empty); .body(Body::Empty);
assert_eq!( assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "text/plain")
resp.headers().get(CONTENT_TYPE).unwrap(),
"text/plain"
)
} }
#[test] #[test]
@ -1036,10 +1029,10 @@ mod tests {
} }
impl Body { impl Body {
pub(crate) fn binary(&self) -> Option<&Binary> { pub(crate) fn bin_ref(&self) -> &Binary {
match *self { match *self {
Body::Binary(ref bin) => Some(bin), Body::Binary(ref bin) => bin,
_ => None, _ => panic!(),
} }
} }
} }
@ -1055,7 +1048,7 @@ mod tests {
HeaderValue::from_static("text/plain; charset=utf-8") HeaderValue::from_static("text/plain; charset=utf-8")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().binary().unwrap(), &Binary::from("test")); assert_eq!(resp.body().bin_ref(), &Binary::from("test"));
let resp: HttpResponse = "test".respond_to(&req).ok().unwrap(); let resp: HttpResponse = "test".respond_to(&req).ok().unwrap();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1064,7 +1057,7 @@ mod tests {
HeaderValue::from_static("text/plain; charset=utf-8") HeaderValue::from_static("text/plain; charset=utf-8")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().binary().unwrap(), &Binary::from("test")); assert_eq!(resp.body().bin_ref(), &Binary::from("test"));
let resp: HttpResponse = b"test".as_ref().into(); let resp: HttpResponse = b"test".as_ref().into();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1073,10 +1066,7 @@ mod tests {
HeaderValue::from_static("application/octet-stream") HeaderValue::from_static("application/octet-stream")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.body().bin_ref(), &Binary::from(b"test".as_ref()));
resp.body().binary().unwrap(),
&Binary::from(b"test".as_ref())
);
let resp: HttpResponse = b"test".as_ref().respond_to(&req).ok().unwrap(); let resp: HttpResponse = b"test".as_ref().respond_to(&req).ok().unwrap();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1085,10 +1075,7 @@ mod tests {
HeaderValue::from_static("application/octet-stream") HeaderValue::from_static("application/octet-stream")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.body().bin_ref(), &Binary::from(b"test".as_ref()));
resp.body().binary().unwrap(),
&Binary::from(b"test".as_ref())
);
let resp: HttpResponse = "test".to_owned().into(); let resp: HttpResponse = "test".to_owned().into();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1097,10 +1084,7 @@ mod tests {
HeaderValue::from_static("text/plain; charset=utf-8") HeaderValue::from_static("text/plain; charset=utf-8")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.body().bin_ref(), &Binary::from("test".to_owned()));
resp.body().binary().unwrap(),
&Binary::from("test".to_owned())
);
let resp: HttpResponse = "test".to_owned().respond_to(&req).ok().unwrap(); let resp: HttpResponse = "test".to_owned().respond_to(&req).ok().unwrap();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1109,10 +1093,7 @@ mod tests {
HeaderValue::from_static("text/plain; charset=utf-8") HeaderValue::from_static("text/plain; charset=utf-8")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.body().bin_ref(), &Binary::from("test".to_owned()));
resp.body().binary().unwrap(),
&Binary::from("test".to_owned())
);
let resp: HttpResponse = (&"test".to_owned()).into(); let resp: HttpResponse = (&"test".to_owned()).into();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1121,10 +1102,7 @@ mod tests {
HeaderValue::from_static("text/plain; charset=utf-8") HeaderValue::from_static("text/plain; charset=utf-8")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.body().bin_ref(), &Binary::from(&"test".to_owned()));
resp.body().binary().unwrap(),
&Binary::from(&"test".to_owned())
);
let resp: HttpResponse = (&"test".to_owned()).respond_to(&req).ok().unwrap(); let resp: HttpResponse = (&"test".to_owned()).respond_to(&req).ok().unwrap();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1133,10 +1111,7 @@ mod tests {
HeaderValue::from_static("text/plain; charset=utf-8") HeaderValue::from_static("text/plain; charset=utf-8")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.body().bin_ref(), &Binary::from(&"test".to_owned()));
resp.body().binary().unwrap(),
&Binary::from(&"test".to_owned())
);
let b = Bytes::from_static(b"test"); let b = Bytes::from_static(b"test");
let resp: HttpResponse = b.into(); let resp: HttpResponse = b.into();
@ -1147,7 +1122,7 @@ mod tests {
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(
resp.body().binary().unwrap(), resp.body().bin_ref(),
&Binary::from(Bytes::from_static(b"test")) &Binary::from(Bytes::from_static(b"test"))
); );
@ -1160,7 +1135,7 @@ mod tests {
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(
resp.body().binary().unwrap(), resp.body().bin_ref(),
&Binary::from(Bytes::from_static(b"test")) &Binary::from(Bytes::from_static(b"test"))
); );
@ -1172,10 +1147,7 @@ mod tests {
HeaderValue::from_static("application/octet-stream") HeaderValue::from_static("application/octet-stream")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.body().bin_ref(), &Binary::from(BytesMut::from("test")));
resp.body().binary().unwrap(),
&Binary::from(BytesMut::from("test"))
);
let b = BytesMut::from("test"); let b = BytesMut::from("test");
let resp: HttpResponse = b.respond_to(&req).ok().unwrap(); let resp: HttpResponse = b.respond_to(&req).ok().unwrap();
@ -1185,10 +1157,7 @@ mod tests {
HeaderValue::from_static("application/octet-stream") HeaderValue::from_static("application/octet-stream")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.body().bin_ref(), &Binary::from(BytesMut::from("test")));
resp.body().binary().unwrap(),
&Binary::from(BytesMut::from("test"))
);
} }
#[test] #[test]

View File

@ -29,7 +29,9 @@ pub(crate) trait PipelineHandler<S> {
fn encoding(&self) -> ContentEncoding; fn encoding(&self) -> ContentEncoding;
fn handle( fn handle(
&mut self, req: HttpRequest<S>, htype: HandlerType, &mut self,
req: HttpRequest<S>,
htype: HandlerType,
) -> AsyncResult<HttpResponse>; ) -> AsyncResult<HttpResponse>;
} }
@ -120,8 +122,10 @@ impl<S> PipelineInfo<S> {
impl<S: 'static, H: PipelineHandler<S>> Pipeline<S, H> { impl<S: 'static, H: PipelineHandler<S>> Pipeline<S, H> {
pub fn new( pub fn new(
req: HttpRequest<S>, mws: Rc<Vec<Box<Middleware<S>>>>, req: HttpRequest<S>,
handler: Rc<UnsafeCell<H>>, htype: HandlerType, mws: Rc<Vec<Box<Middleware<S>>>>,
handler: Rc<UnsafeCell<H>>,
htype: HandlerType,
) -> Pipeline<S, H> { ) -> Pipeline<S, H> {
let mut info = PipelineInfo { let mut info = PipelineInfo {
mws, mws,
@ -148,6 +152,7 @@ impl Pipeline<(), Inner<()>> {
} }
impl<S: 'static, H> Pipeline<S, H> { impl<S: 'static, H> Pipeline<S, H> {
#[inline]
fn is_done(&self) -> bool { fn is_done(&self) -> bool {
match self.1 { match self.1 {
PipelineState::None PipelineState::None
@ -192,7 +197,9 @@ impl<S: 'static, H: PipelineHandler<S>> HttpHandlerTask for Pipeline<S, H> {
match self.1 { match self.1 {
PipelineState::None => return Ok(Async::Ready(true)), PipelineState::None => return Ok(Async::Ready(true)),
PipelineState::Error => { PipelineState::Error => {
return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()) return Err(
io::Error::new(io::ErrorKind::Other, "Internal error").into()
)
} }
_ => (), _ => (),
} }
@ -236,7 +243,9 @@ struct StartMiddlewares<S, H> {
impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> { impl<S: 'static, H: PipelineHandler<S>> StartMiddlewares<S, H> {
fn init( fn init(
info: &mut PipelineInfo<S>, hnd: Rc<UnsafeCell<H>>, htype: HandlerType, info: &mut PipelineInfo<S>,
hnd: Rc<UnsafeCell<H>>,
htype: HandlerType,
) -> PipelineState<S, H> { ) -> PipelineState<S, H> {
// execute middlewares, we need this stage because middlewares could be // execute middlewares, we need this stage because middlewares could be
// non-async and we can move to next state immediately // non-async and we can move to next state immediately
@ -313,7 +322,8 @@ struct WaitingResponse<S, H> {
impl<S: 'static, H> WaitingResponse<S, H> { impl<S: 'static, H> WaitingResponse<S, H> {
#[inline] #[inline]
fn init( fn init(
info: &mut PipelineInfo<S>, reply: AsyncResult<HttpResponse>, info: &mut PipelineInfo<S>,
reply: AsyncResult<HttpResponse>,
) -> PipelineState<S, H> { ) -> PipelineState<S, H> {
match reply.into() { match reply.into() {
AsyncResultItem::Err(err) => RunMiddlewares::init(info, err.into()), AsyncResultItem::Err(err) => RunMiddlewares::init(info, err.into()),
@ -344,6 +354,7 @@ struct RunMiddlewares<S, H> {
} }
impl<S: 'static, H> RunMiddlewares<S, H> { impl<S: 'static, H> RunMiddlewares<S, H> {
#[inline]
fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S, H> { fn init(info: &mut PipelineInfo<S>, mut resp: HttpResponse) -> PipelineState<S, H> {
if info.count == 0 { if info.count == 0 {
return ProcessResponse::init(resp); return ProcessResponse::init(resp);
@ -464,7 +475,9 @@ impl<S: 'static, H> ProcessResponse<S, H> {
} }
fn poll_io( fn poll_io(
mut self, io: &mut Writer, info: &mut PipelineInfo<S>, mut self,
io: &mut Writer,
info: &mut PipelineInfo<S>,
) -> Result<PipelineState<S, H>, PipelineState<S, H>> { ) -> Result<PipelineState<S, H>, PipelineState<S, H>> {
loop { loop {
if self.drain.is_none() && self.running != RunningState::Paused { if self.drain.is_none() && self.running != RunningState::Paused {
@ -676,6 +689,7 @@ struct FinishingMiddlewares<S, H> {
} }
impl<S: 'static, H> FinishingMiddlewares<S, H> { impl<S: 'static, H> FinishingMiddlewares<S, H> {
#[inline]
fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S, H> { fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S, H> {
if info.count == 0 { if info.count == 0 {
Completed::init(info) Completed::init(info)

View File

@ -12,8 +12,10 @@ use flate2::read::GzDecoder;
use flate2::write::{DeflateDecoder, DeflateEncoder, GzEncoder}; use flate2::write::{DeflateDecoder, DeflateEncoder, GzEncoder};
#[cfg(feature = "flate2")] #[cfg(feature = "flate2")]
use flate2::Compression; use flate2::Compression;
use http::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, use http::header::{
CONTENT_LENGTH, TRANSFER_ENCODING}; HeaderMap, HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH,
TRANSFER_ENCODING,
};
use http::{HttpTryFrom, Method, Version}; use http::{HttpTryFrom, Method, Version};
use body::{Binary, Body}; use body::{Binary, Body};
@ -378,16 +380,19 @@ impl ContentEncoder {
} }
pub fn for_server( pub fn for_server(
buf: SharedBytes, req: &HttpInnerMessage, resp: &mut HttpResponse, buf: SharedBytes,
req: &HttpInnerMessage,
resp: &mut HttpResponse,
response_encoding: ContentEncoding, response_encoding: ContentEncoding,
) -> ContentEncoder { ) -> ContentEncoder {
let version = resp.version().unwrap_or_else(|| req.version); let version = resp.version().unwrap_or_else(|| req.version);
let is_head = req.method == Method::HEAD; let is_head = req.method == Method::HEAD;
let mut body = resp.replace_body(Body::Empty); let mut len = 0;
let has_body = match body { let has_body = match resp.body() {
Body::Empty => false, Body::Empty => false,
Body::Binary(ref bin) => { Body::Binary(ref bin) => {
!(response_encoding == ContentEncoding::Auto && bin.len() < 96) len = bin.len();
!(response_encoding == ContentEncoding::Auto && len < 96)
} }
_ => true, _ => true,
}; };
@ -421,14 +426,14 @@ impl ContentEncoder {
ContentEncoding::Identity ContentEncoding::Identity
}; };
let mut transfer = match body { let mut transfer = match resp.body() {
Body::Empty => { Body::Empty => {
if req.method != Method::HEAD { if req.method != Method::HEAD {
resp.headers_mut().remove(CONTENT_LENGTH); resp.headers_mut().remove(CONTENT_LENGTH);
} }
TransferEncoding::length(0, buf) TransferEncoding::length(0, buf)
} }
Body::Binary(ref mut bytes) => { Body::Binary(_) => {
if !(encoding == ContentEncoding::Identity if !(encoding == ContentEncoding::Identity
|| encoding == ContentEncoding::Auto) || encoding == ContentEncoding::Auto)
{ {
@ -448,19 +453,26 @@ impl ContentEncoder {
ContentEncoding::Br => { ContentEncoding::Br => {
ContentEncoder::Br(BrotliEncoder::new(transfer, 3)) ContentEncoder::Br(BrotliEncoder::new(transfer, 3))
} }
ContentEncoding::Identity => ContentEncoder::Identity(transfer), ContentEncoding::Identity | ContentEncoding::Auto => {
ContentEncoding::Auto => unreachable!(), unreachable!()
}
}; };
// TODO return error!
let _ = enc.write(bytes.clone());
let _ = enc.write_eof();
*bytes = Binary::from(tmp.take()); let bin = resp.replace_body(Body::Empty).binary();
// TODO return error!
let _ = enc.write(bin);
let _ = enc.write_eof();
let body = tmp.take();
len = body.len();
encoding = ContentEncoding::Identity; encoding = ContentEncoding::Identity;
resp.replace_body(Binary::from(body));
} }
if is_head { if is_head {
let mut b = BytesMut::new(); let mut b = BytesMut::new();
let _ = write!(b, "{}", bytes.len()); let _ = write!(b, "{}", len);
resp.headers_mut().insert( resp.headers_mut().insert(
CONTENT_LENGTH, CONTENT_LENGTH,
HeaderValue::try_from(b.freeze()).unwrap(), HeaderValue::try_from(b.freeze()).unwrap(),
@ -485,11 +497,10 @@ impl ContentEncoder {
} }
} }
}; };
// // check for head response
if is_head { if is_head {
resp.set_body(Body::Empty);
transfer.kind = TransferEncodingKind::Length(0); transfer.kind = TransferEncodingKind::Length(0);
} else {
resp.replace_body(body);
} }
match encoding { match encoding {
@ -511,7 +522,9 @@ impl ContentEncoder {
} }
fn streaming_encoding( fn streaming_encoding(
buf: SharedBytes, version: Version, resp: &mut HttpResponse, buf: SharedBytes,
version: Version,
resp: &mut HttpResponse,
) -> TransferEncoding { ) -> TransferEncoding {
match resp.chunked() { match resp.chunked() {
Some(true) => { Some(true) => {
@ -590,7 +603,7 @@ impl ContentEncoder {
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))] #[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)] #[inline(always)]
pub fn write_eof(&mut self) -> Result<(), io::Error> { pub fn write_eof(&mut self) -> Result<bool, io::Error> {
let encoder = mem::replace( let encoder = mem::replace(
self, self,
ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())), ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())),
@ -602,7 +615,7 @@ impl ContentEncoder {
Ok(mut writer) => { Ok(mut writer) => {
writer.encode_eof(); writer.encode_eof();
*self = ContentEncoder::Identity(writer); *self = ContentEncoder::Identity(writer);
Ok(()) Ok(true)
} }
Err(err) => Err(err), Err(err) => Err(err),
}, },
@ -611,7 +624,7 @@ impl ContentEncoder {
Ok(mut writer) => { Ok(mut writer) => {
writer.encode_eof(); writer.encode_eof();
*self = ContentEncoder::Identity(writer); *self = ContentEncoder::Identity(writer);
Ok(()) Ok(true)
} }
Err(err) => Err(err), Err(err) => Err(err),
}, },
@ -620,14 +633,14 @@ impl ContentEncoder {
Ok(mut writer) => { Ok(mut writer) => {
writer.encode_eof(); writer.encode_eof();
*self = ContentEncoder::Identity(writer); *self = ContentEncoder::Identity(writer);
Ok(()) Ok(true)
} }
Err(err) => Err(err), Err(err) => Err(err),
}, },
ContentEncoder::Identity(mut writer) => { ContentEncoder::Identity(mut writer) => {
writer.encode_eof(); let res = writer.encode_eof();
*self = ContentEncoder::Identity(writer); *self = ContentEncoder::Identity(writer);
Ok(()) Ok(res)
} }
} }
} }
@ -763,8 +776,7 @@ impl TransferEncoding {
return Ok(*remaining == 0); return Ok(*remaining == 0);
} }
let len = cmp::min(*remaining, msg.len() as u64); let len = cmp::min(*remaining, msg.len() as u64);
self.buffer self.buffer.extend(msg.take().split_to(len as usize).into());
.extend(msg.take().split_to(len as usize).into());
*remaining -= len as u64; *remaining -= len as u64;
Ok(*remaining == 0) Ok(*remaining == 0)
@ -777,14 +789,16 @@ impl TransferEncoding {
/// Encode eof. Return `EOF` state of encoder /// Encode eof. Return `EOF` state of encoder
#[inline] #[inline]
pub fn encode_eof(&mut self) { pub fn encode_eof(&mut self) -> bool {
match self.kind { match self.kind {
TransferEncodingKind::Eof | TransferEncodingKind::Length(_) => (), TransferEncodingKind::Eof => true,
TransferEncodingKind::Length(rem) => rem == 0,
TransferEncodingKind::Chunked(ref mut eof) => { TransferEncodingKind::Chunked(ref mut eof) => {
if !*eof { if !*eof {
*eof = true; *eof = true;
self.buffer.extend_from_slice(b"0\r\n\r\n"); self.buffer.extend_from_slice(b"0\r\n\r\n");
} }
true
} }
} }
} }
@ -848,10 +862,7 @@ impl AcceptEncoding {
Err(_) => 0.0, Err(_) => 0.0,
}, },
}; };
Some(AcceptEncoding { Some(AcceptEncoding { encoding, quality })
encoding,
quality,
})
} }
/// Parse a raw Accept-Encoding header value into an ordered list. /// Parse a raw Accept-Encoding header value into an ordered list.
@ -879,9 +890,7 @@ mod tests {
fn test_chunked_te() { fn test_chunked_te() {
let bytes = SharedBytes::default(); let bytes = SharedBytes::default();
let mut enc = TransferEncoding::chunked(bytes.clone()); let mut enc = TransferEncoding::chunked(bytes.clone());
assert!(!enc.encode(Binary::from(b"test".as_ref())) assert!(!enc.encode(Binary::from(b"test".as_ref())).ok().unwrap());
.ok()
.unwrap());
assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap()); assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap());
assert_eq!( assert_eq!(
bytes.get_mut().take().freeze(), bytes.get_mut().take().freeze(),

View File

@ -148,6 +148,7 @@ where
} }
#[inline] #[inline]
/// read data from stream
pub fn poll_io(&mut self) { pub fn poll_io(&mut self) {
// read io from socket // read io from socket
if !self.flags.intersects(Flags::ERROR) if !self.flags.intersects(Flags::ERROR)
@ -210,7 +211,7 @@ where
if ready { if ready {
item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED); item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED);
} else { } else {
item.flags.insert(EntryFlags::FINISHED); item.flags.insert(EntryFlags::EOF);
} }
} }
// no more IO for this iteration // no more IO for this iteration
@ -326,7 +327,36 @@ where
// search handler for request // search handler for request
for h in self.settings.handlers().iter_mut() { for h in self.settings.handlers().iter_mut() {
req = match h.handle(req) { req = match h.handle(req) {
Ok(pipe) => { Ok(mut pipe) => {
if self.tasks.is_empty() {
match pipe.poll_io(&mut self.stream) {
Ok(Async::Ready(ready)) => {
// override keep-alive state
if self.stream.keepalive() {
self.flags.insert(Flags::KEEPALIVE);
} else {
self.flags.remove(Flags::KEEPALIVE);
}
// prepare stream for next response
self.stream.reset();
if !ready {
let item = Entry {
pipe,
flags: EntryFlags::EOF,
};
self.tasks.push_back(item);
}
continue 'outer;
}
Ok(Async::NotReady) => {}
Err(err) => {
error!("Unhandled error: {}", err);
self.flags.intersects(Flags::ERROR);
return;
}
}
}
self.tasks.push_back(Entry { self.tasks.push_back(Entry {
pipe, pipe,
flags: EntryFlags::empty(), flags: EntryFlags::empty(),

View File

@ -42,7 +42,9 @@ pub(crate) struct H1Writer<T: AsyncWrite, H: 'static> {
impl<T: AsyncWrite, H: 'static> H1Writer<T, H> { impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
pub fn new( pub fn new(
stream: T, buf: SharedBytes, settings: Rc<WorkerSettings<H>>, stream: T,
buf: SharedBytes,
settings: Rc<WorkerSettings<H>>,
) -> H1Writer<T, H> { ) -> H1Writer<T, H> {
H1Writer { H1Writer {
flags: Flags::empty(), flags: Flags::empty(),
@ -101,7 +103,9 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
} }
fn start( fn start(
&mut self, req: &mut HttpInnerMessage, msg: &mut HttpResponse, &mut self,
req: &mut HttpInnerMessage,
msg: &mut HttpResponse,
encoding: ContentEncoding, encoding: ContentEncoding,
) -> io::Result<WriterState> { ) -> io::Result<WriterState> {
// prepare task // prepare task
@ -138,7 +142,9 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
let reason = msg.reason().as_bytes(); let reason = msg.reason().as_bytes();
let mut is_bin = if let Body::Binary(ref bytes) = body { let mut is_bin = if let Body::Binary(ref bytes) = body {
buffer.reserve( buffer.reserve(
256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len() 256
+ msg.headers().len() * AVERAGE_HEADER_SIZE
+ bytes.len()
+ reason.len(), + reason.len(),
); );
true true
@ -255,9 +261,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
} }
fn write_eof(&mut self) -> io::Result<WriterState> { fn write_eof(&mut self) -> io::Result<WriterState> {
self.encoder.write_eof()?; if !self.encoder.write_eof()? {
if !self.encoder.is_eof() {
Err(io::Error::new( Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
"Last payload item, but eof is not reached", "Last payload item, but eof is not reached",
@ -276,7 +280,9 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
unsafe { &mut *(self.buffer.as_ref() as *const _ as *mut _) }; unsafe { &mut *(self.buffer.as_ref() as *const _ as *mut _) };
let written = self.write_data(buf)?; let written = self.write_data(buf)?;
let _ = self.buffer.split_to(written); let _ = self.buffer.split_to(written);
if self.buffer.len() > self.buffer_capacity { if shutdown && !self.buffer.is_empty()
|| (self.buffer.len() > self.buffer_capacity)
{
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
} }