1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-01 08:45:10 +02:00

This is a squashed commit:

- Convert MessageBody to accept Pin in poll_next

- add CHANGES and increase versions aligned to semver

- update crates to accomodate MessageBody Pin change

- fix tests and dependencies
This commit is contained in:
Maksym Vorobiov
2020-01-29 11:15:13 +03:00
committed by Yuki Okushi
parent a4148de226
commit 9d04b250f9
24 changed files with 247 additions and 174 deletions

View File

@ -9,6 +9,7 @@ use brotli2::write::BrotliEncoder;
use bytes::Bytes;
use flate2::write::{GzEncoder, ZlibEncoder};
use futures_core::ready;
use pin_project::{pin_project, project};
use crate::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::header::{ContentEncoding, CONTENT_ENCODING};
@ -19,8 +20,10 @@ use super::Writer;
const INPLACE: usize = 1024;
#[pin_project]
pub struct Encoder<B> {
eof: bool,
#[pin]
body: EncoderBody<B>,
encoder: Option<ContentEncoder>,
fut: Option<CpuFuture<ContentEncoder, io::Error>>,
@ -76,67 +79,83 @@ impl<B: MessageBody> Encoder<B> {
}
}
#[pin_project]
enum EncoderBody<B> {
Bytes(Bytes),
Stream(B),
BoxedStream(Box<dyn MessageBody>),
Stream(#[pin] B),
BoxedStream(#[pin] Box<dyn MessageBody>),
}
impl<B: MessageBody> MessageBody for EncoderBody<B> {
fn size(&self) -> BodySize {
match self {
EncoderBody::Bytes(ref b) => b.size(),
EncoderBody::Stream(ref b) => b.size(),
EncoderBody::BoxedStream(ref b) => b.size(),
}
}
#[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
#[project]
match self.project() {
EncoderBody::Bytes(b) => {
if b.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new()))))
}
}
EncoderBody::Stream(b) => b.poll_next(cx),
EncoderBody::BoxedStream(b) => b.poll_next(cx),
}
}
}
impl<B: MessageBody> MessageBody for Encoder<B> {
fn size(&self) -> BodySize {
if self.encoder.is_none() {
match self.body {
EncoderBody::Bytes(ref b) => b.size(),
EncoderBody::Stream(ref b) => b.size(),
EncoderBody::BoxedStream(ref b) => b.size(),
}
self.body.size()
} else {
BodySize::Stream
}
}
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
let mut this = self.project();
loop {
if self.eof {
if *this.eof {
return Poll::Ready(None);
}
if let Some(ref mut fut) = self.fut {
if let Some(ref mut fut) = this.fut {
let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item,
Err(e) => return Poll::Ready(Some(Err(e.into()))),
};
let chunk = encoder.take();
self.encoder = Some(encoder);
self.fut.take();
*this.encoder = Some(encoder);
this.fut.take();
if !chunk.is_empty() {
return Poll::Ready(Some(Ok(chunk)));
}
}
let result = match self.body {
EncoderBody::Bytes(ref mut b) => {
if b.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(std::mem::replace(b, Bytes::new()))))
}
}
EncoderBody::Stream(ref mut b) => b.poll_next(cx),
EncoderBody::BoxedStream(ref mut b) => b.poll_next(cx),
};
let result = this.body.as_mut().poll_next(cx);
match result {
Poll::Ready(Some(Ok(chunk))) => {
if let Some(mut encoder) = self.encoder.take() {
if let Some(mut encoder) = this.encoder.take() {
if chunk.len() < INPLACE {
encoder.write(&chunk)?;
let chunk = encoder.take();
self.encoder = Some(encoder);
*this.encoder = Some(encoder);
if !chunk.is_empty() {
return Poll::Ready(Some(Ok(chunk)));
}
} else {
self.fut = Some(run(move || {
*this.fut = Some(run(move || {
encoder.write(&chunk)?;
Ok(encoder)
}));
@ -146,12 +165,12 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
}
}
Poll::Ready(None) => {
if let Some(encoder) = self.encoder.take() {
if let Some(encoder) = this.encoder.take() {
let chunk = encoder.finish()?;
if chunk.is_empty() {
return Poll::Ready(None);
} else {
self.eof = true;
*this.eof = true;
return Poll::Ready(Some(Ok(chunk)));
}
} else {