1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

simplify http response pool

This commit is contained in:
Nikolay Kim 2018-06-25 09:08:28 +06:00
parent d1b73e30e0
commit 32212bad1f
4 changed files with 77 additions and 88 deletions

View File

@ -636,7 +636,8 @@ where
}; };
} }
let (router, resources) = Router::new(&prefix, parts.settings, resources); let (router, resources) =
Router::new(&prefix, parts.settings.clone(), resources);
let inner = Rc::new(Inner { let inner = Rc::new(Inner {
prefix: prefix_len, prefix: prefix_len,

View File

@ -1,8 +1,7 @@
//! Http response //! Http response
use std::cell::UnsafeCell; use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::Write; use std::io::Write;
use std::rc::Rc;
use std::{fmt, mem, str}; use std::{fmt, mem, str};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
@ -36,30 +35,17 @@ pub enum ConnectionType {
} }
/// An HTTP Response /// An HTTP Response
pub struct HttpResponse( pub struct HttpResponse(Box<InnerHttpResponse>);
Option<Box<InnerHttpResponse>>,
Rc<UnsafeCell<HttpResponsePool>>,
);
impl Drop for HttpResponse {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
HttpResponsePool::release(&self.1, inner)
}
}
}
impl HttpResponse { impl HttpResponse {
#[inline(always)] #[inline]
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
fn get_ref(&self) -> &InnerHttpResponse { fn get_ref(&self) -> &InnerHttpResponse {
self.0.as_ref().unwrap() self.0.as_ref()
} }
#[inline(always)] #[inline]
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
fn get_mut(&mut self) -> &mut InnerHttpResponse { fn get_mut(&mut self) -> &mut InnerHttpResponse {
self.0.as_mut().unwrap() self.0.as_mut()
} }
/// Create http response builder with specific status. /// Create http response builder with specific status.
@ -96,7 +82,7 @@ impl HttpResponse {
/// Convert `HttpResponse` to a `HttpResponseBuilder` /// Convert `HttpResponse` to a `HttpResponseBuilder`
#[inline] #[inline]
pub fn into_builder(mut self) -> HttpResponseBuilder { pub fn into_builder(self) -> HttpResponseBuilder {
// If this response has cookies, load them into a jar // If this response has cookies, load them into a jar
let mut jar: Option<CookieJar> = None; let mut jar: Option<CookieJar> = None;
for c in self.cookies() { for c in self.cookies() {
@ -109,11 +95,8 @@ impl HttpResponse {
} }
} }
let response = self.0.take();
let pool = Some(Rc::clone(&self.1));
HttpResponseBuilder { HttpResponseBuilder {
response, response: Some(self.0),
pool,
err: None, err: None,
cookies: jar, cookies: jar,
} }
@ -299,15 +282,12 @@ impl HttpResponse {
self.get_mut().write_capacity = cap; self.get_mut().write_capacity = cap;
} }
pub(crate) fn into_parts(mut self) -> HttpResponseParts { pub(crate) fn into_parts(self) -> HttpResponseParts {
self.0.take().unwrap().into_parts() self.0.into_parts()
} }
pub(crate) fn from_parts(parts: HttpResponseParts) -> HttpResponse { pub(crate) fn from_parts(parts: HttpResponseParts) -> HttpResponse {
HttpResponse( HttpResponse(Box::new(InnerHttpResponse::from_parts(parts)))
Some(Box::new(InnerHttpResponse::from_parts(parts))),
HttpResponsePool::pool(),
)
} }
} }
@ -353,7 +333,6 @@ impl<'a> Iterator for CookieIter<'a> {
/// builder-like pattern. /// builder-like pattern.
pub struct HttpResponseBuilder { pub struct HttpResponseBuilder {
response: Option<Box<InnerHttpResponse>>, response: Option<Box<InnerHttpResponse>>,
pool: Option<Rc<UnsafeCell<HttpResponsePool>>>,
err: Option<HttpError>, err: Option<HttpError>,
cookies: Option<CookieJar>, cookies: Option<CookieJar>,
} }
@ -643,7 +622,7 @@ impl HttpResponseBuilder {
} }
} }
response.body = body.into(); response.body = body.into();
HttpResponse(Some(response), self.pool.take().unwrap()) HttpResponse(response)
} }
#[inline] #[inline]
@ -692,7 +671,6 @@ impl HttpResponseBuilder {
pub fn take(&mut self) -> HttpResponseBuilder { pub fn take(&mut self) -> HttpResponseBuilder {
HttpResponseBuilder { HttpResponseBuilder {
response: self.response.take(), response: self.response.take(),
pool: self.pool.take(),
err: self.err.take(), err: self.err.take(),
cookies: self.cookies.take(), cookies: self.cookies.take(),
} }
@ -973,27 +951,28 @@ impl InnerHttpResponse {
} }
/// Internal use only! /// Internal use only!
pub(crate) struct HttpResponsePool(VecDeque<Box<InnerHttpResponse>>); pub(crate) struct HttpResponsePool(RefCell<VecDeque<Box<InnerHttpResponse>>>);
thread_local!(static POOL: Rc<UnsafeCell<HttpResponsePool>> = HttpResponsePool::pool()); thread_local!(static POOL: &'static HttpResponsePool = HttpResponsePool::pool());
impl HttpResponsePool { impl HttpResponsePool {
pub fn pool() -> Rc<UnsafeCell<HttpResponsePool>> { fn pool() -> &'static HttpResponsePool {
Rc::new(UnsafeCell::new(HttpResponsePool(VecDeque::with_capacity( let pool = HttpResponsePool(RefCell::new(VecDeque::with_capacity(128)));
128, Box::leak(Box::new(pool))
)))) }
pub fn get_pool() -> &'static HttpResponsePool {
POOL.with(|p| *p)
} }
#[inline] #[inline]
pub fn get_builder( pub fn get_builder(
pool: &Rc<UnsafeCell<HttpResponsePool>>, status: StatusCode, pool: &'static HttpResponsePool, status: StatusCode,
) -> HttpResponseBuilder { ) -> HttpResponseBuilder {
let p = unsafe { &mut *pool.as_ref().get() }; if let Some(mut msg) = pool.0.borrow_mut().pop_front() {
if let Some(mut msg) = p.0.pop_front() {
msg.status = status; msg.status = status;
HttpResponseBuilder { HttpResponseBuilder {
response: Some(msg), response: Some(msg),
pool: Some(Rc::clone(pool)),
err: None, err: None,
cookies: None, cookies: None,
} }
@ -1001,7 +980,6 @@ impl HttpResponsePool {
let msg = Box::new(InnerHttpResponse::new(status, Body::Empty)); let msg = Box::new(InnerHttpResponse::new(status, Body::Empty));
HttpResponseBuilder { HttpResponseBuilder {
response: Some(msg), response: Some(msg),
pool: Some(Rc::clone(pool)),
err: None, err: None,
cookies: None, cookies: None,
} }
@ -1010,16 +988,15 @@ impl HttpResponsePool {
#[inline] #[inline]
pub fn get_response( pub fn get_response(
pool: &Rc<UnsafeCell<HttpResponsePool>>, status: StatusCode, body: Body, pool: &'static HttpResponsePool, status: StatusCode, body: Body,
) -> HttpResponse { ) -> HttpResponse {
let p = unsafe { &mut *pool.as_ref().get() }; if let Some(mut msg) = pool.0.borrow_mut().pop_front() {
if let Some(mut msg) = p.0.pop_front() {
msg.status = status; msg.status = status;
msg.body = body; msg.body = body;
HttpResponse(Some(msg), Rc::clone(pool)) HttpResponse(msg)
} else { } else {
let msg = Box::new(InnerHttpResponse::new(status, body)); let msg = Box::new(InnerHttpResponse::new(status, body));
HttpResponse(Some(msg), Rc::clone(pool)) HttpResponse(msg)
} }
} }
@ -1033,13 +1010,12 @@ impl HttpResponsePool {
POOL.with(|pool| HttpResponsePool::get_response(pool, status, body)) POOL.with(|pool| HttpResponsePool::get_response(pool, status, body))
} }
#[inline(always)] #[inline]
#[cfg_attr(feature = "cargo-clippy", allow(boxed_local, inline_always))] pub(crate) fn release(resp: HttpResponse) {
fn release( let mut inner = resp.0;
pool: &Rc<UnsafeCell<HttpResponsePool>>, mut inner: Box<InnerHttpResponse>, POOL.with(|pool| {
) { let mut p = pool.0.borrow_mut();
let pool = unsafe { &mut *pool.as_ref().get() }; if p.len() < 128 {
if pool.0.len() < 128 {
inner.headers.clear(); inner.headers.clear();
inner.version = None; inner.version = None;
inner.chunked = None; inner.chunked = None;
@ -1049,8 +1025,9 @@ impl HttpResponsePool {
inner.response_size = 0; inner.response_size = 0;
inner.error = None; inner.error = None;
inner.write_capacity = MAX_WRITE_BUFFER_SIZE; inner.write_capacity = MAX_WRITE_BUFFER_SIZE;
pool.0.push_front(inner); p.push_front(inner);
} }
});
} }
} }

View File

@ -13,7 +13,7 @@ use error::Error;
use handler::{AsyncResult, AsyncResultItem}; use handler::{AsyncResult, AsyncResultItem};
use header::ContentEncoding; use header::ContentEncoding;
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::HttpResponse; use httpresponse::{HttpResponse, HttpResponsePool};
use middleware::{Finished, Middleware, Response, Started}; use middleware::{Finished, Middleware, Response, Started};
use server::{HttpHandlerTask, Writer, WriterState}; use server::{HttpHandlerTask, Writer, WriterState};
@ -691,7 +691,7 @@ impl<S: 'static, H> ProcessResponse<S, H> {
/// Middlewares start executor /// Middlewares start executor
struct FinishingMiddlewares<S, H> { struct FinishingMiddlewares<S, H> {
resp: HttpResponse, resp: Option<HttpResponse>,
fut: Option<Box<Future<Item = (), Error = Error>>>, fut: Option<Box<Future<Item = (), Error = Error>>>,
_s: PhantomData<S>, _s: PhantomData<S>,
_h: PhantomData<H>, _h: PhantomData<H>,
@ -703,10 +703,10 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>], resp: HttpResponse, info: &mut PipelineInfo<S>, mws: &[Box<Middleware<S>>], resp: HttpResponse,
) -> PipelineState<S, H> { ) -> PipelineState<S, H> {
if info.count == 0 { if info.count == 0 {
Completed::init(info) Completed::init(info, resp)
} else { } else {
let mut state = FinishingMiddlewares { let mut state = FinishingMiddlewares {
resp, resp: Some(resp),
fut: None, fut: None,
_s: PhantomData, _s: PhantomData,
_h: PhantomData, _h: PhantomData,
@ -741,15 +741,16 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
} }
self.fut = None; self.fut = None;
if info.count == 0 { if info.count == 0 {
return Some(Completed::init(info)); return Some(Completed::init(info, self.resp.take().unwrap()));
} }
info.count -= 1; info.count -= 1;
let state = mws[info.count as usize].finish(&mut info.req, &self.resp); let state = mws[info.count as usize]
.finish(&mut info.req, self.resp.as_ref().unwrap());
match state { match state {
Finished::Done => { Finished::Done => {
if info.count == 0 { if info.count == 0 {
return Some(Completed::init(info)); return Some(Completed::init(info, self.resp.take().unwrap()));
} }
} }
Finished::Future(fut) => { Finished::Future(fut) => {
@ -761,19 +762,20 @@ impl<S: 'static, H> FinishingMiddlewares<S, H> {
} }
#[derive(Debug)] #[derive(Debug)]
struct Completed<S, H>(PhantomData<S>, PhantomData<H>); struct Completed<S, H>(PhantomData<S>, PhantomData<H>, Option<HttpResponse>);
impl<S, H> Completed<S, H> { impl<S, H> Completed<S, H> {
#[inline] #[inline]
fn init(info: &mut PipelineInfo<S>) -> PipelineState<S, H> { fn init(info: &mut PipelineInfo<S>, resp: HttpResponse) -> PipelineState<S, H> {
if let Some(ref err) = info.error { if let Some(ref err) = info.error {
error!("Error occurred during request handling: {}", err); error!("Error occurred during request handling: {}", err);
} }
if info.context.is_none() { if info.context.is_none() {
HttpResponsePool::release(resp);
PipelineState::None PipelineState::None
} else { } else {
PipelineState::Completed(Completed(PhantomData, PhantomData)) PipelineState::Completed(Completed(PhantomData, PhantomData, Some(resp)))
} }
} }
@ -781,8 +783,14 @@ impl<S, H> Completed<S, H> {
fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> { fn poll(&mut self, info: &mut PipelineInfo<S>) -> Option<PipelineState<S, H>> {
match info.poll_context() { match info.poll_context() {
Ok(Async::NotReady) => None, Ok(Async::NotReady) => None,
Ok(Async::Ready(())) => Some(PipelineState::None), Ok(Async::Ready(())) => {
Err(_) => Some(PipelineState::Error), HttpResponsePool::release(self.2.take().unwrap());
Some(PipelineState::None)
}
Err(_) => {
HttpResponsePool::release(self.2.take().unwrap());
Some(PipelineState::Error)
}
} }
} }
} }
@ -793,6 +801,7 @@ mod tests {
use actix::*; use actix::*;
use context::HttpContext; use context::HttpContext;
use futures::future::{lazy, result}; use futures::future::{lazy, result};
use http::StatusCode;
use tokio::runtime::current_thread::Runtime; use tokio::runtime::current_thread::Runtime;
impl<S, H> PipelineState<S, H> { impl<S, H> PipelineState<S, H> {
@ -823,16 +832,18 @@ mod tests {
.unwrap() .unwrap()
.block_on(lazy(|| { .block_on(lazy(|| {
let mut info = PipelineInfo::new(HttpRequest::default()); let mut info = PipelineInfo::new(HttpRequest::default());
Completed::<(), Inner<()>>::init(&mut info) let resp = HttpResponse::new(StatusCode::OK);
Completed::<(), Inner<()>>::init(&mut info, resp)
.is_none() .is_none()
.unwrap(); .unwrap();
let req = HttpRequest::default(); let req = HttpRequest::default();
let ctx = HttpContext::new(req.clone(), MyActor); let ctx = HttpContext::new(req.clone(), MyActor);
let addr = ctx.address(); let addr = ctx.address();
let resp = HttpResponse::new(StatusCode::OK);
let mut info = PipelineInfo::new(req); let mut info = PipelineInfo::new(req);
info.context = Some(Box::new(ctx)); info.context = Some(Box::new(ctx));
let mut state = Completed::<(), Inner<()>>::init(&mut info) let mut state = Completed::<(), Inner<()>>::init(&mut info, resp)
.completed() .completed()
.unwrap(); .unwrap();

View File

@ -42,7 +42,7 @@ pub struct ServerSettings {
secure: bool, secure: bool,
host: String, host: String,
cpu_pool: UnsafeCell<Option<CpuPool>>, cpu_pool: UnsafeCell<Option<CpuPool>>,
responses: Rc<UnsafeCell<HttpResponsePool>>, responses: &'static HttpResponsePool,
} }
impl Clone for ServerSettings { impl Clone for ServerSettings {
@ -52,7 +52,7 @@ impl Clone for ServerSettings {
secure: self.secure, secure: self.secure,
host: self.host.clone(), host: self.host.clone(),
cpu_pool: UnsafeCell::new(None), cpu_pool: UnsafeCell::new(None),
responses: HttpResponsePool::pool(), responses: HttpResponsePool::get_pool(),
} }
} }
} }
@ -63,7 +63,7 @@ impl Default for ServerSettings {
addr: None, addr: None,
secure: false, secure: false,
host: "localhost:8080".to_owned(), host: "localhost:8080".to_owned(),
responses: HttpResponsePool::pool(), responses: HttpResponsePool::get_pool(),
cpu_pool: UnsafeCell::new(None), cpu_pool: UnsafeCell::new(None),
} }
} }
@ -82,7 +82,7 @@ impl ServerSettings {
"localhost".to_owned() "localhost".to_owned()
}; };
let cpu_pool = UnsafeCell::new(None); let cpu_pool = UnsafeCell::new(None);
let responses = HttpResponsePool::pool(); let responses = HttpResponsePool::get_pool();
ServerSettings { ServerSettings {
addr, addr,
secure, secure,
@ -103,7 +103,7 @@ impl ServerSettings {
host, host,
secure, secure,
cpu_pool: UnsafeCell::new(None), cpu_pool: UnsafeCell::new(None),
responses: HttpResponsePool::pool(), responses: HttpResponsePool::get_pool(),
} }
} }