From f82fa08d723005d5e79c71797d7aa6df2bb4705e Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 15 May 2018 16:41:46 -0700 Subject: [PATCH] various optimizations --- src/body.rs | 18 ++++++++ src/httpresponse.rs | 95 ++++++++++++++---------------------------- src/pipeline.rs | 28 +++++++++---- src/server/encoding.rs | 81 +++++++++++++++++++---------------- src/server/h1.rs | 34 ++++++++++++++- src/server/h1writer.rs | 20 +++++---- 6 files changed, 161 insertions(+), 115 deletions(-) diff --git a/src/body.rs b/src/body.rs index 063c93ac5..5ce0d1292 100644 --- a/src/body.rs +++ b/src/body.rs @@ -62,10 +62,28 @@ impl Body { } } + /// Is this binary empy. + #[inline] + pub fn is_empty(&self) -> bool { + match *self { + Body::Empty => true, + _ => false, + } + } + /// Create body from slice (copy) pub fn from_slice(s: &[u8]) -> Body { Body::Binary(Binary::Bytes(Bytes::from(s))) } + + /// Is this binary body. + #[inline] + pub(crate) fn binary(self) -> Binary { + match self { + Body::Binary(b) => b, + _ => panic!(), + } + } } impl PartialEq for Body { diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 8097fc93b..a71f53fbb 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -466,10 +466,7 @@ impl HttpResponseBuilder { jar.add(cookie.into_owned()); self.cookies = Some(jar) } else { - self.cookies - .as_mut() - .unwrap() - .add(cookie.into_owned()); + self.cookies.as_mut().unwrap().add(cookie.into_owned()); } self } @@ -534,9 +531,7 @@ impl HttpResponseBuilder { if let Some(e) = self.err.take() { return Error::from(e).into(); } - let mut response = self.response - .take() - .expect("cannot reuse response builder"); + let mut response = self.response.take().expect("cannot reuse response builder"); if let Some(ref jar) = self.cookies { for cookie in jar.delta() { match HeaderValue::from_str(&cookie.to_string()) { @@ -558,9 +553,7 @@ impl HttpResponseBuilder { S: Stream + 'static, E: Into, { - self.body(Body::Streaming(Box::new( - stream.map_err(|e| e.into()), - ))) + self.body(Body::Streaming(Box::new(stream.map_err(|e| e.into())))) } /// Set a json body and generate `HttpResponse` @@ -607,7 +600,8 @@ impl HttpResponseBuilder { #[inline] #[cfg_attr(feature = "cargo-clippy", allow(borrowed_box))] fn parts<'a>( - parts: &'a mut Option>, err: &Option, + parts: &'a mut Option>, + err: &Option, ) -> Option<&'a mut Box> { if err.is_some() { return None; @@ -822,14 +816,15 @@ thread_local!(static POOL: Rc> = HttpResponsePool:: impl HttpResponsePool { pub fn pool() -> Rc> { - Rc::new(UnsafeCell::new(HttpResponsePool( - VecDeque::with_capacity(128), - ))) + Rc::new(UnsafeCell::new(HttpResponsePool(VecDeque::with_capacity( + 128, + )))) } #[inline] pub fn get_builder( - pool: &Rc>, status: StatusCode, + pool: &Rc>, + status: StatusCode, ) -> HttpResponseBuilder { let p = unsafe { &mut *pool.as_ref().get() }; if let Some(mut msg) = p.0.pop_front() { @@ -853,7 +848,9 @@ impl HttpResponsePool { #[inline] pub fn get_response( - pool: &Rc>, status: StatusCode, body: Body, + pool: &Rc>, + status: StatusCode, + body: Body, ) -> HttpResponse { let p = unsafe { &mut *pool.as_ref().get() }; if let Some(mut msg) = p.0.pop_front() { @@ -879,7 +876,8 @@ impl HttpResponsePool { #[inline(always)] #[cfg_attr(feature = "cargo-clippy", allow(boxed_local, inline_always))] fn release( - pool: &Rc>, mut inner: Box, + pool: &Rc>, + mut inner: Box, ) { let pool = unsafe { &mut *pool.as_ref().get() }; if pool.0.len() < 128 { @@ -975,9 +973,7 @@ mod tests { #[test] fn test_force_close() { - let resp = HttpResponse::build(StatusCode::OK) - .force_close() - .finish(); + let resp = HttpResponse::build(StatusCode::OK).force_close().finish(); assert!(!resp.keep_alive().unwrap()) } @@ -986,10 +982,7 @@ mod tests { let resp = HttpResponse::build(StatusCode::OK) .content_type("text/plain") .body(Body::Empty); - assert_eq!( - resp.headers().get(CONTENT_TYPE).unwrap(), - "text/plain" - ) + assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "text/plain") } #[test] @@ -1036,10 +1029,10 @@ mod tests { } impl Body { - pub(crate) fn binary(&self) -> Option<&Binary> { + pub(crate) fn bin_ref(&self) -> &Binary { match *self { - Body::Binary(ref bin) => Some(bin), - _ => None, + Body::Binary(ref bin) => bin, + _ => panic!(), } } } @@ -1055,7 +1048,7 @@ mod tests { HeaderValue::from_static("text/plain; charset=utf-8") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(resp.body().binary().unwrap(), &Binary::from("test")); + assert_eq!(resp.body().bin_ref(), &Binary::from("test")); let resp: HttpResponse = "test".respond_to(&req).ok().unwrap(); assert_eq!(resp.status(), StatusCode::OK); @@ -1064,7 +1057,7 @@ mod tests { HeaderValue::from_static("text/plain; charset=utf-8") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(resp.body().binary().unwrap(), &Binary::from("test")); + assert_eq!(resp.body().bin_ref(), &Binary::from("test")); let resp: HttpResponse = b"test".as_ref().into(); assert_eq!(resp.status(), StatusCode::OK); @@ -1073,10 +1066,7 @@ mod tests { HeaderValue::from_static("application/octet-stream") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.body().binary().unwrap(), - &Binary::from(b"test".as_ref()) - ); + assert_eq!(resp.body().bin_ref(), &Binary::from(b"test".as_ref())); let resp: HttpResponse = b"test".as_ref().respond_to(&req).ok().unwrap(); assert_eq!(resp.status(), StatusCode::OK); @@ -1085,10 +1075,7 @@ mod tests { HeaderValue::from_static("application/octet-stream") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.body().binary().unwrap(), - &Binary::from(b"test".as_ref()) - ); + assert_eq!(resp.body().bin_ref(), &Binary::from(b"test".as_ref())); let resp: HttpResponse = "test".to_owned().into(); assert_eq!(resp.status(), StatusCode::OK); @@ -1097,10 +1084,7 @@ mod tests { HeaderValue::from_static("text/plain; charset=utf-8") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.body().binary().unwrap(), - &Binary::from("test".to_owned()) - ); + assert_eq!(resp.body().bin_ref(), &Binary::from("test".to_owned())); let resp: HttpResponse = "test".to_owned().respond_to(&req).ok().unwrap(); assert_eq!(resp.status(), StatusCode::OK); @@ -1109,10 +1093,7 @@ mod tests { HeaderValue::from_static("text/plain; charset=utf-8") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.body().binary().unwrap(), - &Binary::from("test".to_owned()) - ); + assert_eq!(resp.body().bin_ref(), &Binary::from("test".to_owned())); let resp: HttpResponse = (&"test".to_owned()).into(); assert_eq!(resp.status(), StatusCode::OK); @@ -1121,10 +1102,7 @@ mod tests { HeaderValue::from_static("text/plain; charset=utf-8") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.body().binary().unwrap(), - &Binary::from(&"test".to_owned()) - ); + assert_eq!(resp.body().bin_ref(), &Binary::from(&"test".to_owned())); let resp: HttpResponse = (&"test".to_owned()).respond_to(&req).ok().unwrap(); assert_eq!(resp.status(), StatusCode::OK); @@ -1133,10 +1111,7 @@ mod tests { HeaderValue::from_static("text/plain; charset=utf-8") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.body().binary().unwrap(), - &Binary::from(&"test".to_owned()) - ); + assert_eq!(resp.body().bin_ref(), &Binary::from(&"test".to_owned())); let b = Bytes::from_static(b"test"); let resp: HttpResponse = b.into(); @@ -1147,7 +1122,7 @@ mod tests { ); assert_eq!(resp.status(), StatusCode::OK); assert_eq!( - resp.body().binary().unwrap(), + resp.body().bin_ref(), &Binary::from(Bytes::from_static(b"test")) ); @@ -1160,7 +1135,7 @@ mod tests { ); assert_eq!(resp.status(), StatusCode::OK); assert_eq!( - resp.body().binary().unwrap(), + resp.body().bin_ref(), &Binary::from(Bytes::from_static(b"test")) ); @@ -1172,10 +1147,7 @@ mod tests { HeaderValue::from_static("application/octet-stream") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.body().binary().unwrap(), - &Binary::from(BytesMut::from("test")) - ); + assert_eq!(resp.body().bin_ref(), &Binary::from(BytesMut::from("test"))); let b = BytesMut::from("test"); let resp: HttpResponse = b.respond_to(&req).ok().unwrap(); @@ -1185,10 +1157,7 @@ mod tests { HeaderValue::from_static("application/octet-stream") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.body().binary().unwrap(), - &Binary::from(BytesMut::from("test")) - ); + assert_eq!(resp.body().bin_ref(), &Binary::from(BytesMut::from("test"))); } #[test] diff --git a/src/pipeline.rs b/src/pipeline.rs index e00c06178..4d5d405c0 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -29,7 +29,9 @@ pub(crate) trait PipelineHandler { fn encoding(&self) -> ContentEncoding; fn handle( - &mut self, req: HttpRequest, htype: HandlerType, + &mut self, + req: HttpRequest, + htype: HandlerType, ) -> AsyncResult; } @@ -120,8 +122,10 @@ impl PipelineInfo { impl> Pipeline { pub fn new( - req: HttpRequest, mws: Rc>>>, - handler: Rc>, htype: HandlerType, + req: HttpRequest, + mws: Rc>>>, + handler: Rc>, + htype: HandlerType, ) -> Pipeline { let mut info = PipelineInfo { mws, @@ -148,6 +152,7 @@ impl Pipeline<(), Inner<()>> { } impl Pipeline { + #[inline] fn is_done(&self) -> bool { match self.1 { PipelineState::None @@ -192,7 +197,9 @@ impl> HttpHandlerTask for Pipeline { match self.1 { PipelineState::None => return Ok(Async::Ready(true)), PipelineState::Error => { - return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()) + return Err( + io::Error::new(io::ErrorKind::Other, "Internal error").into() + ) } _ => (), } @@ -236,7 +243,9 @@ struct StartMiddlewares { impl> StartMiddlewares { fn init( - info: &mut PipelineInfo, hnd: Rc>, htype: HandlerType, + info: &mut PipelineInfo, + hnd: Rc>, + htype: HandlerType, ) -> PipelineState { // execute middlewares, we need this stage because middlewares could be // non-async and we can move to next state immediately @@ -313,7 +322,8 @@ struct WaitingResponse { impl WaitingResponse { #[inline] fn init( - info: &mut PipelineInfo, reply: AsyncResult, + info: &mut PipelineInfo, + reply: AsyncResult, ) -> PipelineState { match reply.into() { AsyncResultItem::Err(err) => RunMiddlewares::init(info, err.into()), @@ -344,6 +354,7 @@ struct RunMiddlewares { } impl RunMiddlewares { + #[inline] fn init(info: &mut PipelineInfo, mut resp: HttpResponse) -> PipelineState { if info.count == 0 { return ProcessResponse::init(resp); @@ -464,7 +475,9 @@ impl ProcessResponse { } fn poll_io( - mut self, io: &mut Writer, info: &mut PipelineInfo, + mut self, + io: &mut Writer, + info: &mut PipelineInfo, ) -> Result, PipelineState> { loop { if self.drain.is_none() && self.running != RunningState::Paused { @@ -676,6 +689,7 @@ struct FinishingMiddlewares { } impl FinishingMiddlewares { + #[inline] fn init(info: &mut PipelineInfo, resp: HttpResponse) -> PipelineState { if info.count == 0 { Completed::init(info) diff --git a/src/server/encoding.rs b/src/server/encoding.rs index ae69ae07f..662a8c4b7 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -12,8 +12,10 @@ use flate2::read::GzDecoder; use flate2::write::{DeflateDecoder, DeflateEncoder, GzEncoder}; #[cfg(feature = "flate2")] use flate2::Compression; -use http::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, - CONTENT_LENGTH, TRANSFER_ENCODING}; +use http::header::{ + HeaderMap, HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, + TRANSFER_ENCODING, +}; use http::{HttpTryFrom, Method, Version}; use body::{Binary, Body}; @@ -378,16 +380,19 @@ impl ContentEncoder { } pub fn for_server( - buf: SharedBytes, req: &HttpInnerMessage, resp: &mut HttpResponse, + buf: SharedBytes, + req: &HttpInnerMessage, + resp: &mut HttpResponse, response_encoding: ContentEncoding, ) -> ContentEncoder { let version = resp.version().unwrap_or_else(|| req.version); let is_head = req.method == Method::HEAD; - let mut body = resp.replace_body(Body::Empty); - let has_body = match body { + let mut len = 0; + let has_body = match resp.body() { Body::Empty => false, Body::Binary(ref bin) => { - !(response_encoding == ContentEncoding::Auto && bin.len() < 96) + len = bin.len(); + !(response_encoding == ContentEncoding::Auto && len < 96) } _ => true, }; @@ -421,14 +426,14 @@ impl ContentEncoder { ContentEncoding::Identity }; - let mut transfer = match body { + let mut transfer = match resp.body() { Body::Empty => { if req.method != Method::HEAD { resp.headers_mut().remove(CONTENT_LENGTH); } TransferEncoding::length(0, buf) } - Body::Binary(ref mut bytes) => { + Body::Binary(_) => { if !(encoding == ContentEncoding::Identity || encoding == ContentEncoding::Auto) { @@ -448,19 +453,26 @@ impl ContentEncoder { ContentEncoding::Br => { ContentEncoder::Br(BrotliEncoder::new(transfer, 3)) } - ContentEncoding::Identity => ContentEncoder::Identity(transfer), - ContentEncoding::Auto => unreachable!(), + ContentEncoding::Identity | ContentEncoding::Auto => { + unreachable!() + } }; - // TODO return error! - let _ = enc.write(bytes.clone()); - let _ = enc.write_eof(); - *bytes = Binary::from(tmp.take()); + let bin = resp.replace_body(Body::Empty).binary(); + + // TODO return error! + let _ = enc.write(bin); + let _ = enc.write_eof(); + let body = tmp.take(); + len = body.len(); + encoding = ContentEncoding::Identity; + resp.replace_body(Binary::from(body)); } + if is_head { let mut b = BytesMut::new(); - let _ = write!(b, "{}", bytes.len()); + let _ = write!(b, "{}", len); resp.headers_mut().insert( CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap(), @@ -485,11 +497,10 @@ impl ContentEncoder { } } }; - // + // check for head response if is_head { + resp.set_body(Body::Empty); transfer.kind = TransferEncodingKind::Length(0); - } else { - resp.replace_body(body); } match encoding { @@ -511,7 +522,9 @@ impl ContentEncoder { } fn streaming_encoding( - buf: SharedBytes, version: Version, resp: &mut HttpResponse, + buf: SharedBytes, + version: Version, + resp: &mut HttpResponse, ) -> TransferEncoding { match resp.chunked() { Some(true) => { @@ -590,7 +603,7 @@ impl ContentEncoder { #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] #[inline(always)] - pub fn write_eof(&mut self) -> Result<(), io::Error> { + pub fn write_eof(&mut self) -> Result { let encoder = mem::replace( self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())), @@ -602,7 +615,7 @@ impl ContentEncoder { Ok(mut writer) => { writer.encode_eof(); *self = ContentEncoder::Identity(writer); - Ok(()) + Ok(true) } Err(err) => Err(err), }, @@ -611,7 +624,7 @@ impl ContentEncoder { Ok(mut writer) => { writer.encode_eof(); *self = ContentEncoder::Identity(writer); - Ok(()) + Ok(true) } Err(err) => Err(err), }, @@ -620,14 +633,14 @@ impl ContentEncoder { Ok(mut writer) => { writer.encode_eof(); *self = ContentEncoder::Identity(writer); - Ok(()) + Ok(true) } Err(err) => Err(err), }, ContentEncoder::Identity(mut writer) => { - writer.encode_eof(); + let res = writer.encode_eof(); *self = ContentEncoder::Identity(writer); - Ok(()) + Ok(res) } } } @@ -763,8 +776,7 @@ impl TransferEncoding { return Ok(*remaining == 0); } let len = cmp::min(*remaining, msg.len() as u64); - self.buffer - .extend(msg.take().split_to(len as usize).into()); + self.buffer.extend(msg.take().split_to(len as usize).into()); *remaining -= len as u64; Ok(*remaining == 0) @@ -777,14 +789,16 @@ impl TransferEncoding { /// Encode eof. Return `EOF` state of encoder #[inline] - pub fn encode_eof(&mut self) { + pub fn encode_eof(&mut self) -> bool { match self.kind { - TransferEncodingKind::Eof | TransferEncodingKind::Length(_) => (), + TransferEncodingKind::Eof => true, + TransferEncodingKind::Length(rem) => rem == 0, TransferEncodingKind::Chunked(ref mut eof) => { if !*eof { *eof = true; self.buffer.extend_from_slice(b"0\r\n\r\n"); } + true } } } @@ -848,10 +862,7 @@ impl AcceptEncoding { Err(_) => 0.0, }, }; - Some(AcceptEncoding { - encoding, - quality, - }) + Some(AcceptEncoding { encoding, quality }) } /// Parse a raw Accept-Encoding header value into an ordered list. @@ -879,9 +890,7 @@ mod tests { fn test_chunked_te() { let bytes = SharedBytes::default(); let mut enc = TransferEncoding::chunked(bytes.clone()); - assert!(!enc.encode(Binary::from(b"test".as_ref())) - .ok() - .unwrap()); + assert!(!enc.encode(Binary::from(b"test".as_ref())).ok().unwrap()); assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap()); assert_eq!( bytes.get_mut().take().freeze(), diff --git a/src/server/h1.rs b/src/server/h1.rs index 933ce0a80..9418616cd 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -148,6 +148,7 @@ where } #[inline] + /// read data from stream pub fn poll_io(&mut self) { // read io from socket if !self.flags.intersects(Flags::ERROR) @@ -210,7 +211,7 @@ where if ready { item.flags.insert(EntryFlags::EOF | EntryFlags::FINISHED); } else { - item.flags.insert(EntryFlags::FINISHED); + item.flags.insert(EntryFlags::EOF); } } // no more IO for this iteration @@ -326,7 +327,36 @@ where // search handler for request for h in self.settings.handlers().iter_mut() { req = match h.handle(req) { - Ok(pipe) => { + Ok(mut pipe) => { + if self.tasks.is_empty() { + match pipe.poll_io(&mut self.stream) { + Ok(Async::Ready(ready)) => { + // override keep-alive state + if self.stream.keepalive() { + self.flags.insert(Flags::KEEPALIVE); + } else { + self.flags.remove(Flags::KEEPALIVE); + } + // prepare stream for next response + self.stream.reset(); + + if !ready { + let item = Entry { + pipe, + flags: EntryFlags::EOF, + }; + self.tasks.push_back(item); + } + continue 'outer; + } + Ok(Async::NotReady) => {} + Err(err) => { + error!("Unhandled error: {}", err); + self.flags.intersects(Flags::ERROR); + return; + } + } + } self.tasks.push_back(Entry { pipe, flags: EntryFlags::empty(), diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index c0fa0609f..ec5bfde18 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -42,7 +42,9 @@ pub(crate) struct H1Writer { impl H1Writer { pub fn new( - stream: T, buf: SharedBytes, settings: Rc>, + stream: T, + buf: SharedBytes, + settings: Rc>, ) -> H1Writer { H1Writer { flags: Flags::empty(), @@ -101,7 +103,9 @@ impl Writer for H1Writer { } fn start( - &mut self, req: &mut HttpInnerMessage, msg: &mut HttpResponse, + &mut self, + req: &mut HttpInnerMessage, + msg: &mut HttpResponse, encoding: ContentEncoding, ) -> io::Result { // prepare task @@ -138,7 +142,9 @@ impl Writer for H1Writer { let reason = msg.reason().as_bytes(); let mut is_bin = if let Body::Binary(ref bytes) = body { buffer.reserve( - 256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len() + 256 + + msg.headers().len() * AVERAGE_HEADER_SIZE + + bytes.len() + reason.len(), ); true @@ -255,9 +261,7 @@ impl Writer for H1Writer { } fn write_eof(&mut self) -> io::Result { - self.encoder.write_eof()?; - - if !self.encoder.is_eof() { + if !self.encoder.write_eof()? { Err(io::Error::new( io::ErrorKind::Other, "Last payload item, but eof is not reached", @@ -276,7 +280,9 @@ impl Writer for H1Writer { unsafe { &mut *(self.buffer.as_ref() as *const _ as *mut _) }; let written = self.write_data(buf)?; let _ = self.buffer.split_to(written); - if self.buffer.len() > self.buffer_capacity { + if shutdown && !self.buffer.is_empty() + || (self.buffer.len() > self.buffer_capacity) + { return Ok(Async::NotReady); } }