1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 15:24:36 +01:00

simplify Message api

This commit is contained in:
Nikolay Kim 2019-02-07 21:16:46 -08:00
parent a7a2d4cf5c
commit b0e36fdcf9
7 changed files with 243 additions and 336 deletions

View File

@ -115,10 +115,10 @@ impl Decoder for Codec {
None => None,
})
} else if let Some((req, payload)) = self.decoder.decode(src)? {
self.flags
.set(Flags::HEAD, req.inner.head.method == Method::HEAD);
self.version = req.inner().head.version;
self.ctype = req.inner().head.connection_type();
let head = req.head();
self.flags.set(Flags::HEAD, head.method == Method::HEAD);
self.version = head.version;
self.ctype = head.connection_type();
if self.ctype == ConnectionType::KeepAlive
&& !self.flags.contains(Flags::KEEPALIVE_ENABLED)
{

View File

@ -159,7 +159,7 @@ pub(crate) trait MessageType: Sized {
impl MessageType for Request {
fn set_connection_type(&mut self, ctype: Option<ConnectionType>) {
self.inner_mut().head.ctype = ctype;
self.head_mut().ctype = ctype;
}
fn headers_mut(&mut self) -> &mut HeaderMap {
@ -218,12 +218,10 @@ impl MessageType for Request {
}
};
{
let inner = msg.inner_mut();
inner.head.uri = uri;
inner.head.method = method;
inner.head.version = ver;
}
let head = msg.head_mut();
head.uri = uri;
head.method = method;
head.version = ver;
Ok(Some((msg, decoder)))
}
@ -817,7 +815,7 @@ mod tests {
);
let req = parse_ready!(&mut buf);
assert_eq!(req.inner().head.ctype, Some(ConnectionType::Close));
assert_eq!(req.head().ctype, Some(ConnectionType::Close));
let mut buf = BytesMut::from(
"GET /test HTTP/1.1\r\n\
@ -825,7 +823,7 @@ mod tests {
);
let req = parse_ready!(&mut buf);
assert_eq!(req.inner().head.ctype, Some(ConnectionType::Close));
assert_eq!(req.head().ctype, Some(ConnectionType::Close));
}
#[test]
@ -836,7 +834,7 @@ mod tests {
);
let req = parse_ready!(&mut buf);
assert_eq!(req.inner().head.ctype, Some(ConnectionType::Close));
assert_eq!(req.head().ctype, Some(ConnectionType::Close));
}
#[test]
@ -847,7 +845,7 @@ mod tests {
);
let req = parse_ready!(&mut buf);
assert_eq!(req.inner().head.ctype, Some(ConnectionType::KeepAlive));
assert_eq!(req.head().ctype, Some(ConnectionType::KeepAlive));
let mut buf = BytesMut::from(
"GET /test HTTP/1.0\r\n\
@ -855,7 +853,7 @@ mod tests {
);
let req = parse_ready!(&mut buf);
assert_eq!(req.inner().head.ctype, Some(ConnectionType::KeepAlive));
assert_eq!(req.head().ctype, Some(ConnectionType::KeepAlive));
}
#[test]
@ -866,7 +864,7 @@ mod tests {
);
let req = parse_ready!(&mut buf);
assert_eq!(req.inner().head.ctype, Some(ConnectionType::KeepAlive));
assert_eq!(req.head().ctype, Some(ConnectionType::KeepAlive));
}
#[test]
@ -877,7 +875,7 @@ mod tests {
);
let req = parse_ready!(&mut buf);
assert_eq!(req.inner().head.connection_type(), ConnectionType::Close);
assert_eq!(req.head().connection_type(), ConnectionType::Close);
}
#[test]
@ -888,11 +886,8 @@ mod tests {
);
let req = parse_ready!(&mut buf);
assert_eq!(req.inner().head.ctype, None);
assert_eq!(
req.inner().head.connection_type(),
ConnectionType::KeepAlive
);
assert_eq!(req.head().ctype, None);
assert_eq!(req.head().connection_type(), ConnectionType::KeepAlive);
}
#[test]
@ -905,7 +900,7 @@ mod tests {
let req = parse_ready!(&mut buf);
assert!(req.upgrade());
assert_eq!(req.inner().head.ctype, Some(ConnectionType::Upgrade));
assert_eq!(req.head().ctype, Some(ConnectionType::Upgrade));
let mut buf = BytesMut::from(
"GET /test HTTP/1.1\r\n\
@ -915,7 +910,7 @@ mod tests {
let req = parse_ready!(&mut buf);
assert!(req.upgrade());
assert_eq!(req.inner().head.ctype, Some(ConnectionType::Upgrade));
assert_eq!(req.head().ctype, Some(ConnectionType::Upgrade));
}
#[test]
@ -1013,7 +1008,7 @@ mod tests {
);
let mut reader = MessageDecoder::<Request>::default();
let (req, pl) = reader.decode(&mut buf).unwrap().unwrap();
assert_eq!(req.inner().head.ctype, Some(ConnectionType::Upgrade));
assert_eq!(req.head().ctype, Some(ConnectionType::Upgrade));
assert!(req.upgrade());
assert!(pl.is_unhandled());
}

