1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

handle response errors

This commit is contained in:
Nikolay Kim 2018-11-21 07:49:24 -08:00
parent ab3e12f2b4
commit 1a322966ff
5 changed files with 111 additions and 50 deletions

View File

@ -37,6 +37,37 @@ impl MessageBody for () {
} }
} }
pub enum ResponseBody<B> {
Body(B),
Other(Body),
}
impl<B: MessageBody> ResponseBody<B> {
pub fn as_ref(&self) -> Option<&B> {
if let ResponseBody::Body(ref b) = self {
Some(b)
} else {
None
}
}
}
impl<B: MessageBody> MessageBody for ResponseBody<B> {
fn length(&self) -> BodyLength {
match self {
ResponseBody::Body(ref body) => body.length(),
ResponseBody::Other(ref body) => body.length(),
}
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
match self {
ResponseBody::Body(ref mut body) => body.poll_next(),
ResponseBody::Other(ref mut body) => body.poll_next(),
}
}
}
/// Represents various types of http message body. /// Represents various types of http message body.
pub enum Body { pub enum Body {
/// Empty response. `Content-Length` header is not set. /// Empty response. `Content-Length` header is not set.
@ -332,6 +363,15 @@ mod tests {
} }
} }
impl ResponseBody<Body> {
pub(crate) fn get_ref(&self) -> &[u8] {
match *self {
ResponseBody::Body(ref b) => b.get_ref(),
ResponseBody::Other(ref b) => b.get_ref(),
}
}
}
#[test] #[test]
fn test_static_str() { fn test_static_str() {
assert_eq!(Body::from("").length(), BodyLength::Sized(0)); assert_eq!(Body::from("").length(), BodyLength::Sized(0));

View File

@ -13,7 +13,7 @@ use tokio_timer::Delay;
use error::{ParseError, PayloadError}; use error::{ParseError, PayloadError};
use payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter}; use payload::{Payload, PayloadSender, PayloadStatus, PayloadWriter};
use body::{BodyLength, MessageBody}; use body::{Body, BodyLength, MessageBody, ResponseBody};
use config::ServiceConfig; use config::ServiceConfig;
use error::DispatchError; use error::DispatchError;
use request::Request; use request::Request;
@ -70,7 +70,7 @@ enum DispatcherMessage {
enum State<S: Service, B: MessageBody> { enum State<S: Service, B: MessageBody> {
None, None,
ServiceCall(S::Future), ServiceCall(S::Future),
SendPayload(B), SendPayload(ResponseBody<B>),
} }
impl<S: Service, B: MessageBody> State<S, B> { impl<S: Service, B: MessageBody> State<S, B> {
@ -186,11 +186,11 @@ where
} }
} }
fn send_response<B1: MessageBody>( fn send_response(
&mut self, &mut self,
message: Response<()>, message: Response<()>,
body: B1, body: ResponseBody<B>,
) -> Result<State<S, B1>, DispatchError<S::Error>> { ) -> Result<State<S, B>, DispatchError<S::Error>> {
self.framed self.framed
.force_send(Message::Item((message, body.length()))) .force_send(Message::Item((message, body.length())))
.map_err(|err| { .map_err(|err| {
@ -217,7 +217,7 @@ where
Some(self.handle_request(req)?) Some(self.handle_request(req)?)
} }
Some(DispatcherMessage::Error(res)) => { Some(DispatcherMessage::Error(res)) => {
self.send_response(res, ())?; self.send_response(res, ResponseBody::Other(Body::Empty))?;
None None
} }
None => None, None => None,
@ -431,7 +431,7 @@ where
trace!("Slow request timeout"); trace!("Slow request timeout");
let _ = self.send_response( let _ = self.send_response(
Response::RequestTimeout().finish().drop_body(), Response::RequestTimeout().finish().drop_body(),
(), ResponseBody::Other(Body::Empty),
); );
} else { } else {
trace!("Keep-alive connection timeout"); trace!("Keep-alive connection timeout");

View File

@ -12,7 +12,7 @@ use http::{Error as HttpError, HeaderMap, HttpTryFrom, StatusCode, Version};
use serde::Serialize; use serde::Serialize;
use serde_json; use serde_json;
use body::{Body, BodyStream, MessageBody}; use body::{Body, BodyStream, MessageBody, ResponseBody};
use error::Error; use error::Error;
use header::{Header, IntoHeaderValue}; use header::{Header, IntoHeaderValue};
use message::{ConnectionType, Head, ResponseHead}; use message::{ConnectionType, Head, ResponseHead};
@ -21,7 +21,7 @@ use message::{ConnectionType, Head, ResponseHead};
pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536; pub(crate) const MAX_WRITE_BUFFER_SIZE: usize = 65_536;
/// An HTTP Response /// An HTTP Response
pub struct Response<B: MessageBody = Body>(Box<InnerResponse>, B); pub struct Response<B: MessageBody = Body>(Box<InnerResponse>, ResponseBody<B>);
impl Response<Body> { impl Response<Body> {
/// Create http response builder with specific status. /// Create http response builder with specific status.
@ -71,6 +71,15 @@ impl Response<Body> {
cookies: jar, cookies: jar,
} }
} }
/// Convert response to response with body
pub fn into_body<B: MessageBody>(self) -> Response<B> {
let b = match self.1 {
ResponseBody::Body(b) => b,
ResponseBody::Other(b) => b,
};
Response(self.0, ResponseBody::Other(b))
}
} }
impl<B: MessageBody> Response<B> { impl<B: MessageBody> Response<B> {
@ -195,23 +204,26 @@ impl<B: MessageBody> Response<B> {
/// Get body os this response /// Get body os this response
#[inline] #[inline]
pub fn body(&self) -> &B { pub(crate) fn body(&self) -> &ResponseBody<B> {
&self.1 &self.1
} }
/// Set a body /// Set a body
pub fn set_body<B2: MessageBody>(self, body: B2) -> Response<B2> { pub(crate) fn set_body<B2: MessageBody>(self, body: B2) -> Response<B2> {
Response(self.0, body) Response(self.0, ResponseBody::Body(body))
} }
/// Drop request's body /// Drop request's body
pub fn drop_body(self) -> Response<()> { pub(crate) fn drop_body(self) -> Response<()> {
Response(self.0, ()) Response(self.0, ResponseBody::Body(()))
} }
/// Set a body and return previous body value /// Set a body and return previous body value
pub fn replace_body<B2: MessageBody>(self, body: B2) -> (Response<B2>, B) { pub(crate) fn replace_body<B2: MessageBody>(
(Response(self.0, body), self.1) self,
body: B2,
) -> (Response<B2>, ResponseBody<B>) {
(Response(self.0, ResponseBody::Body(body)), self.1)
} }
/// Size of response in bytes, excluding HTTP headers /// Size of response in bytes, excluding HTTP headers
@ -233,7 +245,10 @@ impl<B: MessageBody> Response<B> {
} }
pub(crate) fn from_parts(parts: ResponseParts) -> Response { pub(crate) fn from_parts(parts: ResponseParts) -> Response {
Response(Box::new(InnerResponse::from_parts(parts)), Body::Empty) Response(
Box::new(InnerResponse::from_parts(parts)),
ResponseBody::Body(Body::Empty),
)
} }
} }
@ -250,7 +265,7 @@ impl<B: MessageBody> fmt::Debug for Response<B> {
for (key, val) in self.get_ref().head.headers.iter() { for (key, val) in self.get_ref().head.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val); let _ = writeln!(f, " {:?}: {:?}", key, val);
} }
let _ = writeln!(f, " body: {:?}", self.body().length()); let _ = writeln!(f, " body: {:?}", self.1.length());
res res
} }
} }
@ -559,11 +574,9 @@ impl ResponseBuilder {
/// ///
/// `ResponseBuilder` can not be used after this call. /// `ResponseBuilder` can not be used after this call.
pub fn message_body<B: MessageBody>(&mut self, body: B) -> Response<B> { pub fn message_body<B: MessageBody>(&mut self, body: B) -> Response<B> {
let mut error = if let Some(e) = self.err.take() { if let Some(e) = self.err.take() {
Some(Error::from(e)) return Response::from(Error::from(e)).into_body();
} else { }
None
};
let mut response = self.response.take().expect("cannot reuse response builder"); let mut response = self.response.take().expect("cannot reuse response builder");
if let Some(ref jar) = self.cookies { if let Some(ref jar) = self.cookies {
@ -572,17 +585,12 @@ impl ResponseBuilder {
Ok(val) => { Ok(val) => {
let _ = response.head.headers.append(header::SET_COOKIE, val); let _ = response.head.headers.append(header::SET_COOKIE, val);
} }
Err(e) => if error.is_none() { Err(e) => return Response::from(Error::from(e)).into_body(),
error = Some(Error::from(e));
},
}; };
} }
} }
if let Some(error) = error {
response.error = Some(error);
}
Response(response, body) Response(response, ResponseBody::Body(body))
} }
#[inline] #[inline]
@ -812,9 +820,12 @@ impl ResponsePool {
) -> Response<B> { ) -> Response<B> {
if let Some(mut msg) = pool.0.borrow_mut().pop_front() { if let Some(mut msg) = pool.0.borrow_mut().pop_front() {
msg.head.status = status; msg.head.status = status;
Response(msg, body) Response(msg, ResponseBody::Body(body))
} else { } else {
Response(Box::new(InnerResponse::new(status, pool)), body) Response(
Box::new(InnerResponse::new(status, pool)),
ResponseBody::Body(body),
)
} }
} }
@ -971,10 +982,7 @@ mod tests {
let resp = Response::build(StatusCode::OK).json(vec!["v1", "v2", "v3"]); let resp = Response::build(StatusCode::OK).json(vec!["v1", "v2", "v3"]);
let ct = resp.headers().get(CONTENT_TYPE).unwrap(); let ct = resp.headers().get(CONTENT_TYPE).unwrap();
assert_eq!(ct, HeaderValue::from_static("application/json")); assert_eq!(ct, HeaderValue::from_static("application/json"));
assert_eq!( assert_eq!(resp.body().get_ref(), b"[\"v1\",\"v2\",\"v3\"]");
*resp.body(),
Body::from(Bytes::from_static(b"[\"v1\",\"v2\",\"v3\"]"))
);
} }
#[test] #[test]
@ -984,10 +992,7 @@ mod tests {
.json(vec!["v1", "v2", "v3"]); .json(vec!["v1", "v2", "v3"]);
let ct = resp.headers().get(CONTENT_TYPE).unwrap(); let ct = resp.headers().get(CONTENT_TYPE).unwrap();
assert_eq!(ct, HeaderValue::from_static("text/json")); assert_eq!(ct, HeaderValue::from_static("text/json"));
assert_eq!( assert_eq!(resp.body().get_ref(), b"[\"v1\",\"v2\",\"v3\"]");
*resp.body(),
Body::from(Bytes::from_static(b"[\"v1\",\"v2\",\"v3\"]"))
);
} }
#[test] #[test]
@ -995,10 +1000,7 @@ mod tests {
let resp = Response::build(StatusCode::OK).json2(&vec!["v1", "v2", "v3"]); let resp = Response::build(StatusCode::OK).json2(&vec!["v1", "v2", "v3"]);
let ct = resp.headers().get(CONTENT_TYPE).unwrap(); let ct = resp.headers().get(CONTENT_TYPE).unwrap();
assert_eq!(ct, HeaderValue::from_static("application/json")); assert_eq!(ct, HeaderValue::from_static("application/json"));
assert_eq!( assert_eq!(resp.body().get_ref(), b"[\"v1\",\"v2\",\"v3\"]");
*resp.body(),
Body::from(Bytes::from_static(b"[\"v1\",\"v2\",\"v3\"]"))
);
} }
#[test] #[test]
@ -1008,10 +1010,7 @@ mod tests {
.json2(&vec!["v1", "v2", "v3"]); .json2(&vec!["v1", "v2", "v3"]);
let ct = resp.headers().get(CONTENT_TYPE).unwrap(); let ct = resp.headers().get(CONTENT_TYPE).unwrap();
assert_eq!(ct, HeaderValue::from_static("text/json")); assert_eq!(ct, HeaderValue::from_static("text/json"));
assert_eq!( assert_eq!(resp.body().get_ref(), b"[\"v1\",\"v2\",\"v3\"]");
*resp.body(),
Body::from(Bytes::from_static(b"[\"v1\",\"v2\",\"v3\"]"))
);
} }
#[test] #[test]

