diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3aac6efa8..585b3f497 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,7 +86,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: test - args: -v --workspace --all-features --no-fail-fast -- --nocapture + args: --workspace --all-features --no-fail-fast -- --nocapture --skip=test_h2_content_length --skip=test_reading_deflate_encoding_large_random_rustls diff --git a/CHANGES.md b/CHANGES.md index ead69f293..162f9f61b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,9 +5,17 @@ * `HttpServer::worker_max_blocking_threads` for setting block thread pool. [#2200] ### Changed +* `ServiceResponse::error_response` now uses body type of `Body`. [#2201] +* `ServiceResponse::checked_expr` now returns a `Result`. [#2201] * Update `language-tags` to `0.3`. +* `ServiceResponse::take_body`. [#2201] +* `ServiceResponse::map_body` closure receives and returns `B` instead of `ResponseBody` types. [#2201] + +### Removed +* `HttpResponse::take_body` and old `HttpResponse::into_body` method that casted body type. [#2201] [#2200]: https://github.com/actix/actix-web/pull/2200 +[#2201]: https://github.com/actix/actix-web/pull/2201 ## 4.0.0-beta.6 - 2021-04-17 diff --git a/actix-files/src/service.rs b/actix-files/src/service.rs index dc51ada18..31e1434bd 100644 --- a/actix-files/src/service.rs +++ b/actix-files/src/service.rs @@ -96,8 +96,7 @@ impl Service for FilesService { return Box::pin(ok(req.into_response( HttpResponse::Found() .insert_header((header::LOCATION, redirect_to)) - .body("") - .into_body(), + .finish(), ))); } diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index f398b1c92..29bc74fe3 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -6,21 +6,29 @@ * Re-export `http` crate's `Error` type as `error::HttpError`. [#2171] * Re-export `StatusCode`, `Method`, `Version` and `Uri` at the crate root. [#2171] * Re-export `ContentEncoding` and `ConnectionType` at the crate root. [#2171] +* `Response::into_body` that consumes response and returns body type. [#2201] +* `impl Default` for `Response`. [#2201] ### Changed * The `MessageBody` trait now has an associated `Error` type. [#2183] +* Places in `Response` where `ResponseBody` was received or returned now simply use `B`. [#2201] * `header` mod is now public. [#2171] * `uri` mod is now public. [#2171] * Update `language-tags` to `0.3`. -* Reduce the level from `error` to `debug` for the log line that is emitted when a `500 Internal Server Error` is built using `HttpResponse::from_error`. [#2196] +* Reduce the level from `error` to `debug` for the log line that is emitted when a `500 Internal Server Error` is built using `HttpResponse::from_error`. [#2201] +* `ResponseBuilder::message_body` now returns a `Result`. [#2201] ### Removed * Stop re-exporting `http` crate's `HeaderMap` types in addition to ours. [#2171] * Down-casting for `MessageBody` types. [#2183] +* `error::Result` alias. [#2201] +* `impl Future` for `Response`. [#2201] +* `Response::take_body` and old `Response::into_body` method that casted body type. [#2201] [#2171]: https://github.com/actix/actix-web/pull/2171 [#2183]: https://github.com/actix/actix-web/pull/2183 [#2196]: https://github.com/actix/actix-web/pull/2196 +[#2201]: https://github.com/actix/actix-web/pull/2201 ## 3.0.0-beta.6 - 2021-04-17 diff --git a/actix-http/examples/ws.rs b/actix-http/examples/ws.rs index 1c6f7474c..d3cedf870 100644 --- a/actix-http/examples/ws.rs +++ b/actix-http/examples/ws.rs @@ -40,7 +40,7 @@ async fn handler(req: Request) -> Result>, Error> // handshake will always fail under HTTP/2 log::info!("responding"); - Ok(res.message_body(BodyStream::new(Heartbeat::new(ws::Codec::new())))) + Ok(res.message_body(BodyStream::new(Heartbeat::new(ws::Codec::new())))?) } struct Heartbeat { diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index d21cca60b..cdfcd226b 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -86,15 +86,6 @@ mod tests { } } - impl ResponseBody { - pub(crate) fn get_ref(&self) -> &[u8] { - match *self { - ResponseBody::Body(ref b) => b.get_ref(), - ResponseBody::Other(ref b) => b.get_ref(), - } - } - } - #[actix_rt::test] async fn test_static_str() { assert_eq!(Body::from("").size(), BodySize::Sized(0)); diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index c92f9076d..20b2a2d75 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -18,12 +18,6 @@ use crate::{body::Body, helpers::Writer, Response, ResponseBuilder}; pub use http::Error as HttpError; -/// A specialized [`std::result::Result`] for Actix Web operations. -/// -/// This typedef is generally used to avoid writing out `actix_http::error::Error` directly and is -/// otherwise a direct mapping to `Result`. -pub type Result = std::result::Result; - /// General purpose actix web error. /// /// An actix web error is used to carry errors from `std::error` @@ -470,9 +464,8 @@ impl ResponseError for ContentTypeError { /// /// ``` /// # use std::io; -/// # use actix_http::*; -/// -/// fn index(req: Request) -> Result<&'static str> { +/// # use actix_http::{error, Request}; +/// fn index(req: Request) -> Result<&'static str, actix_http::Error> { /// Err(error::ErrorBadRequest(io::Error::new(io::ErrorKind::Other, "error"))) /// } /// ``` diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 7ab89ba87..574f0b2a9 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -17,7 +17,7 @@ use futures_core::ready; use log::{error, trace}; use pin_project::pin_project; -use crate::body::{Body, BodySize, MessageBody, ResponseBody}; +use crate::body::{Body, BodySize, MessageBody}; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::error::{ParseError, PayloadError}; @@ -141,7 +141,8 @@ where None, ExpectCall(#[pin] X::Future), ServiceCall(#[pin] S::Future), - SendPayload(#[pin] ResponseBody), + SendPayload(#[pin] B), + SendErrorPayload(#[pin] Body), } impl State @@ -295,11 +296,11 @@ where io.poll_flush(cx) } - fn send_response( + fn send_response_inner( self: Pin<&mut Self>, message: Response<()>, - body: ResponseBody, - ) -> Result<(), DispatchError> { + body: &impl MessageBody, + ) -> Result { let size = body.size(); let mut this = self.project(); this.codec @@ -312,10 +313,35 @@ where })?; this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); - match size { - BodySize::None | BodySize::Empty => this.state.set(State::None), - _ => this.state.set(State::SendPayload(body)), + + Ok(size) + } + + fn send_response( + mut self: Pin<&mut Self>, + message: Response<()>, + body: B, + ) -> Result<(), DispatchError> { + let size = self.as_mut().send_response_inner(message, &body)?; + let state = match size { + BodySize::None | BodySize::Empty => State::None, + _ => State::SendPayload(body), }; + self.project().state.set(state); + Ok(()) + } + + fn send_error_response( + mut self: Pin<&mut Self>, + message: Response<()>, + body: Body, + ) -> Result<(), DispatchError> { + let size = self.as_mut().send_response_inner(message, &body)?; + let state = match size { + BodySize::None | BodySize::Empty => State::None, + _ => State::SendErrorPayload(body), + }; + self.project().state.set(state); Ok(()) } @@ -353,8 +379,7 @@ where // send_response would update InnerDispatcher state to SendPayload or // None(If response body is empty). // continue loop to poll it. - self.as_mut() - .send_response(res, ResponseBody::Other(Body::Empty))?; + self.as_mut().send_error_response(res, Body::Empty)?; } // return with upgrade request and poll it exclusively. @@ -376,7 +401,7 @@ where Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())?; + self.as_mut().send_error_response(res, body)?; } // service call pending and could be waiting for more chunk messages. @@ -392,6 +417,41 @@ where }, StateProj::SendPayload(mut stream) => { + // keep populate writer buffer until buffer size limit hit, + // get blocked or finished. + while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + match stream.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(item))) => { + this.codec.encode( + Message::Chunk(Some(item)), + &mut this.write_buf, + )?; + } + + Poll::Ready(None) => { + this.codec + .encode(Message::Chunk(None), &mut this.write_buf)?; + // payload stream finished. + // set state to None and handle next message + this.state.set(State::None); + continue 'res; + } + + Poll::Ready(Some(Err(err))) => { + return Err(DispatchError::Service(err.into())) + } + + Poll::Pending => return Ok(PollResponse::DoNothing), + } + } + // buffer is beyond max size. + // return and try to write the whole buffer to io stream. + return Ok(PollResponse::DrainWriteBuf); + } + + StateProj::SendErrorPayload(mut stream) => { + // TODO: de-dupe impl with SendPayload + // keep populate writer buffer until buffer size limit hit, // get blocked or finished. while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { @@ -433,12 +493,14 @@ where let fut = this.flow.service.call(req); this.state.set(State::ServiceCall(fut)); } + // send expect error as response Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())?; + self.as_mut().send_error_response(res, body)?; } + // expect must be solved before progress can be made. Poll::Pending => return Ok(PollResponse::DoNothing), }, @@ -486,7 +548,7 @@ where Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - return self.send_response(res, body.into_body()); + return self.send_error_response(res, body); } } } @@ -506,7 +568,7 @@ where Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - self.send_response(res, body.into_body()) + self.send_error_response(res, body) } }; } @@ -626,8 +688,10 @@ where } // Requests overflow buffer size should be responded with 431 this.messages.push_back(DispatcherMessage::Error( - Response::new(StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE) - .drop_body(), + Response::with_body( + StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, + (), + ), )); this.flags.insert(Flags::READ_DISCONNECT); *this.error = Some(ParseError::TooLarge.into()); @@ -706,10 +770,9 @@ where } else { // timeout on first request (slow request) return 408 trace!("Slow request timeout"); - let _ = self.as_mut().send_response( - Response::new(StatusCode::REQUEST_TIMEOUT) - .drop_body(), - ResponseBody::Other(Body::Empty), + let _ = self.as_mut().send_error_response( + Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), + Body::Empty, ); this = self.project(); this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 4e9903284..eaabcb687 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -630,8 +630,7 @@ mod tests { async fn test_no_content_length() { let mut bytes = BytesMut::with_capacity(2048); - let mut res: Response<()> = - Response::new(StatusCode::SWITCHING_PROTOCOLS).into_body::<()>(); + let mut res = Response::with_body(StatusCode::SWITCHING_PROTOCOLS, ()); res.headers_mut().insert(DATE, HeaderValue::from_static("")); res.headers_mut() .insert(CONTENT_LENGTH, HeaderValue::from_static("0")); diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index 73f01e913..90e44daa4 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -4,7 +4,7 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use crate::body::{BodySize, MessageBody, ResponseBody}; +use crate::body::{BodySize, MessageBody}; use crate::error::Error; use crate::h1::{Codec, Message}; use crate::response::Response; @@ -14,7 +14,7 @@ use crate::response::Response; pub struct SendResponse { res: Option, BodySize)>>, #[pin] - body: Option>, + body: Option, #[pin] framed: Option>, } @@ -62,7 +62,18 @@ where .unwrap() .is_write_buf_full() { - match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? { + let next = + // TODO: MSRV 1.51: poll_map_err + match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx) { + Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)), + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(Err(err.into())) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }; + + match next { Poll::Ready(item) => { // body is done when item is None body_done = item.is_none(); diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 07636470b..5be172aaf 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -12,7 +12,7 @@ use h2::{ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use log::{error, trace}; -use crate::body::{BodySize, MessageBody, ResponseBody}; +use crate::body::{Body, BodySize, MessageBody}; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::message::ResponseHead; @@ -135,7 +135,8 @@ struct ServiceResponse { #[pin_project::pin_project(project = ServiceResponseStateProj)] enum ServiceResponseState { ServiceCall(#[pin] F, Option>), - SendPayload(SendStream, #[pin] ResponseBody), + SendPayload(SendStream, #[pin] B), + SendErrorPayload(SendStream, #[pin] Body), } impl ServiceResponse @@ -280,9 +281,8 @@ where if size.is_eof() { Poll::Ready(()) } else { - this.state.set(ServiceResponseState::SendPayload( - stream, - body.into_body(), + this.state.set(ServiceResponseState::SendErrorPayload( + stream, body, )); self.poll(cx) } @@ -331,8 +331,65 @@ where *this.buffer = Some(chunk); } + Some(Err(err)) => { + error!( + "Response payload stream error: {:?}", + err.into() + ); + + return Poll::Ready(()); + } + }, + } + } + } + + ServiceResponseStateProj::SendErrorPayload(ref mut stream, ref mut body) => { + // TODO: de-dupe impl with SendPayload + + loop { + match this.buffer { + Some(ref mut buffer) => match ready!(stream.poll_capacity(cx)) { + None => return Poll::Ready(()), + + Some(Ok(cap)) => { + let len = buffer.len(); + let bytes = buffer.split_to(cmp::min(cap, len)); + + if let Err(e) = stream.send_data(bytes, false) { + warn!("{:?}", e); + return Poll::Ready(()); + } else if !buffer.is_empty() { + let cap = cmp::min(buffer.len(), CHUNK_SIZE); + stream.reserve_capacity(cap); + } else { + this.buffer.take(); + } + } + Some(Err(e)) => { - error!("Response payload stream error: {:?}", e); + warn!("{:?}", e); + return Poll::Ready(()); + } + }, + + None => match ready!(body.as_mut().poll_next(cx)) { + None => { + if let Err(e) = stream.send_data(Bytes::new(), true) { + warn!("{:?}", e); + } + return Poll::Ready(()); + } + + Some(Ok(chunk)) => { + stream + .reserve_capacity(cmp::min(chunk.len(), CHUNK_SIZE)); + *this.buffer = Some(chunk); + } + + Some(Err(err)) => { + error!("Response payload stream error: {:?}", err); + return Poll::Ready(()); } }, diff --git a/actix-http/src/header/shared/charset.rs b/actix-http/src/header/shared/charset.rs index 36bdbf7e2..b482f6bce 100644 --- a/actix-http/src/header/shared/charset.rs +++ b/actix-http/src/header/shared/charset.rs @@ -104,7 +104,7 @@ impl Display for Charset { impl FromStr for Charset { type Err = crate::Error; - fn from_str(s: &str) -> crate::Result { + fn from_str(s: &str) -> Result { Ok(match s.to_ascii_uppercase().as_ref() { "US-ASCII" => Us_Ascii, "ISO-8859-1" => Iso_8859_1, diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index 82d0415c2..7c2c3b4e3 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -54,7 +54,7 @@ pub mod ws; pub use self::builder::HttpServiceBuilder; pub use self::config::{KeepAlive, ServiceConfig}; -pub use self::error::{Error, ResponseError, Result}; +pub use self::error::{Error, ResponseError}; pub use self::extensions::Extensions; pub use self::header::ContentEncoding; pub use self::http_message::HttpMessage; diff --git a/actix-http/src/message.rs b/actix-http/src/message.rs index 8cb99d43a..0a3f3a915 100644 --- a/actix-http/src/message.rs +++ b/actix-http/src/message.rs @@ -293,14 +293,14 @@ impl ResponseHead { } } - #[inline] /// Check if keep-alive is enabled + #[inline] pub fn keep_alive(&self) -> bool { self.connection_type() == ConnectionType::KeepAlive } - #[inline] /// Check upgrade status of this message + #[inline] pub fn upgrade(&self) -> bool { self.connection_type() == ConnectionType::Upgrade } @@ -389,12 +389,6 @@ impl BoxedResponseHead { pub fn new(status: StatusCode) -> Self { RESPONSE_POOL.with(|p| p.get_message(status)) } - - pub(crate) fn take(&mut self) -> Self { - BoxedResponseHead { - head: self.head.take(), - } - } } impl std::ops::Deref for BoxedResponseHead { diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index da5c7e000..1b3f68505 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -2,17 +2,13 @@ use std::{ cell::{Ref, RefMut}, - fmt, - future::Future, - pin::Pin, - str, - task::{Context, Poll}, + fmt, str, }; use bytes::{Bytes, BytesMut}; use crate::{ - body::{Body, MessageBody, ResponseBody}, + body::{Body, MessageBody}, error::Error, extensions::Extensions, http::{HeaderMap, StatusCode}, @@ -23,22 +19,22 @@ use crate::{ /// An HTTP response. pub struct Response { pub(crate) head: BoxedResponseHead, - pub(crate) body: ResponseBody, + pub(crate) body: B, pub(crate) error: Option, } impl Response { - /// Constructs a response + /// Constructs a new response with default body. #[inline] pub fn new(status: StatusCode) -> Response { Response { head: BoxedResponseHead::new(status), - body: ResponseBody::Body(Body::Empty), + body: Body::Empty, error: None, } } - /// Create HTTP response builder with specific status. + /// Constructs a new response builder. #[inline] pub fn build(status: StatusCode) -> ResponseBuilder { ResponseBuilder::new(status) @@ -47,25 +43,25 @@ impl Response { // just a couple frequently used shortcuts // this list should not grow larger than a few - /// Creates a new response with status 200 OK. + /// Constructs a new response with status 200 OK. #[inline] pub fn ok() -> Response { Response::new(StatusCode::OK) } - /// Creates a new response with status 400 Bad Request. + /// Constructs a new response with status 400 Bad Request. #[inline] pub fn bad_request() -> Response { Response::new(StatusCode::BAD_REQUEST) } - /// Creates a new response with status 404 Not Found. + /// Constructs a new response with status 404 Not Found. #[inline] pub fn not_found() -> Response { Response::new(StatusCode::NOT_FOUND) } - /// Creates a new response with status 500 Internal Server Error. + /// Constructs a new response with status 500 Internal Server Error. #[inline] pub fn internal_server_error() -> Response { Response::new(StatusCode::INTERNAL_SERVER_ERROR) @@ -73,7 +69,7 @@ impl Response { // end shortcuts - /// Constructs an error response + /// Constructs a new response from an error. #[inline] pub fn from_error(error: Error) -> Response { let mut resp = error.as_response_error().error_response(); @@ -83,162 +79,142 @@ impl Response { resp.error = Some(error); resp } - - /// Convert response to response with body - pub fn into_body(self) -> Response { - let b = match self.body { - ResponseBody::Body(b) => b, - ResponseBody::Other(b) => b, - }; - Response { - head: self.head, - error: self.error, - body: ResponseBody::Other(b), - } - } } impl Response { - /// Constructs a response with body + /// Constructs a new response with given body. #[inline] pub fn with_body(status: StatusCode, body: B) -> Response { Response { head: BoxedResponseHead::new(status), - body: ResponseBody::Body(body), + body: body, error: None, } } + /// Returns a reference to the head of this response. #[inline] - /// Http message part of the response pub fn head(&self) -> &ResponseHead { &*self.head } + /// Returns a mutable reference to the head of this response. #[inline] - /// Mutable reference to a HTTP message part of the response pub fn head_mut(&mut self) -> &mut ResponseHead { &mut *self.head } - /// The source `error` for this response + /// Returns the source `error` for this response, if one is set. #[inline] pub fn error(&self) -> Option<&Error> { self.error.as_ref() } - /// Get the response status code + /// Returns the status code of this response. #[inline] pub fn status(&self) -> StatusCode { self.head.status } - /// Set the `StatusCode` for this response + /// Returns a mutable reference the status code of this response. #[inline] pub fn status_mut(&mut self) -> &mut StatusCode { &mut self.head.status } - /// Get the headers from the response + /// Returns a reference to response headers. #[inline] pub fn headers(&self) -> &HeaderMap { &self.head.headers } - /// Get a mutable reference to the headers + /// Returns a mutable reference to response headers. #[inline] pub fn headers_mut(&mut self) -> &mut HeaderMap { &mut self.head.headers } - /// Connection upgrade status + /// Returns true if connection upgrade is enabled. #[inline] pub fn upgrade(&self) -> bool { self.head.upgrade() } - /// Keep-alive status for this connection + /// Returns true if keep-alive is enabled. pub fn keep_alive(&self) -> bool { self.head.keep_alive() } - /// Responses extensions + /// Returns a reference to the extensions of this response. #[inline] pub fn extensions(&self) -> Ref<'_, Extensions> { self.head.extensions.borrow() } - /// Mutable reference to a the response's extensions + /// Returns a mutable reference to the extensions of this response. #[inline] pub fn extensions_mut(&mut self) -> RefMut<'_, Extensions> { self.head.extensions.borrow_mut() } - /// Get body of this response + /// Returns a reference to the body of this response. #[inline] - pub fn body(&self) -> &ResponseBody { + pub fn body(&self) -> &B { &self.body } - /// Set a body + /// Sets new body. pub fn set_body(self, body: B2) -> Response { Response { head: self.head, - body: ResponseBody::Body(body), + body, error: None, } } - /// Split response and body - pub fn into_parts(self) -> (Response<()>, ResponseBody) { - ( - Response { - head: self.head, - body: ResponseBody::Body(()), - error: self.error, - }, - self.body, - ) - } - - /// Drop request's body + /// Drops body and returns new response. pub fn drop_body(self) -> Response<()> { - Response { - head: self.head, - body: ResponseBody::Body(()), - error: None, - } + self.set_body(()) } - /// Set a body and return previous body value - pub(crate) fn replace_body(self, body: B2) -> (Response, ResponseBody) { + /// Sets new body, returning new response and previous body value. + pub(crate) fn replace_body(self, body: B2) -> (Response, B) { ( Response { head: self.head, - body: ResponseBody::Body(body), + body, error: self.error, }, self.body, ) } - /// Set a body and return previous body value + /// Returns split head and body. + /// + /// # Implementation Notes + /// Due to internal performance optimisations, the first element of the returned tuple is a + /// `Response` as well but only contains the head of the response this was called on. + pub fn into_parts(self) -> (Response<()>, B) { + self.replace_body(()) + } + + /// Returns new response with mapped body. pub fn map_body(mut self, f: F) -> Response where - F: FnOnce(&mut ResponseHead, ResponseBody) -> ResponseBody, + F: FnOnce(&mut ResponseHead, B) -> B2, { let body = f(&mut self.head, self.body); Response { - body, head: self.head, + body, error: self.error, } } - /// Extract response body - pub fn take_body(&mut self) -> ResponseBody { - self.body.take_body() + /// Returns body, consuming this response. + pub fn into_body(self) -> B { + self.body } } @@ -264,19 +240,13 @@ where } } -impl Future for Response { - type Output = Result, Error>; - - fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(Response { - head: self.head.take(), - body: self.body.take_body(), - error: self.error.take(), - })) +impl Default for Response { + #[inline] + fn default() -> Response { + Response::with_body(StatusCode::default(), B::default()) } } -/// Helper converters impl>, E: Into> From> for Response { fn from(res: Result) -> Self { match res { diff --git a/actix-http/src/response_builder.rs b/actix-http/src/response_builder.rs index 0105f70cf..3fb94dad5 100644 --- a/actix-http/src/response_builder.rs +++ b/actix-http/src/response_builder.rs @@ -13,7 +13,7 @@ use bytes::Bytes; use futures_core::Stream; use crate::{ - body::{Body, BodyStream, ResponseBody}, + body::{Body, BodyStream}, error::{Error, HttpError}, header::{self, IntoHeaderPair, IntoHeaderValue}, message::{BoxedResponseHead, ConnectionType, ResponseHead}, @@ -38,10 +38,11 @@ use crate::{ /// .body("1234"); /// /// assert_eq!(res.status(), StatusCode::OK); -/// assert_eq!(body::to_bytes(res.take_body()).await.unwrap(), &b"1234"[..]); /// /// assert!(res.headers().contains_key("server")); /// assert_eq!(res.headers().get_all("set-cookie").count(), 2); +/// +/// assert_eq!(body::to_bytes(res.into_body()).await.unwrap(), &b"1234"[..]); /// # }) /// ``` pub struct ResponseBuilder { @@ -236,23 +237,24 @@ impl ResponseBuilder { #[inline] pub fn body>(&mut self, body: B) -> Response { self.message_body(body.into()) + .unwrap_or_else(Response::from_error) } /// Generate response with a body. /// /// This `ResponseBuilder` will be left in a useless state. - pub fn message_body(&mut self, body: B) -> Response { - if let Some(e) = self.err.take() { - return Response::from(Error::from(e)).into_body(); + pub fn message_body(&mut self, body: B) -> Result, Error> { + if let Some(err) = self.err.take() { + return Err(err.into()); } let response = self.head.take().expect("cannot reuse response builder"); - Response { + Ok(Response { head: response, - body: ResponseBody::Body(body), + body, error: None, - } + }) } /// Generate response with a streaming body. diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index 72870bab5..bf1ca9385 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -52,7 +52,7 @@ where fn call(&self, (req, mut framed): (Request, Framed)) -> Self::Future { let fut = async move { - let res = ws::handshake(req.head()).unwrap().message_body(()); + let res = ws::handshake(req.head()).unwrap().message_body(()).unwrap(); framed .send((res, body::BodySize::None).into()) diff --git a/src/app_service.rs b/src/app_service.rs index 32c779a32..ca6f36202 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -166,8 +166,7 @@ impl AppInitServiceState { Rc::new(AppInitServiceState { rmap, config, - // TODO: AppConfig can be used to pass user defined HttpRequestPool - // capacity. + // TODO: AppConfig can be used to pass user defined HttpRequestPool capacity. pool: HttpRequestPool::default(), }) } diff --git a/src/error.rs b/src/error.rs index cc1a055b8..a5a245693 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,6 +9,11 @@ use url::ParseError as UrlParseError; use crate::http::StatusCode; +/// A convenience [`Result`](std::result::Result) for Actix Web operations. +/// +/// This type alias is generally used to avoid writing out `actix_http::Error` directly. +pub type Result = std::result::Result; + /// Errors which can occur when attempting to generate resource uri. #[derive(Debug, PartialEq, Display, Error, From)] #[non_exhaustive] @@ -26,7 +31,6 @@ pub enum UrlGenerationError { ParseError(UrlParseError), } -/// `InternalServerError` for `UrlGeneratorError` impl ResponseError for UrlGenerationError {} /// A set of errors that can occur during parsing urlencoded payloads @@ -70,7 +74,6 @@ pub enum UrlencodedError { Payload(PayloadError), } -/// Return `BadRequest` for `UrlencodedError` impl ResponseError for UrlencodedError { fn status_code(&self) -> StatusCode { match self { @@ -149,7 +152,6 @@ pub enum QueryPayloadError { Deserialize(serde::de::value::Error), } -/// Return `BadRequest` for `QueryPayloadError` impl ResponseError for QueryPayloadError { fn status_code(&self) -> StatusCode { StatusCode::BAD_REQUEST @@ -177,7 +179,6 @@ pub enum ReadlinesError { ContentTypeError(ContentTypeError), } -/// Return `BadRequest` for `ReadlinesError` impl ResponseError for ReadlinesError { fn status_code(&self) -> StatusCode { match *self { diff --git a/src/lib.rs b/src/lib.rs index 4d0ad26ed..96e6ecbf8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,7 +97,7 @@ pub(crate) mod types; pub mod web; pub use actix_http::Response as BaseHttpResponse; -pub use actix_http::{body, Error, HttpMessage, ResponseError, Result}; +pub use actix_http::{body, Error, HttpMessage, ResponseError}; #[doc(inline)] pub use actix_rt as rt; pub use actix_web_codegen::*; @@ -105,6 +105,7 @@ pub use actix_web_codegen::*; pub use cookie; pub use crate::app::App; +pub use crate::error::Result; pub use crate::extract::FromRequest; pub use crate::request::HttpRequest; pub use crate::resource::Resource; diff --git a/src/middleware/compat.rs b/src/middleware/compat.rs index 3a85591da..4f2f2a504 100644 --- a/src/middleware/compat.rs +++ b/src/middleware/compat.rs @@ -1,12 +1,13 @@ //! For middleware documentation, see [`Compat`]. use std::{ + error::Error as StdError, future::Future, pin::Pin, task::{Context, Poll}, }; -use actix_http::body::{Body, MessageBody, ResponseBody}; +use actix_http::body::{Body, MessageBody}; use actix_service::{Service, Transform}; use futures_core::{future::LocalBoxFuture, ready}; @@ -116,10 +117,10 @@ pub trait MapServiceResponseBody { impl MapServiceResponseBody for ServiceResponse where B: MessageBody + Unpin + 'static, - B::Error: Into, + B::Error: Into>, { fn map_body(self) -> ServiceResponse { - self.map_body(|_, body| ResponseBody::Other(Body::from_message(body))) + self.map_body(|_, body| Body::from_message(body)) } } diff --git a/src/middleware/compress.rs b/src/middleware/compress.rs index 6a56e6de0..f8514c7cc 100644 --- a/src/middleware/compress.rs +++ b/src/middleware/compress.rs @@ -10,7 +10,7 @@ use std::{ }; use actix_http::{ - body::MessageBody, + body::{MessageBody, ResponseBody}, encoding::Encoder, http::header::{ContentEncoding, ACCEPT_ENCODING}, Error, @@ -59,7 +59,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Response = ServiceResponse>; + type Response = ServiceResponse>>; type Error = Error; type Transform = CompressMiddleware; type InitError = (); @@ -83,7 +83,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Response = ServiceResponse>; + type Response = ServiceResponse>>; type Error = Error; type Future = CompressResponse; @@ -127,7 +127,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Output = Result>, Error>; + type Output = Result>>, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -140,9 +140,9 @@ where *this.encoding }; - Poll::Ready(Ok( - resp.map_body(move |head, body| Encoder::response(enc, head, body)) - )) + Poll::Ready(Ok(resp.map_body(move |head, body| { + Encoder::response(enc, head, ResponseBody::Body(body)) + }))) } Err(e) => Poll::Ready(Err(e)), } diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index 8a60d6c70..bbb0e3dc4 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -21,7 +21,7 @@ use regex::{Regex, RegexSet}; use time::OffsetDateTime; use crate::{ - dev::{BodySize, MessageBody, ResponseBody}, + dev::{BodySize, MessageBody}, http::{HeaderName, StatusCode}, service::{ServiceRequest, ServiceResponse}, Error, HttpResponse, Result, @@ -289,13 +289,11 @@ where let time = *this.time; let format = this.format.take(); - Poll::Ready(Ok(res.map_body(move |_, body| { - ResponseBody::Body(StreamLog { - body, - time, - format, - size: 0, - }) + Poll::Ready(Ok(res.map_body(move |_, body| StreamLog { + body, + time, + format, + size: 0, }))) } } @@ -305,7 +303,7 @@ use pin_project::{pin_project, pinned_drop}; #[pin_project(PinnedDrop)] pub struct StreamLog { #[pin] - body: ResponseBody, + body: B, format: Option, size: usize, time: OffsetDateTime, @@ -342,12 +340,15 @@ where cx: &mut Context<'_>, ) -> Poll>> { let this = self.project(); - match this.body.poll_next(cx) { - Poll::Ready(Some(Ok(chunk))) => { + + // TODO: MSRV 1.51: poll_map_err + match ready!(this.body.poll_next(cx)) { + Some(Ok(chunk)) => { *this.size += chunk.len(); Poll::Ready(Some(Ok(chunk))) } - val => val, + Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), + None => Poll::Ready(None), } } } diff --git a/src/responder.rs b/src/responder.rs index 7b8288ed8..2393d046b 100644 --- a/src/responder.rs +++ b/src/responder.rs @@ -264,7 +264,7 @@ pub(crate) mod tests { let resp = srv.call(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + Body::Bytes(ref b) => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"some")); } @@ -277,16 +277,28 @@ pub(crate) mod tests { fn body(&self) -> &Body; } + impl BodyTest for Body { + fn bin_ref(&self) -> &[u8] { + match self { + Body::Bytes(ref bin) => &bin, + _ => unreachable!("bug in test impl"), + } + } + fn body(&self) -> &Body { + self + } + } + impl BodyTest for ResponseBody { fn bin_ref(&self) -> &[u8] { match self { ResponseBody::Body(ref b) => match b { Body::Bytes(ref bin) => &bin, - _ => panic!(), + _ => unreachable!("bug in test impl"), }, ResponseBody::Other(ref b) => match b { Body::Bytes(ref bin) => &bin, - _ => panic!(), + _ => unreachable!("bug in test impl"), }, } } diff --git a/src/response/builder.rs b/src/response/builder.rs index 80086bfd3..b9a10c56b 100644 --- a/src/response/builder.rs +++ b/src/response/builder.rs @@ -310,16 +310,19 @@ impl HttpResponseBuilder { /// /// `HttpResponseBuilder` can not be used after this call. #[inline] - pub fn body>(&mut self, body: B) -> HttpResponse { - self.message_body(body.into()) + pub fn body>(&mut self, body: B) -> HttpResponse { + match self.message_body(body.into()) { + Ok(res) => res, + Err(err) => HttpResponse::from_error(err), + } } /// Set a body and generate `Response`. /// /// `HttpResponseBuilder` can not be used after this call. - pub fn message_body(&mut self, body: B) -> HttpResponse { + pub fn message_body(&mut self, body: B) -> Result, Error> { if let Some(err) = self.err.take() { - return HttpResponse::from_error(Error::from(err)).into_body(); + return Err(err.into()); } let res = self @@ -336,12 +339,12 @@ impl HttpResponseBuilder { for cookie in jar.delta() { match HeaderValue::from_str(&cookie.to_string()) { Ok(val) => res.headers_mut().append(header::SET_COOKIE, val), - Err(err) => return HttpResponse::from_error(Error::from(err)).into_body(), + Err(err) => return Err(err.into()), }; } } - res + Ok(res) } /// Set a streaming body and generate `Response`. @@ -477,42 +480,42 @@ mod tests { #[actix_rt::test] async fn test_json() { - let mut resp = HttpResponse::Ok().json(vec!["v1", "v2", "v3"]); + let resp = HttpResponse::Ok().json(vec!["v1", "v2", "v3"]); let ct = resp.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("application/json")); assert_eq!( - body::to_bytes(resp.take_body()).await.unwrap().as_ref(), + body::to_bytes(resp.into_body()).await.unwrap().as_ref(), br#"["v1","v2","v3"]"# ); - let mut resp = HttpResponse::Ok().json(&["v1", "v2", "v3"]); + let resp = HttpResponse::Ok().json(&["v1", "v2", "v3"]); let ct = resp.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("application/json")); assert_eq!( - body::to_bytes(resp.take_body()).await.unwrap().as_ref(), + body::to_bytes(resp.into_body()).await.unwrap().as_ref(), br#"["v1","v2","v3"]"# ); // content type override - let mut resp = HttpResponse::Ok() + let resp = HttpResponse::Ok() .insert_header((CONTENT_TYPE, "text/json")) .json(&vec!["v1", "v2", "v3"]); let ct = resp.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("text/json")); assert_eq!( - body::to_bytes(resp.take_body()).await.unwrap().as_ref(), + body::to_bytes(resp.into_body()).await.unwrap().as_ref(), br#"["v1","v2","v3"]"# ); } #[actix_rt::test] async fn test_serde_json_in_body() { - let mut resp = HttpResponse::Ok().body( + let resp = HttpResponse::Ok().body( serde_json::to_vec(&serde_json::json!({ "test-key": "test-value" })).unwrap(), ); assert_eq!( - body::to_bytes(resp.take_body()).await.unwrap().as_ref(), + body::to_bytes(resp.into_body()).await.unwrap().as_ref(), br#"{"test-key":"test-value"}"# ); } diff --git a/src/response/response.rs b/src/response/response.rs index 6e09a0136..194e2dff8 100644 --- a/src/response/response.rs +++ b/src/response/response.rs @@ -8,7 +8,7 @@ use std::{ }; use actix_http::{ - body::{Body, MessageBody, ResponseBody}, + body::{Body, MessageBody}, http::{header::HeaderMap, StatusCode}, Extensions, Response, ResponseHead, }; @@ -27,7 +27,7 @@ use crate::{error::Error, HttpResponseBuilder}; /// An HTTP Response pub struct HttpResponse { res: Response, - error: Option, + pub(crate) error: Option, } impl HttpResponse { @@ -56,14 +56,6 @@ impl HttpResponse { error: Some(error), } } - - /// Convert response to response with body - pub fn into_body(self) -> HttpResponse { - HttpResponse { - res: self.res.into_body(), - error: self.error, - } - } } impl HttpResponse { @@ -192,7 +184,7 @@ impl HttpResponse { /// Get body of this response #[inline] - pub fn body(&self) -> &ResponseBody { + pub fn body(&self) -> &B { self.res.body() } @@ -206,7 +198,7 @@ impl HttpResponse { } /// Split response and body - pub fn into_parts(self) -> (HttpResponse<()>, ResponseBody) { + pub fn into_parts(self) -> (HttpResponse<()>, B) { let (head, body) = self.res.into_parts(); ( @@ -229,7 +221,7 @@ impl HttpResponse { /// Set a body and return previous body value pub fn map_body(self, f: F) -> HttpResponse where - F: FnOnce(&mut ResponseHead, ResponseBody) -> ResponseBody, + F: FnOnce(&mut ResponseHead, B) -> B2, { HttpResponse { res: self.res.map_body(f), @@ -238,8 +230,8 @@ impl HttpResponse { } /// Extract response body - pub fn take_body(&mut self) -> ResponseBody { - self.res.take_body() + pub fn into_body(self) -> B { + self.res.into_body() } } @@ -274,20 +266,25 @@ impl From> for Response { // TODO: expose cause somewhere? // if let Some(err) = res.error { - // eprintln!("impl From> for Response let Some(err)"); - // return Response::from_error(err).into_body(); + // return Response::from_error(err); // } res.res } } -impl Future for HttpResponse { +// Future is only implemented for Body payload type because it's the most useful for making simple +// handlers without async blocks. Making it generic over all MessageBody types requires a future +// impl on Response which would cause it's body field to be, undesirably, Option. +// +// This impl is not particularly efficient due to the Response construction and should probably +// not be invoked if performance is important. Prefer an async fn/block in such cases. +impl Future for HttpResponse { type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { if let Some(err) = self.error.take() { - return Poll::Ready(Ok(Response::from_error(err).into_body())); + return Poll::Ready(Err(err)); } Poll::Ready(Ok(mem::replace( diff --git a/src/scope.rs b/src/scope.rs index 3be6adb0c..412c01d95 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -578,7 +578,7 @@ mod tests { use actix_utils::future::ok; use bytes::Bytes; - use crate::dev::{Body, ResponseBody}; + use crate::dev::Body; use crate::http::{header, HeaderValue, Method, StatusCode}; use crate::middleware::DefaultHeaders; use crate::service::ServiceRequest; @@ -748,7 +748,7 @@ mod tests { assert_eq!(resp.status(), StatusCode::OK); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + Body::Bytes(ref b) => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"project: project1")); } @@ -849,7 +849,7 @@ mod tests { assert_eq!(resp.status(), StatusCode::CREATED); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + Body::Bytes(ref b) => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"project: project_1")); } @@ -877,7 +877,7 @@ mod tests { assert_eq!(resp.status(), StatusCode::CREATED); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + Body::Bytes(ref b) => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"project: test - 1")); } diff --git a/src/service.rs b/src/service.rs index 0c03f84ad..b7f244797 100644 --- a/src/service.rs +++ b/src/service.rs @@ -2,7 +2,7 @@ use std::cell::{Ref, RefMut}; use std::rc::Rc; use std::{fmt, net}; -use actix_http::body::{Body, MessageBody, ResponseBody}; +use actix_http::body::{Body, MessageBody}; use actix_http::http::{HeaderMap, Method, StatusCode, Uri, Version}; use actix_http::{ Error, Extensions, HttpMessage, Payload, PayloadStream, RequestHead, Response, ResponseHead, @@ -110,9 +110,9 @@ impl ServiceRequest { /// Create service response for error #[inline] - pub fn error_response>(self, err: E) -> ServiceResponse { + pub fn error_response>(self, err: E) -> ServiceResponse { let res = HttpResponse::from_error(err.into()); - ServiceResponse::new(self.req, res.into_body()) + ServiceResponse::new(self.req, res) } /// This method returns reference to the request head @@ -335,22 +335,24 @@ pub struct ServiceResponse { response: HttpResponse, } +impl ServiceResponse { + /// Create service response from the error + pub fn from_err>(err: E, request: HttpRequest) -> Self { + let response = HttpResponse::from_error(err.into()); + ServiceResponse { request, response } + } +} + impl ServiceResponse { /// Create service response instance pub fn new(request: HttpRequest, response: HttpResponse) -> Self { ServiceResponse { request, response } } - /// Create service response from the error - pub fn from_err>(err: E, request: HttpRequest) -> Self { - let response = HttpResponse::from_error(err.into()).into_body(); - ServiceResponse { request, response } - } - /// Create service response for error #[inline] - pub fn error_response>(self, err: E) -> Self { - Self::from_err(err, self.request) + pub fn error_response>(self, err: E) -> ServiceResponse { + ServiceResponse::from_err(err, self.request) } /// Create service response @@ -396,23 +398,18 @@ impl ServiceResponse { } /// Execute closure and in case of error convert it to response. - pub fn checked_expr(mut self, f: F) -> Self + pub fn checked_expr(mut self, f: F) -> Result where F: FnOnce(&mut Self) -> Result<(), E>, E: Into, { - match f(&mut self) { - Ok(_) => self, - Err(err) => { - let res = HttpResponse::from_error(err.into()); - ServiceResponse::new(self.request, res.into_body()) - } - } + f(&mut self).map_err(Into::into)?; + Ok(self) } /// Extract response body - pub fn take_body(&mut self) -> ResponseBody { - self.response.take_body() + pub fn into_body(self) -> B { + self.response.into_body() } } @@ -420,7 +417,7 @@ impl ServiceResponse { /// Set a new body pub fn map_body(self, f: F) -> ServiceResponse where - F: FnOnce(&mut ResponseHead, ResponseBody) -> ResponseBody, + F: FnOnce(&mut ResponseHead, B) -> B2, { let response = self.response.map_body(f); diff --git a/src/test.rs b/src/test.rs index 9fe5e9b5d..de97dc8aa 100644 --- a/src/test.rs +++ b/src/test.rs @@ -4,13 +4,14 @@ use std::{net::SocketAddr, rc::Rc}; pub use actix_http::test::TestBuffer; use actix_http::{ + body, http::{header::IntoHeaderPair, Method, StatusCode, Uri, Version}, test::TestRequest as HttpTestRequest, Extensions, Request, }; use actix_router::{Path, ResourceDef, Url}; use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; -use actix_utils::future::ok; +use actix_utils::future::{ok, poll_fn}; use futures_core::Stream; use futures_util::StreamExt as _; use serde::{de::DeserializeOwned, Serialize}; @@ -153,16 +154,17 @@ where B: MessageBody + Unpin, B::Error: Into, { - let mut resp = app + let resp = app .call(req) .await .unwrap_or_else(|e| panic!("read_response failed at application call: {}", e)); - let mut body = resp.take_body(); + let body = resp.into_body(); let mut bytes = BytesMut::new(); - while let Some(item) = body.next().await { - bytes.extend_from_slice(&item.unwrap()); + actix_rt::pin!(body); + while let Some(item) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { + bytes.extend_from_slice(&item.map_err(Into::into).unwrap()); } bytes.freeze() @@ -194,16 +196,19 @@ where /// assert_eq!(result, Bytes::from_static(b"welcome!")); /// } /// ``` -pub async fn read_body(mut res: ServiceResponse) -> Bytes +pub async fn read_body(res: ServiceResponse) -> Bytes where B: MessageBody + Unpin, B::Error: Into, { - let mut body = res.take_body(); + let body = res.into_body(); let mut bytes = BytesMut::new(); - while let Some(item) = body.next().await { - bytes.extend_from_slice(&item.unwrap()); + + actix_rt::pin!(body); + while let Some(item) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { + bytes.extend_from_slice(&item.map_err(Into::into).unwrap()); } + bytes.freeze() } @@ -271,6 +276,14 @@ where Ok(data.freeze()) } +pub async fn load_body(body: B) -> Result +where + B: MessageBody + Unpin, + B::Error: Into, +{ + body::to_bytes(body).await.map_err(Into::into) +} + /// Helper function that returns a deserialized response body of a TestRequest /// /// ``` diff --git a/src/types/json.rs b/src/types/json.rs index 322e5cbf3..5762c6428 100644 --- a/src/types/json.rs +++ b/src/types/json.rs @@ -435,7 +435,7 @@ mod tests { header::{self, CONTENT_LENGTH, CONTENT_TYPE}, StatusCode, }, - test::{load_stream, TestRequest}, + test::{load_body, TestRequest}, }; #[derive(Serialize, Deserialize, PartialEq, Debug)] @@ -492,10 +492,10 @@ mod tests { .to_http_parts(); let s = Json::::from_request(&req, &mut pl).await; - let mut resp = HttpResponse::from_error(s.err().unwrap()); + let resp = HttpResponse::from_error(s.err().unwrap()); assert_eq!(resp.status(), StatusCode::BAD_REQUEST); - let body = load_stream(resp.take_body()).await.unwrap(); + let body = load_body(resp.into_body()).await.unwrap(); let msg: MyObject = serde_json::from_slice(&body).unwrap(); assert_eq!(msg.name, "invalid request"); } diff --git a/tests/test_server.rs b/tests/test_server.rs index 2760cc7fb..756c180fc 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -32,7 +32,7 @@ use rand::{distributions::Alphanumeric, Rng}; use actix_web::dev::BodyEncoding; use actix_web::middleware::{Compress, NormalizePath, TrailingSlash}; -use actix_web::{dev, web, App, Error, HttpResponse}; +use actix_web::{web, App, Error, HttpResponse}; const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World \ @@ -160,9 +160,7 @@ async fn test_body_gzip2() { let srv = actix_test::start_with(actix_test::config().h1(), || { App::new() .wrap(Compress::new(ContentEncoding::Gzip)) - .service(web::resource("/").route(web::to(|| { - HttpResponse::Ok().body(STR).into_body::() - }))) + .service(web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR)))) }); let mut response = srv @@ -903,7 +901,7 @@ async fn test_normalize() { let srv = actix_test::start_with(actix_test::config().h1(), || { App::new() .wrap(NormalizePath::new(TrailingSlash::Trim)) - .service(web::resource("/one").route(web::to(|| HttpResponse::Ok().finish()))) + .service(web::resource("/one").route(web::to(|| HttpResponse::Ok()))) }); let response = srv.get("/one/").send().await.unwrap();