View File

@ -116,7 +116,7 @@ where
let (parts, body) = req.into_parts();
let mut req = Request::with_payload(body.into());
let head = &mut req.inner_mut().head;
let head = &mut req.head_mut();
head.uri = parts.uri;
head.method = parts.method;
head.version = parts.version;

View File

@ -1,4 +1,4 @@
use std::cell::RefCell;
use std::cell::{Ref, RefCell, RefMut};
use std::collections::VecDeque;
use std::rc::Rc;
@ -146,12 +146,59 @@ impl ResponseHead {
}
pub struct Message<T: Head> {
pub head: T,
pub extensions: RefCell<Extensions>,
pub(crate) pool: &'static MessagePool<T>,
inner: Rc<MessageInner<T>>,
pool: &'static MessagePool<T>,
}
impl<T: Head> Message<T> {
/// Get new message from the pool of objects
pub fn new() -> Self {
T::pool().get_message()
}
/// Message extensions
#[inline]
pub fn extensions(&self) -> Ref<Extensions> {
self.inner.as_ref().extensions.borrow()
}
/// Mutable reference to a the message's extensions
#[inline]
pub fn extensions_mut(&self) -> RefMut<Extensions> {
self.inner.as_ref().extensions.borrow_mut()
}
}
impl<T: Head> std::ops::Deref for Message<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner.as_ref().head
}
}
impl<T: Head> std::ops::DerefMut for Message<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut Rc::get_mut(&mut self.inner)
.expect("Multiple copies exist")
.head
}
}
impl<T: Head> Drop for Message<T> {
fn drop(&mut self) {
if Rc::strong_count(&self.inner) == 1 {
self.pool.release(self.inner.clone());
}
}
}
struct MessageInner<T: Head> {
head: T,
extensions: RefCell<Extensions>,
}
impl<T: Head> MessageInner<T> {
#[inline]
/// Reset request instance
pub fn reset(&mut self) {
@ -160,10 +207,9 @@ impl<T: Head> Message<T> {
}
}
impl<T: Head> Default for Message<T> {
impl<T: Head> Default for MessageInner<T> {
fn default() -> Self {
Message {
pool: T::pool(),
MessageInner {
head: T::default(),
extensions: RefCell::new(Extensions::new()),
}
@ -172,41 +218,39 @@ impl<T: Head> Default for Message<T> {
#[doc(hidden)]
/// Request's objects pool
pub struct MessagePool<T: Head>(RefCell<VecDeque<Rc<Message<T>>>>);
pub struct MessagePool<T: Head>(RefCell<VecDeque<Rc<MessageInner<T>>>>);
thread_local!(static REQUEST_POOL: &'static MessagePool<RequestHead> = MessagePool::<RequestHead>::create());
thread_local!(static RESPONSE_POOL: &'static MessagePool<ResponseHead> = MessagePool::<ResponseHead>::create());
impl MessagePool<RequestHead> {
/// Get default request's pool
pub fn pool() -> &'static MessagePool<RequestHead> {
REQUEST_POOL.with(|p| *p)
}
/// Get Request object
#[inline]
pub fn get_message() -> Rc<Message<RequestHead>> {
REQUEST_POOL.with(|pool| {
if let Some(mut msg) = pool.0.borrow_mut().pop_front() {
if let Some(r) = Rc::get_mut(&mut msg) {
r.reset();
}
return msg;
}
Rc::new(Message::default())
})
}
}
impl<T: Head> MessagePool<T> {
fn create() -> &'static MessagePool<T> {
let pool = MessagePool(RefCell::new(VecDeque::with_capacity(128)));
Box::leak(Box::new(pool))
}
/// Get message from the pool
#[inline]
fn get_message(&'static self) -> Message<T> {
if let Some(mut msg) = self.0.borrow_mut().pop_front() {
if let Some(r) = Rc::get_mut(&mut msg) {
r.reset();
}
Message {
inner: msg,
pool: self,
}
} else {
Message {
inner: Rc::new(MessageInner::default()),
pool: self,
}
}
}
#[inline]
/// Release request instance
pub(crate) fn release(&self, msg: Rc<Message<T>>) {
fn release(&self, msg: Rc<MessageInner<T>>) {
let v = &mut self.0.borrow_mut();
if v.len() < 128 {
v.push_front(msg);

View File

@ -1,6 +1,5 @@
use std::cell::{Ref, RefCell, RefMut};
use std::fmt;
use std::rc::Rc;
use bytes::Bytes;
use futures::Stream;
@ -9,13 +8,13 @@ use http::{header, HeaderMap, Method, Uri, Version};
use crate::error::PayloadError;
use crate::extensions::Extensions;
use crate::httpmessage::HttpMessage;
use crate::message::{Message, MessagePool, RequestHead};
use crate::message::{Message, RequestHead};
use crate::payload::Payload;
/// Request
pub struct Request<P = Payload> {
pub(crate) payload: RefCell<Option<P>>,
pub(crate) inner: Rc<Message<RequestHead>>,
pub(crate) inner: Message<RequestHead>,
}
impl<P> HttpMessage for Request<P>
@ -25,7 +24,7 @@ where
type Stream = P;
fn headers(&self) -> &HeaderMap {
&self.inner.head.headers
&self.head().headers
}
#[inline]
@ -34,12 +33,21 @@ where
}
}
impl<Payload> From<Message<RequestHead>> for Request<Payload> {
fn from(msg: Message<RequestHead>) -> Self {
Request {
payload: RefCell::new(None),
inner: msg,
}
}
}
impl Request<Payload> {
/// Create new Request instance
pub fn new() -> Request<Payload> {
Request {
payload: RefCell::new(None),
inner: MessagePool::get_message(),
inner: Message::new(),
}
}
}
@ -49,7 +57,7 @@ impl<Payload> Request<Payload> {
pub fn with_payload(payload: Payload) -> Request<Payload> {
Request {
payload: RefCell::new(Some(payload.into())),
inner: MessagePool::get_message(),
inner: Message::new(),
}
}
@ -60,123 +68,90 @@ impl<Payload> Request<Payload> {
{
Request {
payload: RefCell::new(Some(payload.into())),
inner: self.inner.clone(),
inner: self.inner,
}
}
/// Take request's payload
pub fn take_payload(mut self) -> (Option<Payload>, Request<()>) {
(
self.payload.get_mut().take(),
Request {
payload: RefCell::new(None),
inner: self.inner.clone(),
},
)
}
// /// Create new Request instance with pool
// pub(crate) fn with_pool(pool: &'static MessagePool) -> Request {
// Request {
// inner: Rc::new(Message {
// pool,
// url: Url::default(),
// head: RequestHead::default(),
// status: StatusCode::OK,
// flags: Cell::new(MessageFlags::empty()),
// payload: RefCell::new(None),
// extensions: RefCell::new(Extensions::new()),
// }),
// }
// }
#[inline]
#[doc(hidden)]
pub fn inner(&self) -> &Message<RequestHead> {
self.inner.as_ref()
}
#[inline]
#[doc(hidden)]
pub fn inner_mut(&mut self) -> &mut Message<RequestHead> {
Rc::get_mut(&mut self.inner).expect("Multiple copies exist")
/// Split request into request head and payload
pub fn into_parts(mut self) -> (Message<RequestHead>, Option<Payload>) {
(self.inner, self.payload.get_mut().take())
}
#[inline]
/// Http message part of the request
pub fn head(&self) -> &RequestHead {
&self.inner.as_ref().head
&*self.inner
}
#[inline]
#[doc(hidden)]
/// Mutable reference to a http message part of the request
pub fn head_mut(&mut self) -> &mut RequestHead {
&mut self.inner_mut().head
&mut *self.inner
}
/// Request's uri.
#[inline]
pub fn uri(&self) -> &Uri {
&self.inner().head.uri
&self.head().uri
}
/// Mutable reference to the request's uri.
#[inline]
pub fn uri_mut(&mut self) -> &mut Uri {
&mut self.inner_mut().head.uri
&mut self.head_mut().uri
}
/// Read the Request method.
#[inline]
pub fn method(&self) -> &Method {
&self.inner().head.method
&self.head().method
}
/// Read the Request Version.
#[inline]
pub fn version(&self) -> Version {
self.inner().head.version
self.head().version
}
/// The target path of this Request.
#[inline]
pub fn path(&self) -> &str {
self.inner().head.uri.path()
self.head().uri.path()
}
#[inline]
/// Returns Request's headers.
pub fn headers(&self) -> &HeaderMap {
&self.inner().head.headers
&self.head().headers
}
#[inline]
/// Returns mutable Request's headers.
pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.inner_mut().head.headers
&mut self.head_mut().headers
}
/// Request extensions
#[inline]
pub fn extensions(&self) -> Ref<Extensions> {
self.inner().extensions.borrow()
self.inner.extensions()
}
/// Mutable reference to a the request's extensions
#[inline]
pub fn extensions_mut(&self) -> RefMut<Extensions> {
self.inner().extensions.borrow_mut()
self.inner.extensions_mut()
}
/// Check if request requires connection upgrade
pub fn upgrade(&self) -> bool {
if let Some(conn) = self.inner().head.headers.get(header::CONNECTION) {
if let Some(conn) = self.head().headers.get(header::CONNECTION) {
if let Ok(s) = conn.to_str() {
return s.to_lowercase().contains("upgrade");
}
}
self.inner().head.method == Method::CONNECT
self.head().method == Method::CONNECT
}
// #[doc(hidden)]
@ -189,14 +164,6 @@ impl<Payload> Request<Payload> {
// }
}
impl<Payload> Drop for Request<Payload> {
fn drop(&mut self) {
if Rc::strong_count(&self.inner) == 1 {
self.inner.pool.release(self.inner.clone());
}
}
}
impl<Payload> fmt::Debug for Request<Payload> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(

View File

@ -1,7 +1,4 @@
#![allow(dead_code)]
//! Http response
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io::Write;
use std::{fmt, str};
@ -9,26 +6,27 @@ use bytes::{BufMut, Bytes, BytesMut};
use cookie::{Cookie, CookieJar};
use futures::Stream;
use http::header::{self, HeaderName, HeaderValue};
use http::{Error as HttpError, HeaderMap, HttpTryFrom, StatusCode, Version};
use http::{Error as HttpError, HeaderMap, HttpTryFrom, StatusCode};
use serde::Serialize;
use serde_json;
use crate::body::{Body, BodyStream, MessageBody, ResponseBody};
use crate::error::Error;
use crate::header::{Header, IntoHeaderValue};
use crate::message::{ConnectionType, Head, ResponseHead};
/// max write buffer size 64k
pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;
use crate::message::{ConnectionType, Head, Message, ResponseHead};
/// An HTTP Response
pub struct Response<B: MessageBody = Body>(Box<InnerResponse>, ResponseBody<B>);
pub struct Response<B: MessageBody = Body> {
head: Message<ResponseHead>,
body: ResponseBody<B>,
error: Option<Error>,
}
impl Response<Body> {
/// Create http response builder with specific status.
#[inline]
pub fn build(status: StatusCode) -> ResponseBuilder {
ResponsePool::get(status)
ResponseBuilder::new(status)
}
/// Create http response builder
@ -40,14 +38,21 @@ impl Response<Body> {
/// Constructs a response
#[inline]
pub fn new(status: StatusCode) -> Response {
ResponsePool::with_body(status, Body::Empty)
let mut head: Message<ResponseHead> = Message::new();
head.status = status;
Response {
head,
body: ResponseBody::Body(Body::Empty),
error: None,
}
}
/// Constructs an error response
#[inline]
pub fn from_error(error: Error) -> Response {
let mut resp = error.as_response_error().error_response();
resp.get_mut().error = Some(error);
resp.error = Some(error);
resp
}
@ -67,7 +72,7 @@ impl Response<Body> {
}
ResponseBuilder {
response: Some(self.0),
head: Some(self.head),
err: None,
cookies: jar,
}
@ -75,90 +80,85 @@ impl Response<Body> {
/// Convert response to response with body
pub fn into_body<B: MessageBody>(self) -> Response<B> {
let b = match self.1 {
let b = match self.body {
ResponseBody::Body(b) => b,
ResponseBody::Other(b) => b,
};
Response(self.0, ResponseBody::Other(b))
Response {
head: self.head,
error: self.error,
body: ResponseBody::Other(b),
}
}
}
impl<B: MessageBody> Response<B> {
#[inline]
fn get_ref(&self) -> &InnerResponse {
self.0.as_ref()
}
#[inline]
fn get_mut(&mut self) -> &mut InnerResponse {
self.0.as_mut()
}
#[inline]
/// Http message part of the response
pub fn head(&self) -> &ResponseHead {
&self.0.as_ref().head
&*self.head
}
#[inline]
/// Mutable reference to a http message part of the response
pub fn head_mut(&mut self) -> &mut ResponseHead {
&mut self.0.as_mut().head
&mut *self.head
}
/// Constructs a response with body
#[inline]
pub fn with_body(status: StatusCode, body: B) -> Response<B> {
ResponsePool::with_body(status, body)
let mut head: Message<ResponseHead> = Message::new();
head.status = status;
Response {
head,
body: ResponseBody::Body(body),
error: None,
}
}
/// The source `error` for this response
#[inline]
pub fn error(&self) -> Option<&Error> {
self.get_ref().error.as_ref()
self.error.as_ref()
}
/// Get the response status code
#[inline]
pub fn status(&self) -> StatusCode {
self.get_ref().head.status
self.head.status
}
/// Set the `StatusCode` for this response
#[inline]
pub fn status_mut(&mut self) -> &mut StatusCode {
&mut self.get_mut().head.status
&mut self.head.status
}
/// Get the headers from the response
#[inline]
pub fn headers(&self) -> &HeaderMap {
&self.get_ref().head.headers
&self.head.headers
}
/// Get a mutable reference to the headers
#[inline]
pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.get_mut().head.headers
&mut self.head.headers
}
/// Get an iterator for the cookies set by this response
#[inline]
pub fn cookies(&self) -> CookieIter {
CookieIter {
iter: self
.get_ref()
.head
.headers
.get_all(header::SET_COOKIE)
.iter(),
iter: self.head.headers.get_all(header::SET_COOKIE).iter(),
}
}
/// Add a cookie to this response
#[inline]
pub fn add_cookie(&mut self, cookie: &Cookie) -> Result<(), HttpError> {
let h = &mut self.get_mut().head.headers;
let h = &mut self.head.headers;
HeaderValue::from_str(&cookie.to_string())
.map(|c| {
h.append(header::SET_COOKIE, c);
@ -170,7 +170,7 @@ impl<B: MessageBody> Response<B> {
/// the number of cookies removed.
#[inline]
pub fn del_cookie(&mut self, name: &str) -> usize {
let h = &mut self.get_mut().head.headers;
let h = &mut self.head.headers;
let vals: Vec<HeaderValue> = h
.get_all(header::SET_COOKIE)
.iter()
@ -196,28 +196,36 @@ impl<B: MessageBody> Response<B> {
/// Connection upgrade status
#[inline]
pub fn upgrade(&self) -> bool {
self.get_ref().head.upgrade()
self.head.upgrade()
}
/// Keep-alive status for this connection
pub fn keep_alive(&self) -> bool {
self.get_ref().head.keep_alive()
self.head.keep_alive()
}
/// Get body os this response
#[inline]
pub(crate) fn body(&self) -> &ResponseBody<B> {
&self.1
pub fn body(&self) -> &ResponseBody<B> {
&self.body
}
/// Set a body
pub(crate) fn set_body<B2: MessageBody>(self, body: B2) -> Response<B2> {
Response(self.0, ResponseBody::Body(body))
Response {
head: self.head,
body: ResponseBody::Body(body),
error: None,
}
}
/// Drop request's body
pub(crate) fn drop_body(self) -> Response<()> {
Response(self.0, ResponseBody::Body(()))
Response {
head: self.head,
body: ResponseBody::Body(()),
error: None,
}
}
/// Set a body and return previous body value
@ -225,21 +233,14 @@ impl<B: MessageBody> Response<B> {
self,
body: B2,
) -> (Response<B2>, ResponseBody<B>) {
(Response(self.0, ResponseBody::Body(body)), self.1)
}
/// Size of response in bytes, excluding HTTP headers
pub fn response_size(&self) -> u64 {
self.get_ref().response_size
}
/// Set response size
pub(crate) fn set_response_size(&mut self, size: u64) {
self.get_mut().response_size = size;
}
pub(crate) fn release(self) {
ResponsePool::release(self.0);
(
Response {
head: self.head,
body: ResponseBody::Body(body),
error: self.error,
},
self.body,
)
}
}
@ -248,15 +249,15 @@ impl<B: MessageBody> fmt::Debug for Response<B> {
let res = writeln!(
f,
"\nResponse {:?} {}{}",
self.get_ref().head.version,
self.get_ref().head.status,
self.get_ref().head.reason.unwrap_or(""),
self.head.version,
self.head.status,
self.head.reason.unwrap_or(""),
);
let _ = writeln!(f, " headers:");
for (key, val) in self.get_ref().head.headers.iter() {
for (key, val) in self.head.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val);
}
let _ = writeln!(f, " body: {:?}", self.1.length());
let _ = writeln!(f, " body: {:?}", self.body.length());
res
}
}
@ -284,17 +285,29 @@ impl<'a> Iterator for CookieIter<'a> {
/// This type can be used to construct an instance of `Response` through a
/// builder-like pattern.
pub struct ResponseBuilder {
response: Option<Box<InnerResponse>>,
head: Option<Message<ResponseHead>>,
err: Option<HttpError>,
cookies: Option<CookieJar>,
}
impl ResponseBuilder {
/// Create response builder
pub fn new(status: StatusCode) -> Self {
let mut head: Message<ResponseHead> = Message::new();
head.status = status;
ResponseBuilder {
head: Some(head),
err: None,
cookies: None,
}
}
/// Set HTTP status code of this response.
#[inline]
pub fn status(&mut self, status: StatusCode) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.head.status = status;
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.status = status;
}
self
}
@ -316,10 +329,10 @@ impl ResponseBuilder {
/// ```
#[doc(hidden)]
pub fn set<H: Header>(&mut self, hdr: H) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
if let Some(parts) = parts(&mut self.head, &self.err) {
match hdr.try_into() {
Ok(value) => {
parts.head.headers.append(H::name(), value);
parts.headers.append(H::name(), value);
}
Err(e) => self.err = Some(e.into()),
}
@ -346,11 +359,11 @@ impl ResponseBuilder {
HeaderName: HttpTryFrom<K>,
V: IntoHeaderValue,
{
if let Some(parts) = parts(&mut self.response, &self.err) {
if let Some(parts) = parts(&mut self.head, &self.err) {
match HeaderName::try_from(key) {
Ok(key) => match value.try_into() {
Ok(value) => {
parts.head.headers.append(key, value);
parts.headers.append(key, value);
}
Err(e) => self.err = Some(e.into()),
},
@ -379,11 +392,11 @@ impl ResponseBuilder {
HeaderName: HttpTryFrom<K>,
V: IntoHeaderValue,
{
if let Some(parts) = parts(&mut self.response, &self.err) {
if let Some(parts) = parts(&mut self.head, &self.err) {
match HeaderName::try_from(key) {
Ok(key) => match value.try_into() {
Ok(value) => {
parts.head.headers.insert(key, value);
parts.headers.insert(key, value);
}
Err(e) => self.err = Some(e.into()),
},
@ -396,8 +409,8 @@ impl ResponseBuilder {
/// Set the custom reason for the response.
#[inline]
pub fn reason(&mut self, reason: &'static str) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.head.reason = Some(reason);
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.reason = Some(reason);
}
self
}
@ -405,8 +418,8 @@ impl ResponseBuilder {
/// Set connection type to KeepAlive
#[inline]
pub fn keep_alive(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.head.set_connection_type(ConnectionType::KeepAlive);
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.set_connection_type(ConnectionType::KeepAlive);
}
self
}
@ -417,8 +430,8 @@ impl ResponseBuilder {
where
V: IntoHeaderValue,
{
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.head.set_connection_type(ConnectionType::Upgrade);
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.set_connection_type(ConnectionType::Upgrade);
}
self.set_header(header::UPGRADE, value)
}
@ -426,8 +439,8 @@ impl ResponseBuilder {
/// Force close connection, even if it is marked as keep-alive
#[inline]
pub fn force_close(&mut self) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.head.set_connection_type(ConnectionType::Close);
if let Some(parts) = parts(&mut self.head, &self.err) {
parts.set_connection_type(ConnectionType::Close);
}
self
}
@ -438,10 +451,10 @@ impl ResponseBuilder {
where
HeaderValue: HttpTryFrom<V>,
{
if let Some(parts) = parts(&mut self.response, &self.err) {
if let Some(parts) = parts(&mut self.head, &self.err) {
match HeaderValue::try_from(value) {
Ok(value) => {
parts.head.headers.insert(header::CONTENT_TYPE, value);
parts.headers.insert(header::CONTENT_TYPE, value);
}
Err(e) => self.err = Some(e.into()),
};
@ -540,20 +553,6 @@ impl ResponseBuilder {
self
}
// /// Set write buffer capacity
// ///
// /// This parameter makes sense only for streaming response
// /// or actor. If write buffer reaches specified capacity, stream or actor
// /// get paused.
// ///
// /// Default write buffer capacity is 64kb
// pub fn write_buffer_capacity(&mut self, cap: usize) -> &mut Self {
// if let Some(parts) = parts(&mut self.response, &self.err) {
// parts.write_capacity = cap;
// }
// self
// }
/// Set a body and generate `Response`.
///
/// `ResponseBuilder` can not be used after this call.
@ -569,19 +568,23 @@ impl ResponseBuilder {
return Response::from(Error::from(e)).into_body();
}
let mut response = self.response.take().expect("cannot reuse response builder");
let mut response = self.head.take().expect("cannot reuse response builder");
if let Some(ref jar) = self.cookies {
for cookie in jar.delta() {
match HeaderValue::from_str(&cookie.to_string()) {
Ok(val) => {
let _ = response.head.headers.append(header::SET_COOKIE, val);
let _ = response.headers.append(header::SET_COOKIE, val);
}
Err(e) => return Response::from(Error::from(e)).into_body(),
};
}
}
Response(response, ResponseBody::Body(body))
Response {
head: response,
body: ResponseBody::Body(body),
error: None,
}
}
#[inline]
@ -609,9 +612,8 @@ impl ResponseBuilder {
pub fn json2<T: Serialize>(&mut self, value: &T) -> Response {
match serde_json::to_string(value) {
Ok(body) => {
let contains = if let Some(parts) = parts(&mut self.response, &self.err)
{
parts.head.headers.contains_key(header::CONTENT_TYPE)
let contains = if let Some(parts) = parts(&mut self.head, &self.err) {
parts.headers.contains_key(header::CONTENT_TYPE)
} else {
true
};
@ -636,7 +638,7 @@ impl ResponseBuilder {
/// This method construct new `ResponseBuilder`
pub fn take(&mut self) -> ResponseBuilder {
ResponseBuilder {
response: self.response.take(),
head: self.head.take(),
err: self.err.take(),
cookies: self.cookies.take(),
}
@ -646,9 +648,9 @@ impl ResponseBuilder {
#[inline]
#[allow(clippy::borrowed_box)]
fn parts<'a>(
parts: &'a mut Option<Box<InnerResponse>>,
parts: &'a mut Option<Message<ResponseHead>>,
err: &Option<HttpError>,
) -> Option<&'a mut Box<InnerResponse>> {
) -> Option<&'a mut Message<ResponseHead>> {
if err.is_some() {
return None;
}
@ -719,107 +721,6 @@ impl From<BytesMut> for Response {
}
}
struct InnerResponse {
head: ResponseHead,
response_size: u64,
error: Option<Error>,
pool: &'static ResponsePool,
}
impl InnerResponse {
#[inline]
fn new(status: StatusCode, pool: &'static ResponsePool) -> InnerResponse {
InnerResponse {
head: ResponseHead {
status,
version: Version::default(),
headers: HeaderMap::with_capacity(16),
reason: None,
ctype: None,
},
pool,
response_size: 0,
error: None,
}
}
}
/// Internal use only!
pub(crate) struct ResponsePool(RefCell<VecDeque<Box<InnerResponse>>>);
thread_local!(static POOL: &'static ResponsePool = ResponsePool::pool());
impl ResponsePool {
fn pool() -> &'static ResponsePool {
let pool = ResponsePool(RefCell::new(VecDeque::with_capacity(128)));
Box::leak(Box::new(pool))
}
pub fn get_pool() -> &'static ResponsePool {
POOL.with(|p| *p)
}
#[inline]
pub fn get_builder(
pool: &'static ResponsePool,
status: StatusCode,
) -> ResponseBuilder {
if let Some(mut msg) = pool.0.borrow_mut().pop_front() {
msg.head.status = status;
ResponseBuilder {
response: Some(msg),
err: None,
cookies: None,
}
} else {
let msg = Box::new(InnerResponse::new(status, pool));
ResponseBuilder {
response: Some(msg),
err: None,
cookies: None,
}
}
}
#[inline]
pub fn get_response<B: MessageBody>(
pool: &'static ResponsePool,
status: StatusCode,
body: B,
) -> Response<B> {
if let Some(mut msg) = pool.0.borrow_mut().pop_front() {
msg.head.status = status;
Response(msg, ResponseBody::Body(body))
} else {
Response(
Box::new(InnerResponse::new(status, pool)),
ResponseBody::Body(body),
)
}
}
#[inline]
fn get(status: StatusCode) -> ResponseBuilder {
POOL.with(|pool| ResponsePool::get_builder(pool, status))
}
#[inline]
fn with_body<B: MessageBody>(status: StatusCode, body: B) -> Response<B> {
POOL.with(|pool| ResponsePool::get_response(pool, status, body))
}
#[inline]
fn release(mut inner: Box<InnerResponse>) {
let mut p = inner.pool.0.borrow_mut();
if p.len() < 128 {
inner.head.clear();
inner.response_size = 0;
inner.error = None;
p.push_front(inner);
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -154,11 +154,11 @@ impl TestRequest {
Request::with_payload(crate::h1::Payload::empty().into())
};
let inner = req.inner_mut();
inner.head.uri = uri;
inner.head.method = method;
inner.head.version = version;
inner.head.headers = headers;
let head = req.head_mut();
head.uri = uri;
head.method = method;
head.version = version;
head.headers = headers;
// req.set_cookies(cookies);
req