View File

@ -6,7 +6,7 @@ use futures::future::{ok, Either, FutureResult};
use futures::{Async, Future, Poll, Sink}; use futures::{Async, Future, Poll, Sink};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use body::{BodyLength, MessageBody}; use body::{BodyLength, MessageBody, ResponseBody};
use error::{Error, ResponseError}; use error::{Error, ResponseError};
use h1::{Codec, Message}; use h1::{Codec, Message};
use response::Response; use response::Response;
@ -174,7 +174,7 @@ where
pub struct SendResponseFut<T, B> { pub struct SendResponseFut<T, B> {
res: Option<Message<(Response<()>, BodyLength)>>, res: Option<Message<(Response<()>, BodyLength)>>,
body: Option<B>, body: Option<ResponseBody<B>>,
framed: Option<Framed<T, Codec>>, framed: Option<Framed<T, Codec>>,
} }

View File

@ -445,3 +445,25 @@ fn test_body_chunked_implicit() {
let bytes = srv.block_on(response.body()).unwrap(); let bytes = srv.block_on(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref())); assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
} }
#[test]
fn test_response_http_error_handling() {
let mut srv = test::TestServer::with_factory(|| {
h1::H1Service::new(|_| {
let broken_header = Bytes::from_static(b"\0\0\0");
ok::<_, ()>(
Response::Ok()
.header(http::header::CONTENT_TYPE, broken_header)
.body(STR),
)
}).map(|_| ())
});
let req = srv.get().finish().unwrap();
let response = srv.send_request(req).unwrap();
assert_eq!(response.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
// read response
let bytes = srv.block_on(response.body()).unwrap();
assert!(bytes.is_empty());
}