From f12f62ba7370c5e908681b451f93e76bc99f8e53 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 8 Dec 2021 22:45:54 +0000 Subject: [PATCH] body ergo v4 using any body --- Cargo.toml | 1 + actix-http/src/body/message_body.rs | 6 +- actix-http/src/body/mod.rs | 2 + actix-http/src/encoding/encoder.rs | 15 ++- actix-http/src/encoding/mod.rs | 2 +- actix-http/src/error.rs | 2 + actix-http/src/response.rs | 8 +- actix-test/src/lib.rs | 72 +++++++++----- src/any_body.rs | 146 ++++++++++++++++++++++++++++ src/dev.rs | 38 -------- src/error/error.rs | 13 ++- src/error/internal.rs | 6 +- src/error/response_error.rs | 17 +++- src/lib.rs | 1 + src/middleware/compress.rs | 26 ++--- src/middleware/logger.rs | 21 ++-- src/responder.rs | 45 ++++++--- src/response/builder.rs | 5 +- src/response/response.rs | 70 +++++++------ src/server.rs | 36 ++++--- src/service.rs | 43 +++++--- src/test.rs | 4 +- src/types/either.rs | 34 ++++--- src/types/form.rs | 13 ++- src/types/json.rs | 9 +- 25 files changed, 427 insertions(+), 208 deletions(-) create mode 100644 src/any_body.rs diff --git a/Cargo.toml b/Cargo.toml index cee0680a5..ab1813f87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,6 +118,7 @@ futures-util = { version = "0.3.7", default-features = false, features = ["std"] rand = "0.8" rcgen = "0.8" rustls-pemfile = "0.2" +static_assertions = "1" tls-openssl = { package = "openssl", version = "0.10.9" } tls-rustls = { package = "rustls", version = "0.20.0" } zstd = "0.9" diff --git a/actix-http/src/body/message_body.rs b/actix-http/src/body/message_body.rs index 053b6f286..83484ae42 100644 --- a/actix-http/src/body/message_body.rs +++ b/actix-http/src/body/message_body.rs @@ -19,7 +19,7 @@ use super::BodySize; pub trait MessageBody { // TODO: consider this bound to only fmt::Display since the error type is not really used // and there is an impl for Into> on String - type Error: Into>; + type Error: Into> + 'static; /// Body size hint. fn size(&self) -> BodySize; @@ -272,7 +272,7 @@ impl MessageBody for MessageBodyMapErr where B: MessageBody, F: FnOnce(B::Error) -> E, - E: Into>, + E: Into> + 'static, { type Error = E; @@ -306,6 +306,8 @@ mod tests { use super::*; + // static_assertions::assert_obj_safe!(MessageBody<()>); + macro_rules! assert_poll_next { ($pin:expr, $exp:expr) => { assert_eq!( diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index af7c4626f..23167aefc 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -1,5 +1,6 @@ //! Traits and structures to aid consuming and writing HTTP payloads. +// mod any; mod body_stream; mod boxed; mod either; @@ -9,6 +10,7 @@ mod size; mod sized_stream; mod utils; +// pub use self::any::AnyBody; pub use self::body_stream::BodyStream; pub use self::boxed::BoxBody; pub use self::either::EitherBody; diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 0886221cc..2e9af5462 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -53,6 +53,15 @@ impl Encoder { } } + pub fn not_acceptable(body: Bytes) -> Self { + Encoder { + body: EncoderBody::Bytes { body }, + encoder: None, + fut: None, + eof: false, + } + } + pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self { let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING) || head.status == StatusCode::SWITCHING_PROTOCOLS @@ -99,6 +108,7 @@ pin_project! { #[project = EncoderBodyProj] enum EncoderBody { None, + Bytes { body: Bytes }, Stream { #[pin] body: B }, } } @@ -112,6 +122,7 @@ where fn size(&self) -> BodySize { match self { EncoderBody::None => BodySize::None, + EncoderBody::Bytes { body } => body.size(), EncoderBody::Stream { body } => body.size(), } } @@ -122,7 +133,9 @@ where ) -> Poll>> { match self.project() { EncoderBodyProj::None => Poll::Ready(None), - + EncoderBodyProj::Bytes { body } => { + Pin::new(body).poll_next(cx).map_err(|err| match err {}) + } EncoderBodyProj::Stream { body } => body .poll_next(cx) .map_err(|err| EncoderError::Body(err.into())), diff --git a/actix-http/src/encoding/mod.rs b/actix-http/src/encoding/mod.rs index d51dd66c0..261abc9f3 100644 --- a/actix-http/src/encoding/mod.rs +++ b/actix-http/src/encoding/mod.rs @@ -8,7 +8,7 @@ mod decoder; mod encoder; pub use self::decoder::Decoder; -pub use self::encoder::Encoder; +pub use self::encoder::{Encoder, EncoderError}; /// Special-purpose writer for streaming (de-)compression. /// diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index a04867ae1..f15a85e4b 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -9,6 +9,8 @@ use crate::{body::BoxBody, ws, Response}; pub use http::Error as HttpError; +pub use crate::encoding::EncoderError; + pub struct Error { inner: Box, } diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index 861cab2cb..0dbc08b98 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -2,7 +2,7 @@ use std::{ cell::{Ref, RefMut}, - fmt, str, + fmt, mem, str, }; use bytes::{Bytes, BytesMut}; @@ -203,6 +203,12 @@ impl Response { } } +impl Response { + pub fn take_body(&mut self) -> B { + mem::take(&mut self.body) + } +} + impl fmt::Debug for Response where B: MessageBody, diff --git a/actix-test/src/lib.rs b/actix-test/src/lib.rs index 934b8f3aa..637ab9060 100644 --- a/actix-test/src/lib.rs +++ b/actix-test/src/lib.rs @@ -163,9 +163,11 @@ where local_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = + err.into().error_response().into(); + res.map_into_boxed_body() + }); HttpService::build() .client_timeout(timeout) @@ -179,9 +181,11 @@ where local_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = + err.into().error_response().into(); + res.map_into_boxed_body() + }); HttpService::build() .client_timeout(timeout) @@ -195,9 +199,11 @@ where local_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = + err.into().error_response().into(); + res.map_into_boxed_body() + }); HttpService::build() .client_timeout(timeout) @@ -214,9 +220,11 @@ where local_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = + err.into().error_response().into(); + res.map_into_boxed_body() + }); HttpService::build() .client_timeout(timeout) @@ -230,9 +238,11 @@ where local_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = + err.into().error_response().into(); + res.map_into_boxed_body() + }); HttpService::build() .client_timeout(timeout) @@ -246,9 +256,11 @@ where local_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = + err.into().error_response().into(); + res.map_into_boxed_body() + }); HttpService::build() .client_timeout(timeout) @@ -265,9 +277,11 @@ where local_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = + err.into().error_response().into(); + res.map_into_boxed_body() + }); HttpService::build() .client_timeout(timeout) @@ -281,9 +295,11 @@ where local_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = + err.into().error_response().into(); + res.map_into_boxed_body() + }); HttpService::build() .client_timeout(timeout) @@ -297,9 +313,11 @@ where local_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = + err.into().error_response().into(); + res.map_into_boxed_body() + }); HttpService::build() .client_timeout(timeout) diff --git a/src/any_body.rs b/src/any_body.rs new file mode 100644 index 000000000..75a8dd296 --- /dev/null +++ b/src/any_body.rs @@ -0,0 +1,146 @@ +use std::{ + error::Error as StdError, + mem, + pin::Pin, + task::{Context, Poll}, +}; + +use actix_http::body::{BodySize, BoxBody, MessageBody}; +use bytes::Bytes; +use pin_project_lite::pin_project; + +use crate::Error; + +pin_project! { + #[derive(Debug)] + #[project = AnyBodyProj] + pub enum AnyBody { + None, + Full { body: Bytes }, + Stream { #[pin] body: B }, + Boxed { body: BoxBody }, + } +} + +impl AnyBody { + pub fn into_body(self) -> AnyBody { + match self { + AnyBody::None => AnyBody::None, + AnyBody::Full { body } => AnyBody::Full { body }, + AnyBody::Stream { body } => AnyBody::Boxed { + body: BoxBody::new(body), + }, + AnyBody::Boxed { body } => AnyBody::Boxed { body }, + } + } +} + +impl Default for AnyBody { + fn default() -> Self { + Self::Full { body: Bytes::new() } + } +} + +impl MessageBody for AnyBody +where + B: MessageBody, + B::Error: 'static, +{ + type Error = Box; + + fn size(&self) -> BodySize { + match self { + Self::None => BodySize::None, + Self::Full { body } => body.size(), + Self::Stream { body } => body.size(), + Self::Boxed { body } => body.size(), + } + } + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + match self.project() { + AnyBodyProj::None => Poll::Ready(None), + AnyBodyProj::Full { body } => { + let bytes = mem::take(body); + Poll::Ready(Some(Ok(bytes))) + } + AnyBodyProj::Stream { body } => body.poll_next(cx).map_err(|err| err.into()), + AnyBodyProj::Boxed { body } => body.as_pin_mut().poll_next(cx), + } + } +} + +pin_project! { + #[project = EitherAnyBodyProj] + #[derive(Debug)] + pub enum EitherAnyBody { + /// A body of type `L`. + Left { #[pin] body: AnyBody }, + + /// A body of type `R`. + Right { #[pin] body: AnyBody }, + } +} + +// impl EitherAnyBody { +// /// Creates new `EitherBody` using left variant and boxed right variant. +// pub fn new(body: L) -> Self { +// Self::Left { +// body: AnyBody::Stream { body }, +// } +// } +// } + +// impl EitherAnyBody { +// /// Creates new `EitherBody` using left variant. +// pub fn left(body: L) -> Self { +// Self::Left { +// body: AnyBody::Stream { body }, +// } +// } + +// /// Creates new `EitherBody` using right variant. +// pub fn right(body: R) -> Self { +// Self::Right { +// body: AnyBody::Stream { body }, +// } +// } +// } + +impl MessageBody for EitherAnyBody +where + L: MessageBody + 'static, + R: MessageBody + 'static, +{ + type Error = Error; + + fn size(&self) -> BodySize { + match self { + Self::Left { body } => body.size(), + Self::Right { body } => body.size(), + } + } + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + match self.project() { + EitherAnyBodyProj::Left { body } => body.poll_next(cx).map_err(Error::from), + EitherAnyBodyProj::Right { body } => body.poll_next(cx).map_err(Error::from), + } + } +} + +#[cfg(test)] +mod tests { + use static_assertions::assert_eq_size; + + use super::*; + + assert_eq_size!(AnyBody<()>, [u8; 40]); + assert_eq_size!(AnyBody, [u8; 40]); // how is this the same size as () +} diff --git a/src/dev.rs b/src/dev.rs index d4a64985c..23a40f292 100644 --- a/src/dev.rs +++ b/src/dev.rs @@ -102,41 +102,3 @@ impl BodyEncoding for crate::HttpResponse { self } } - -// TODO: remove this if it doesn't appear to be needed - -#[allow(dead_code)] -#[derive(Debug)] -pub(crate) enum AnyBody { - None, - Full { body: crate::web::Bytes }, - Boxed { body: actix_http::body::BoxBody }, -} - -impl crate::body::MessageBody for AnyBody { - type Error = crate::BoxError; - - /// Body size hint. - fn size(&self) -> crate::body::BodySize { - match self { - AnyBody::None => crate::body::BodySize::None, - AnyBody::Full { body } => body.size(), - AnyBody::Boxed { body } => body.size(), - } - } - - /// Attempt to pull out the next chunk of body bytes. - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll>> { - match self.get_mut() { - AnyBody::None => std::task::Poll::Ready(None), - AnyBody::Full { body } => { - let bytes = std::mem::take(body); - std::task::Poll::Ready(Some(Ok(bytes))) - } - AnyBody::Boxed { body } => body.as_pin_mut().poll_next(cx), - } - } -} diff --git a/src/error/error.rs b/src/error/error.rs index be17c1962..e1a233b4a 100644 --- a/src/error/error.rs +++ b/src/error/error.rs @@ -2,7 +2,7 @@ use std::{error::Error as StdError, fmt}; use actix_http::{body::BoxBody, Response}; -use crate::{HttpResponse, ResponseError}; +use crate::{any_body::AnyBody, HttpResponse, ResponseError}; /// General purpose actix web error. /// @@ -69,8 +69,15 @@ impl From for Error { } } -impl From for Response { - fn from(err: Error) -> Response { +impl From for Response> { + fn from(err: Error) -> Self { err.error_response().into() } } + +impl From for actix_http::Response { + fn from(err: Error) -> Self { + let res: actix_http::Response<_> = err.error_response().into(); + res.map_into_boxed_body() + } +} diff --git a/src/error/internal.rs b/src/error/internal.rs index b8e169018..cda16f927 100644 --- a/src/error/internal.rs +++ b/src/error/internal.rs @@ -7,7 +7,7 @@ use actix_http::{ }; use bytes::{BufMut as _, BytesMut}; -use crate::{Error, HttpRequest, HttpResponse, Responder, ResponseError}; +use crate::{any_body::AnyBody, Error, HttpRequest, HttpResponse, Responder, ResponseError}; /// Wraps errors to alter the generated response status code. /// @@ -91,7 +91,9 @@ where let mime = mime::TEXT_PLAIN_UTF_8.try_into_value().unwrap(); res.headers_mut().insert(header::CONTENT_TYPE, mime); - res.set_body(BoxBody::new(buf.into_inner())) + res.set_body(AnyBody::Full { + body: buf.into_inner().freeze(), + }) } InternalErrorType::Response(ref resp) => { diff --git a/src/error/response_error.rs b/src/error/response_error.rs index 7260efa1a..103e20f96 100644 --- a/src/error/response_error.rs +++ b/src/error/response_error.rs @@ -14,6 +14,7 @@ use actix_http::{ use bytes::BytesMut; use crate::{ + any_body::AnyBody, error::{downcast_dyn, downcast_get_type_id}, helpers, HttpResponse, }; @@ -33,7 +34,7 @@ pub trait ResponseError: fmt::Debug + fmt::Display { /// /// By default, the generated response uses a 500 Internal Server Error status code, a /// `Content-Type` of `text/plain`, and the body is set to `Self`'s `Display` impl. - fn error_response(&self) -> HttpResponse { + fn error_response(&self) -> HttpResponse { let mut res = HttpResponse::new(self.status_code()); let mut buf = BytesMut::new(); @@ -42,7 +43,7 @@ pub trait ResponseError: fmt::Debug + fmt::Display { let mime = mime::TEXT_PLAIN_UTF_8.try_into_value().unwrap(); res.headers_mut().insert(header::CONTENT_TYPE, mime); - res.set_body(BoxBody::new(buf)) + res.set_body(AnyBody::Full { body: buf.freeze() }) } downcast_get_type_id!(); @@ -50,7 +51,7 @@ pub trait ResponseError: fmt::Debug + fmt::Display { downcast_dyn!(ResponseError); -impl ResponseError for Box {} +impl ResponseError for Box {} #[cfg(feature = "openssl")] impl ResponseError for actix_tls::accept::openssl::reexports::Error {} @@ -128,7 +129,15 @@ impl ResponseError for actix_http::error::ContentTypeError { impl ResponseError for actix_http::ws::HandshakeError { fn error_response(&self) -> HttpResponse { - Response::from(self).map_into_boxed_body().into() + Response::from(self) + .map_body(|_, body| AnyBody::Boxed { body }) + .into() + } +} + +impl ResponseError for actix_http::error::EncoderError { + fn error_response(&self) -> HttpResponse { + todo!("") } } diff --git a/src/lib.rs b/src/lib.rs index a44c9b3fb..7d9163e3e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,6 +70,7 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] +mod any_body; mod app; mod app_service; mod config; diff --git a/src/middleware/compress.rs b/src/middleware/compress.rs index af4a107e3..8fe54f84b 100644 --- a/src/middleware/compress.rs +++ b/src/middleware/compress.rs @@ -10,7 +10,7 @@ use std::{ }; use actix_http::{ - body::{EitherBody, MessageBody}, + body::MessageBody, encoding::Encoder, header::{ContentEncoding, ACCEPT_ENCODING}, StatusCode, @@ -22,6 +22,7 @@ use once_cell::sync::Lazy; use pin_project_lite::pin_project; use crate::{ + any_body::AnyBody, dev::BodyEncoding, service::{ServiceRequest, ServiceResponse}, Error, HttpResponse, @@ -61,7 +62,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Response = ServiceResponse>>; + type Response = ServiceResponse>>; type Error = Error; type Transform = CompressMiddleware; type InitError = (); @@ -111,7 +112,7 @@ where S: Service, Error = Error>, B: MessageBody, { - type Response = ServiceResponse>>; + type Response = ServiceResponse>>; type Error = Error; type Future = Either, Ready>>; @@ -146,12 +147,15 @@ where let res = HttpResponse::with_body( StatusCode::NOT_ACCEPTABLE, SUPPORTED_ALGORITHM_NAMES.clone(), - ); + ) + .map_body(|_, body| match body { + AnyBody::Full { body } => AnyBody::Stream { + body: Encoder::not_acceptable(body), + }, + _ => unreachable!("probably"), + }); - Either::right(ok(req - .into_response(res) - .map_into_boxed_body() - .map_into_right_body())) + Either::right(ok(req.into_response(res))) } } } @@ -174,7 +178,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Output = Result>>, Error>; + type Output = Result>>, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -187,8 +191,8 @@ where *this.encoding }; - Poll::Ready(Ok(resp.map_body(move |head, body| { - EitherBody::left(Encoder::response(enc, head, body)) + Poll::Ready(Ok(resp.map_body(move |head, body| AnyBody::Stream { + body: Encoder::response(enc, head, body), }))) } diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index 74daa26d5..3161ce474 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -22,6 +22,7 @@ use regex::{Regex, RegexSet}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use crate::{ + any_body::AnyBody, body::{BodySize, MessageBody}, http::header::HeaderName, service::{ServiceRequest, ServiceResponse}, @@ -175,7 +176,7 @@ impl Default for Logger { } } -impl Transform for Logger +impl Transform for Logger where S: Service, Error = Error>, B: MessageBody, @@ -210,7 +211,7 @@ pub struct LoggerMiddleware { service: S, } -impl Service for LoggerMiddleware +impl Service for LoggerMiddleware where S: Service, Error = Error>, B: MessageBody, @@ -262,7 +263,7 @@ pin_project! { } } -impl Future for LoggerResponse +impl Future for LoggerResponse where B: MessageBody, S: Service, Error = Error>, @@ -290,11 +291,13 @@ where let time = *this.time; let format = this.format.take(); - Poll::Ready(Ok(res.map_body(move |_, body| StreamLog { - body, - time, - format, - size: 0, + Poll::Ready(Ok(res.map_body(move |_, body| AnyBody::Stream { + body: StreamLog { + body: body.into_body(), + time, + format, + size: 0, + }, }))) } } @@ -302,7 +305,7 @@ where pin_project! { pub struct StreamLog { #[pin] - body: B, + body: AnyBody, format: Option, size: usize, time: OffsetDateTime, diff --git a/src/responder.rs b/src/responder.rs index e72739a71..48bef3fa4 100644 --- a/src/responder.rs +++ b/src/responder.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; use actix_http::{ - body::{BoxBody, EitherBody, MessageBody}, + body::{BoxBody, MessageBody}, error::HttpError, header::HeaderMap, header::IntoHeaderPair, @@ -9,7 +9,9 @@ use actix_http::{ }; use bytes::{Bytes, BytesMut}; -use crate::{BoxError, Error, HttpRequest, HttpResponse, HttpResponseBuilder}; +use crate::{ + any_body::AnyBody, BoxError, Error, HttpRequest, HttpResponse, HttpResponseBuilder, +}; /// Trait implemented by types that can be converted to an HTTP response. /// @@ -72,7 +74,7 @@ impl Responder for HttpResponse { } } -impl Responder for actix_http::Response { +impl Responder for actix_http::Response> { type Body = BoxBody; #[inline] @@ -95,7 +97,10 @@ impl Responder for actix_http::ResponseBuilder { #[inline] fn respond_to(mut self, req: &HttpRequest) -> HttpResponse { - self.finish().map_into_boxed_body().respond_to(req) + self.finish() + .map_into_boxed_body() + .map_body(|_, body| AnyBody::Boxed { body }) + .respond_to(req) } } @@ -104,12 +109,12 @@ where T: Responder, ::Error: Into, { - type Body = EitherBody; + type Body = T::Body; fn respond_to(self, req: &HttpRequest) -> HttpResponse { match self { - Some(val) => val.respond_to(req).map_into_left_body(), - None => HttpResponse::new(StatusCode::NOT_FOUND).map_into_right_body(), + Some(val) => val.respond_to(req), + None => HttpResponse::new(StatusCode::NOT_FOUND).map_into_body(), } } } @@ -120,12 +125,12 @@ where ::Error: Into, E: Into, { - type Body = EitherBody; + type Body = T::Body; fn respond_to(self, req: &HttpRequest) -> HttpResponse { match self { - Ok(val) => val.respond_to(req).map_into_left_body(), - Err(err) => HttpResponse::from_error(err.into()).map_into_right_body(), + Ok(val) => val.respond_to(req), + Err(err) => HttpResponse::from_error(err.into()).map_into_body(), } } } @@ -146,7 +151,12 @@ macro_rules! impl_responder_by_forward_into_base_response { type Body = $body; fn respond_to(self, _: &HttpRequest) -> HttpResponse { - let res: actix_http::Response<_> = self.into(); + let res = actix_http::Response::with_body( + StatusCode::default(), + AnyBody::Full { + body: Bytes::from(self), + }, + ); res.into() } } @@ -171,7 +181,12 @@ macro_rules! impl_into_string_responder { fn respond_to(self, _: &HttpRequest) -> HttpResponse { let string: String = self.into(); - let res: actix_http::Response<_> = string.into(); + let res = actix_http::Response::with_body( + StatusCode::default(), + AnyBody::Full { + body: Bytes::from(string), + }, + ); res.into() } } @@ -250,12 +265,12 @@ where T: Responder, ::Error: Into, { - type Body = EitherBody; + type Body = T::Body; fn respond_to(self, req: &HttpRequest) -> HttpResponse { let headers = match self.headers { Ok(headers) => headers, - Err(err) => return HttpResponse::from_error(err).map_into_right_body(), + Err(err) => return HttpResponse::from_error(err).map_into_body(), }; let mut res = self.responder.respond_to(req); @@ -269,7 +284,7 @@ where res.headers_mut().insert(k, v); } - res.map_into_left_body() + res } } diff --git a/src/response/builder.rs b/src/response/builder.rs index 18a1c8a7f..a6f0e927a 100644 --- a/src/response/builder.rs +++ b/src/response/builder.rs @@ -22,6 +22,7 @@ use actix_http::header::HeaderValue; use cookie::{Cookie, CookieJar}; use crate::{ + any_body::AnyBody, error::{Error, JsonPayloadError}, BoxError, HttpResponse, }; @@ -333,7 +334,7 @@ impl HttpResponseBuilder { .set_body(body); #[allow(unused_mut)] // mut is only unused when cookies are disabled - let mut res = HttpResponse::from(res); + let mut res = HttpResponse::from(res.map_body(|_, body| AnyBody::Stream { body })); #[cfg(feature = "cookies")] if let Some(ref jar) = self.cookies { @@ -416,7 +417,7 @@ impl From for HttpResponse { } } -impl From for Response { +impl From for Response> { fn from(mut builder: HttpResponseBuilder) -> Self { builder.finish().into() } diff --git a/src/response/response.rs b/src/response/response.rs index 1900dd845..f099b7e7a 100644 --- a/src/response/response.rs +++ b/src/response/response.rs @@ -8,7 +8,7 @@ use std::{ }; use actix_http::{ - body::{BoxBody, EitherBody, MessageBody}, + body::{BoxBody, MessageBody, None as NoneBody}, header::HeaderMap, Extensions, Response, ResponseHead, StatusCode, }; @@ -22,11 +22,11 @@ use { cookie::Cookie, }; -use crate::{error::Error, HttpResponseBuilder}; +use crate::{any_body::AnyBody, error::Error, HttpResponseBuilder}; /// An outgoing response. pub struct HttpResponse { - res: Response, + res: Response>, pub(crate) error: Option, } @@ -35,7 +35,7 @@ impl HttpResponse { #[inline] pub fn new(status: StatusCode) -> Self { Self { - res: Response::new(status), + res: Response::with_body(status, AnyBody::default()), error: None, } } @@ -54,6 +54,13 @@ impl HttpResponse { response.error = Some(error); response } + + pub fn map_into_body(self) -> HttpResponse + where + B: MessageBody + 'static, + { + self.map_body(|_, body| body.into_body()) + } } impl HttpResponse { @@ -61,7 +68,7 @@ impl HttpResponse { #[inline] pub fn with_body(status: StatusCode, body: B) -> Self { Self { - res: Response::with_body(status, body), + res: Response::with_body(status, AnyBody::Stream { body }), error: None, } } @@ -182,12 +189,12 @@ impl HttpResponse { /// Get body of this response #[inline] - pub fn body(&self) -> &B { + pub fn body(&self) -> &AnyBody { self.res.body() } /// Set a body - pub fn set_body(self, body: B2) -> HttpResponse { + pub fn set_body(self, body: AnyBody) -> HttpResponse { HttpResponse { res: self.res.set_body(body), error: None, @@ -196,12 +203,12 @@ impl HttpResponse { } /// Split response and body - pub fn into_parts(self) -> (HttpResponse<()>, B) { + pub fn into_parts(self) -> (HttpResponse<()>, AnyBody) { let (head, body) = self.res.into_parts(); ( HttpResponse { - res: head, + res: head.map_body(|_, _b| AnyBody::default()), error: None, }, body, @@ -211,7 +218,7 @@ impl HttpResponse { /// Drop request's body pub fn drop_body(self) -> HttpResponse<()> { HttpResponse { - res: self.res.drop_body(), + res: self.res.drop_body().map_body(|_, _b| AnyBody::default()), error: None, } } @@ -219,7 +226,7 @@ impl HttpResponse { /// Set a body and return previous body value pub fn map_body(self, f: F) -> HttpResponse where - F: FnOnce(&mut ResponseHead, B) -> B2, + F: FnOnce(&mut ResponseHead, AnyBody) -> AnyBody, { HttpResponse { res: self.res.map_body(f), @@ -229,27 +236,28 @@ impl HttpResponse { // TODO: docs for the body map methods below - #[inline] - pub fn map_into_left_body(self) -> HttpResponse> { - self.map_body(|_, body| EitherBody::left(body)) - } - - #[inline] - pub fn map_into_right_body(self) -> HttpResponse> { - self.map_body(|_, body| EitherBody::right(body)) - } - #[inline] pub fn map_into_boxed_body(self) -> HttpResponse where B: MessageBody + 'static, { - // TODO: avoid double boxing with down-casting, if it improves perf - self.map_body(|_, body| BoxBody::new(body)) + self.map_body(|_, body| AnyBody::Boxed { + body: match body { + AnyBody::None => BoxBody::new(NoneBody::new()), + AnyBody::Full { body } => BoxBody::new(body), + AnyBody::Stream { body } => BoxBody::new(body), + AnyBody::Boxed { body } => body, + }, + }) } /// Extract response body - pub fn into_body(self) -> B { + pub fn take_body(&mut self) -> AnyBody { + self.res.take_body() + } + + /// Extract response body + pub fn into_body(self) -> AnyBody { self.res.into_body() } } @@ -266,8 +274,8 @@ where } } -impl From> for HttpResponse { - fn from(res: Response) -> Self { +impl From>> for HttpResponse { + fn from(res: Response>) -> Self { HttpResponse { res, error: None } } } @@ -278,7 +286,7 @@ impl From for HttpResponse { } } -impl From> for Response { +impl From> for Response> { fn from(res: HttpResponse) -> Self { // this impl will always be called as part of dispatcher @@ -291,14 +299,14 @@ impl From> for Response { } } -// Future is only implemented for BoxBody payload type because it's the most useful for making +// Future is only implemented for default payload type because it's the most useful for making // simple handlers without async blocks. Making it generic over all MessageBody types requires a // future impl on Response which would cause it's body field to be, undesirably, Option. // // This impl is not particularly efficient due to the Response construction and should probably // not be invoked if performance is important. Prefer an async fn/block in such cases. -impl Future for HttpResponse { - type Output = Result, Error>; +impl Future for HttpResponse { + type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { if let Some(err) = self.error.take() { @@ -307,7 +315,7 @@ impl Future for HttpResponse { Poll::Ready(Ok(mem::replace( &mut self.res, - Response::new(StatusCode::default()), + Response::with_body(StatusCode::default(), AnyBody::None), ))) } } diff --git a/src/server.rs b/src/server.rs index b2ff423f1..44f4f5484 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,7 +17,7 @@ use actix_tls::accept::openssl::reexports::{AlpnError, SslAcceptor, SslAcceptorB #[cfg(feature = "rustls")] use actix_tls::accept::rustls::reexports::ServerConfig as RustlsServerConfig; -use crate::{config::AppConfig, Error}; +use crate::{any_body::AnyBody, config::AppConfig, Error}; struct Socket { scheme: &'static str, @@ -55,7 +55,7 @@ where S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, - S::Response: Into>, + S::Response: Into>>, B: MessageBody, { pub(super) factory: F, @@ -75,7 +75,7 @@ where S: ServiceFactory + 'static, S::Error: Into + 'static, S::InitError: fmt::Debug, - S::Response: Into> + 'static, + S::Response: Into>> + 'static, >::Future: 'static, S::Service: 'static, @@ -300,9 +300,10 @@ where }) }; - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = err.into().error_response().into(); + res.map_into_boxed_body() + }); svc.finish(map_config(fac, move |_| { AppConfig::new(false, host.clone(), addr) @@ -360,9 +361,10 @@ where svc }; - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = err.into().error_response().into(); + res.map_into_boxed_body() + }); svc.finish(map_config(fac, move |_| { AppConfig::new(true, host.clone(), addr) @@ -544,9 +546,10 @@ where .on_connect_ext(move |io: &_, ext: _| (&*handler)(io as &dyn Any, ext)); } - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = err.into().error_response().into(); + res.map_into_boxed_body() + }); svc.finish(map_config(fac, move |_| config.clone())) }) @@ -585,9 +588,10 @@ where socket_addr, ); - let fac = factory() - .into_factory() - .map_err(|err| err.into().error_response()); + let fac = factory().into_factory().map_err(|err| { + let res: actix_http::Response<_> = err.into().error_response().into(); + res.map_into_boxed_body() + }); fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then( HttpService::build() @@ -610,7 +614,7 @@ where S: ServiceFactory, S::Error: Into, S::InitError: fmt::Debug, - S::Response: Into>, + S::Response: Into>>, S::Service: 'static, B: MessageBody, { diff --git a/src/service.rs b/src/service.rs index 88f2ba97a..7df7b61ea 100644 --- a/src/service.rs +++ b/src/service.rs @@ -5,7 +5,7 @@ use std::{ }; use actix_http::{ - body::{BoxBody, EitherBody, MessageBody}, + body::{self, BoxBody, MessageBody}, header::HeaderMap, Extensions, HttpMessage, Method, Payload, PayloadStream, RequestHead, Response, ResponseHead, StatusCode, Uri, Version, @@ -19,6 +19,7 @@ use actix_service::{ use cookie::{Cookie, ParseError as CookieParseError}; use crate::{ + any_body::AnyBody, config::{AppConfig, AppService}, dev::ensure_leading_slash, guard::Guard, @@ -112,7 +113,7 @@ impl ServiceRequest { /// Create service response #[inline] - pub fn into_response>>(self, res: R) -> ServiceResponse { + pub fn into_response>>>(self, res: R) -> ServiceResponse { let res = HttpResponse::from(res.into()); ServiceResponse::new(self.req, res) } @@ -410,9 +411,14 @@ impl ServiceResponse { self.response.headers_mut() } + #[inline] + pub fn take_body(&mut self) -> AnyBody { + self.response.take_body() + } + /// Extract response body #[inline] - pub fn into_body(self) -> B { + pub fn into_body(self) -> AnyBody { self.response.into_body() } @@ -420,7 +426,7 @@ impl ServiceResponse { #[inline] pub fn map_body(self, f: F) -> ServiceResponse where - F: FnOnce(&mut ResponseHead, B) -> B2, + F: FnOnce(&mut ResponseHead, AnyBody) -> AnyBody, { let response = self.response.map_body(f); @@ -430,22 +436,29 @@ impl ServiceResponse { } } - #[inline] - pub fn map_into_left_body(self) -> ServiceResponse> { - self.map_body(|_, body| EitherBody::left(body)) - } + // #[inline] + // pub fn map_into_left_body(self) -> ServiceResponse> { + // self.map_body(|_, body| EitherBody::left(body)) + // } - #[inline] - pub fn map_into_right_body(self) -> ServiceResponse> { - self.map_body(|_, body| EitherBody::right(body)) - } + // #[inline] + // pub fn map_into_right_body(self) -> ServiceResponse> { + // self.map_body(|_, body| EitherBody::right(body)) + // } #[inline] pub fn map_into_boxed_body(self) -> ServiceResponse where B: MessageBody + 'static, { - self.map_body(|_, body| BoxBody::new(body)) + self.map_body(|_, body| AnyBody::Stream { + body: match body { + AnyBody::None => BoxBody::new(body::None::new()), + AnyBody::Full { body } => BoxBody::new(body), + AnyBody::Stream { body } => BoxBody::new(body), + AnyBody::Boxed { body } => body, + }, + }) } } @@ -455,8 +468,8 @@ impl From> for HttpResponse { } } -impl From> for Response { - fn from(res: ServiceResponse) -> Response { +impl From> for Response> { + fn from(res: ServiceResponse) -> Response> { res.response.into() } } diff --git a/src/test.rs b/src/test.rs index cfb3ef8f2..540d07d81 100644 --- a/src/test.rs +++ b/src/test.rs @@ -163,7 +163,7 @@ where actix_rt::pin!(body); while let Some(item) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { - bytes.extend_from_slice(&item.map_err(Into::into).unwrap()); + bytes.extend_from_slice(&item.unwrap()); } bytes.freeze() @@ -205,7 +205,7 @@ where actix_rt::pin!(body); while let Some(item) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { - bytes.extend_from_slice(&item.map_err(Into::into).unwrap()); + bytes.extend_from_slice(&item.unwrap()); } bytes.freeze() diff --git a/src/types/either.rs b/src/types/either.rs index 3c759736e..a11f597db 100644 --- a/src/types/either.rs +++ b/src/types/either.rs @@ -12,9 +12,9 @@ use futures_core::ready; use pin_project_lite::pin_project; use crate::{ - body, dev, + dev, web::{Form, Json}, - Error, FromRequest, HttpRequest, HttpResponse, Responder, + Error, FromRequest, HttpRequest, }; /// Combines two extractor or responder types into a single type. @@ -140,21 +140,23 @@ impl Either { } } -/// See [here](#responder) for example of usage as a handler return type. -impl Responder for Either -where - L: Responder, - R: Responder, -{ - type Body = body::EitherBody; +// /// See [here](#responder) for example of usage as a handler return type. +// impl Responder for Either +// where +// L: Responder, +// R: Responder, +// { +// type Body = EitherAnyBody; - fn respond_to(self, req: &HttpRequest) -> HttpResponse { - match self { - Either::Left(a) => a.respond_to(req).map_into_left_body(), - Either::Right(b) => b.respond_to(req).map_into_right_body(), - } - } -} +// fn respond_to(self, req: &HttpRequest) -> HttpResponse { +// match self { +// Either::Left(a) => a.respond_to(req).map_body(|_, body| EitherAnyBodyProj::left(body)), +// Either::Right(b) => b +// .respond_to(req) +// .map_body(|_, body| EitherAnyBodyProj::right(body)), +// } +// } +// } /// A composite error resulting from failure to extract an `Either`. /// diff --git a/src/types/form.rs b/src/types/form.rs index 9c09c6b73..eab819e87 100644 --- a/src/types/form.rs +++ b/src/types/form.rs @@ -20,9 +20,8 @@ use serde::{de::DeserializeOwned, Serialize}; #[cfg(feature = "__compress")] use crate::dev::Decompress; use crate::{ - body::EitherBody, error::UrlencodedError, extract::FromRequest, - http::header::CONTENT_LENGTH, web, Error, HttpMessage, HttpRequest, HttpResponse, - Responder, + error::UrlencodedError, extract::FromRequest, http::header::CONTENT_LENGTH, web, Error, + HttpMessage, HttpRequest, HttpResponse, Responder, }; /// URL encoded payload extractor and responder. @@ -181,7 +180,7 @@ impl fmt::Display for Form { /// See [here](#responder) for example of usage as a handler return type. impl Responder for Form { - type Body = EitherBody; + type Body = String; fn respond_to(self, _: &HttpRequest) -> HttpResponse { match serde_urlencoded::to_string(&self.0) { @@ -189,12 +188,12 @@ impl Responder for Form { .content_type(mime::APPLICATION_WWW_FORM_URLENCODED) .message_body(body) { - Ok(res) => res.map_into_left_body(), - Err(err) => HttpResponse::from_error(err).map_into_right_body(), + Ok(res) => res, + Err(err) => HttpResponse::from_error(err).map_into_body(), }, Err(err) => { - HttpResponse::from_error(UrlencodedError::Serialize(err)).map_into_right_body() + HttpResponse::from_error(UrlencodedError::Serialize(err)).map_into_body() } } } diff --git a/src/types/json.rs b/src/types/json.rs index 2b4d220e2..755ba422a 100644 --- a/src/types/json.rs +++ b/src/types/json.rs @@ -19,7 +19,6 @@ use actix_http::Payload; #[cfg(feature = "__compress")] use crate::dev::Decompress; use crate::{ - body::EitherBody, error::{Error, JsonPayloadError}, extract::FromRequest, http::header::CONTENT_LENGTH, @@ -117,7 +116,7 @@ impl Serialize for Json { /// /// If serialization failed impl Responder for Json { - type Body = EitherBody; + type Body = String; fn respond_to(self, _: &HttpRequest) -> HttpResponse { match serde_json::to_string(&self.0) { @@ -125,12 +124,12 @@ impl Responder for Json { .content_type(mime::APPLICATION_JSON) .message_body(body) { - Ok(res) => res.map_into_left_body(), - Err(err) => HttpResponse::from_error(err).map_into_right_body(), + Ok(res) => res, + Err(err) => HttpResponse::from_error(err).map_into_body(), }, Err(err) => { - HttpResponse::from_error(JsonPayloadError::Serialize(err)).map_into_right_body() + HttpResponse::from_error(JsonPayloadError::Serialize(err)).map_into_body() } } }