1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-01 00:44:26 +02:00

add associated error type to MessageBody (#2183)

This commit is contained in:
Rob Ede
2021-05-05 18:36:02 +01:00
committed by GitHub
parent dd1a3e7675
commit ddaf8c3e43
27 changed files with 447 additions and 74 deletions

View File

@ -1,6 +1,7 @@
//! Stream encoders.
use std::{
error::Error as StdError,
future::Future,
io::{self, Write as _},
pin::Pin,
@ -10,12 +11,13 @@ use std::{
use actix_rt::task::{spawn_blocking, JoinHandle};
use brotli2::write::BrotliEncoder;
use bytes::Bytes;
use derive_more::Display;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures_core::ready;
use pin_project::pin_project;
use crate::{
body::{Body, BodySize, MessageBody, ResponseBody},
body::{Body, BodySize, BoxAnyBody, MessageBody, ResponseBody},
http::{
header::{ContentEncoding, CONTENT_ENCODING},
HeaderValue, StatusCode,
@ -92,10 +94,16 @@ impl<B: MessageBody> Encoder<B> {
enum EncoderBody<B> {
Bytes(Bytes),
Stream(#[pin] B),
BoxedStream(Pin<Box<dyn MessageBody>>),
BoxedStream(BoxAnyBody),
}
impl<B: MessageBody> MessageBody for EncoderBody<B> {
impl<B> MessageBody for EncoderBody<B>
where
B: MessageBody,
B::Error: Into<Error>,
{
type Error = EncoderError<B::Error>;
fn size(&self) -> BodySize {
match self {
EncoderBody::Bytes(ref b) => b.size(),
@ -107,7 +115,7 @@ impl<B: MessageBody> MessageBody for EncoderBody<B> {
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
) -> Poll<Option<Result<Bytes, Self::Error>>> {
match self.project() {
EncoderBodyProj::Bytes(b) => {
if b.is_empty() {
@ -116,13 +124,32 @@ impl<B: MessageBody> MessageBody for EncoderBody<B> {
Poll::Ready(Some(Ok(std::mem::take(b))))
}
}
EncoderBodyProj::Stream(b) => b.poll_next(cx),
EncoderBodyProj::BoxedStream(ref mut b) => b.as_mut().poll_next(cx),
// TODO: MSRV 1.51: poll_map_err
EncoderBodyProj::Stream(b) => match ready!(b.poll_next(cx)) {
Some(Err(err)) => Poll::Ready(Some(Err(EncoderError::Body(err)))),
Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
None => Poll::Ready(None),
},
EncoderBodyProj::BoxedStream(ref mut b) => {
match ready!(b.as_pin_mut().poll_next(cx)) {
Some(Err(err)) => {
Poll::Ready(Some(Err(EncoderError::Boxed(err.into()))))
}
Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
None => Poll::Ready(None),
}
}
}
}
}
impl<B: MessageBody> MessageBody for Encoder<B> {
impl<B> MessageBody for Encoder<B>
where
B: MessageBody,
B::Error: Into<Error>,
{
type Error = EncoderError<B::Error>;
fn size(&self) -> BodySize {
if self.encoder.is_none() {
self.body.size()
@ -134,7 +161,7 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
) -> Poll<Option<Result<Bytes, Self::Error>>> {
let mut this = self.project();
loop {
if *this.eof {
@ -142,8 +169,9 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
}
if let Some(ref mut fut) = this.fut {
let mut encoder =
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
let mut encoder = ready!(Pin::new(fut).poll(cx))
.map_err(|_| EncoderError::Blocking(BlockingError))?
.map_err(EncoderError::Io)?;
let chunk = encoder.take();
*this.encoder = Some(encoder);
@ -162,7 +190,7 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
Some(Ok(chunk)) => {
if let Some(mut encoder) = this.encoder.take() {
if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE {
encoder.write(&chunk)?;
encoder.write(&chunk).map_err(EncoderError::Io)?;
let chunk = encoder.take();
*this.encoder = Some(encoder);
@ -182,7 +210,7 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
None => {
if let Some(encoder) = this.encoder.take() {
let chunk = encoder.finish()?;
let chunk = encoder.finish().map_err(EncoderError::Io)?;
if chunk.is_empty() {
return Poll::Ready(None);
} else {
@ -281,3 +309,36 @@ impl ContentEncoder {
}
}
}
#[derive(Debug, Display)]
#[non_exhaustive]
pub enum EncoderError<E> {
#[display(fmt = "body")]
Body(E),
#[display(fmt = "boxed")]
Boxed(Error),
#[display(fmt = "blocking")]
Blocking(BlockingError),
#[display(fmt = "io")]
Io(io::Error),
}
impl<E: StdError> StdError for EncoderError<E> {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
None
}
}
impl<E: Into<Error>> From<EncoderError<E>> for Error {
fn from(err: EncoderError<E>) -> Self {
match err {
EncoderError::Body(err) => err.into(),
EncoderError::Boxed(err) => err,
EncoderError::Blocking(err) => err.into(),
EncoderError::Io(err) => err.into(),
}
}
}