2021-02-11 22:39:54 +00:00
|
|
|
//! Stream decoders.
|
|
|
|
|
2021-02-12 00:15:25 +00:00
|
|
|
use std::{
|
|
|
|
future::Future,
|
|
|
|
io::{self, Write as _},
|
|
|
|
pin::Pin,
|
|
|
|
task::{Context, Poll},
|
|
|
|
};
|
2019-03-26 15:14:32 -07:00
|
|
|
|
2021-01-10 00:04:19 +08:00
|
|
|
use actix_rt::task::{spawn_blocking, JoinHandle};
|
2019-03-28 11:08:24 -07:00
|
|
|
use bytes::Bytes;
|
2019-12-13 11:24:57 +06:00
|
|
|
use futures_core::{ready, Stream};
|
2021-06-19 21:21:13 +02:00
|
|
|
|
|
|
|
#[cfg(feature = "compress-brotli")]
|
|
|
|
use brotli2::write::BrotliDecoder;
|
|
|
|
|
|
|
|
#[cfg(feature = "compress-gzip")]
|
|
|
|
use flate2::write::{GzDecoder, ZlibDecoder};
|
|
|
|
|
|
|
|
#[cfg(feature = "compress-zstd")]
|
2021-06-03 22:32:52 +02:00
|
|
|
use zstd::stream::write::Decoder as ZstdDecoder;
|
2019-03-26 15:14:32 -07:00
|
|
|
|
2021-02-12 00:15:25 +00:00
|
|
|
use crate::{
|
|
|
|
encoding::Writer,
|
|
|
|
error::{BlockingError, PayloadError},
|
|
|
|
http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING},
|
|
|
|
};
|
2019-03-26 15:14:32 -07:00
|
|
|
|
2021-02-12 00:15:25 +00:00
|
|
|
/// Size threshold, in bytes, used by `Decoder::poll_next` to pick a decoding strategy.
///
/// Chunks strictly shorter than this are decoded inline on the polling task; chunks of this
/// length or longer are handed to `spawn_blocking` instead.
const MAX_CHUNK_SIZE_DECODE_IN_PLACE: usize = 2049;
|
2019-03-28 21:15:26 -07:00
|
|
|
|
2019-03-28 11:08:24 -07:00
|
|
|
pub struct Decoder<S> {
|
2019-03-26 15:14:32 -07:00
|
|
|
decoder: Option<ContentDecoder>,
|
2019-03-28 11:08:24 -07:00
|
|
|
stream: S,
|
|
|
|
eof: bool,
|
2021-01-10 00:04:19 +08:00
|
|
|
fut: Option<JoinHandle<Result<(Option<Bytes>, ContentDecoder), io::Error>>>,
|
2019-03-26 15:14:32 -07:00
|
|
|
}
|
|
|
|
|
2019-03-28 11:08:24 -07:00
|
|
|
impl<S> Decoder<S>
|
2019-03-26 15:14:32 -07:00
|
|
|
where
|
2019-11-15 15:54:11 +06:00
|
|
|
S: Stream<Item = Result<Bytes, PayloadError>>,
|
2019-03-26 15:14:32 -07:00
|
|
|
{
|
2019-03-28 11:08:24 -07:00
|
|
|
/// Construct a decoder.
|
|
|
|
#[inline]
|
|
|
|
pub fn new(stream: S, encoding: ContentEncoding) -> Decoder<S> {
|
2019-03-26 15:14:32 -07:00
|
|
|
let decoder = match encoding {
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-brotli")]
|
2019-03-26 15:14:32 -07:00
|
|
|
ContentEncoding::Br => Some(ContentDecoder::Br(Box::new(
|
2019-12-20 13:50:07 +06:00
|
|
|
BrotliDecoder::new(Writer::new()),
|
2019-03-26 15:14:32 -07:00
|
|
|
))),
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-gzip")]
|
2019-03-26 15:14:32 -07:00
|
|
|
ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(
|
|
|
|
ZlibDecoder::new(Writer::new()),
|
|
|
|
))),
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-gzip")]
|
2019-03-26 15:14:32 -07:00
|
|
|
ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new(
|
|
|
|
GzDecoder::new(Writer::new()),
|
|
|
|
))),
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-zstd")]
|
2021-06-03 22:32:52 +02:00
|
|
|
ContentEncoding::Zstd => Some(ContentDecoder::Zstd(Box::new(
|
|
|
|
ZstdDecoder::new(Writer::new()).expect(
|
|
|
|
"Failed to create zstd decoder. This is a bug. \
|
|
|
|
Please report it to the actix-web repository.",
|
|
|
|
),
|
|
|
|
))),
|
2019-03-26 15:14:32 -07:00
|
|
|
_ => None,
|
|
|
|
};
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-28 11:08:24 -07:00
|
|
|
Decoder {
|
|
|
|
decoder,
|
|
|
|
stream,
|
|
|
|
fut: None,
|
|
|
|
eof: false,
|
|
|
|
}
|
2019-03-26 15:14:32 -07:00
|
|
|
}
|
|
|
|
|
2019-03-28 11:08:24 -07:00
|
|
|
/// Construct decoder based on headers.
|
|
|
|
#[inline]
|
|
|
|
pub fn from_headers(stream: S, headers: &HeaderMap) -> Decoder<S> {
|
2019-03-26 15:14:32 -07:00
|
|
|
// check content-encoding
|
2021-02-12 00:15:25 +00:00
|
|
|
let encoding = headers
|
|
|
|
.get(&CONTENT_ENCODING)
|
|
|
|
.and_then(|val| val.to_str().ok())
|
2021-09-01 09:53:26 +01:00
|
|
|
.and_then(|x| x.parse().ok())
|
2021-02-12 00:15:25 +00:00
|
|
|
.unwrap_or(ContentEncoding::Identity);
|
2019-03-26 15:14:32 -07:00
|
|
|
|
|
|
|
Self::new(stream, encoding)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-03-28 11:08:24 -07:00
|
|
|
impl<S> Stream for Decoder<S>
|
2019-03-26 15:14:32 -07:00
|
|
|
where
|
2019-11-15 15:54:11 +06:00
|
|
|
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
|
2019-03-26 15:14:32 -07:00
|
|
|
{
|
2019-11-15 15:54:11 +06:00
|
|
|
type Item = Result<Bytes, PayloadError>;
|
2019-03-26 15:14:32 -07:00
|
|
|
|
2019-11-15 15:54:11 +06:00
|
|
|
fn poll_next(
|
|
|
|
mut self: Pin<&mut Self>,
|
2019-12-08 00:46:51 +06:00
|
|
|
cx: &mut Context<'_>,
|
2019-11-15 15:54:11 +06:00
|
|
|
) -> Poll<Option<Self::Item>> {
|
2019-03-26 15:14:32 -07:00
|
|
|
loop {
|
2019-03-28 11:08:24 -07:00
|
|
|
if let Some(ref mut fut) = self.fut {
|
2021-02-06 08:23:59 -08:00
|
|
|
let (chunk, decoder) =
|
|
|
|
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-28 11:08:24 -07:00
|
|
|
self.decoder = Some(decoder);
|
|
|
|
self.fut.take();
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-28 11:08:24 -07:00
|
|
|
if let Some(chunk) = chunk {
|
2019-11-15 15:54:11 +06:00
|
|
|
return Poll::Ready(Some(Ok(chunk)));
|
2019-03-28 11:08:24 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if self.eof {
|
2019-11-15 15:54:11 +06:00
|
|
|
return Poll::Ready(None);
|
2019-03-28 11:08:24 -07:00
|
|
|
}
|
|
|
|
|
2021-02-12 00:15:25 +00:00
|
|
|
match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
|
|
|
|
Some(Err(err)) => return Poll::Ready(Some(Err(err))),
|
|
|
|
|
|
|
|
Some(Ok(chunk)) => {
|
2019-03-28 11:08:24 -07:00
|
|
|
if let Some(mut decoder) = self.decoder.take() {
|
2021-02-12 00:15:25 +00:00
|
|
|
if chunk.len() < MAX_CHUNK_SIZE_DECODE_IN_PLACE {
|
2019-03-28 11:08:24 -07:00
|
|
|
let chunk = decoder.feed_data(chunk)?;
|
2019-03-28 21:15:26 -07:00
|
|
|
self.decoder = Some(decoder);
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-28 21:15:26 -07:00
|
|
|
if let Some(chunk) = chunk {
|
2019-11-15 15:54:11 +06:00
|
|
|
return Poll::Ready(Some(Ok(chunk)));
|
2019-03-28 21:15:26 -07:00
|
|
|
}
|
|
|
|
} else {
|
2021-01-10 00:04:19 +08:00
|
|
|
self.fut = Some(spawn_blocking(move || {
|
2019-03-28 21:15:26 -07:00
|
|
|
let chunk = decoder.feed_data(chunk)?;
|
|
|
|
Ok((chunk, decoder))
|
|
|
|
}));
|
|
|
|
}
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-28 11:08:24 -07:00
|
|
|
continue;
|
2019-03-26 15:14:32 -07:00
|
|
|
} else {
|
2019-11-15 15:54:11 +06:00
|
|
|
return Poll::Ready(Some(Ok(chunk)));
|
2019-03-26 15:14:32 -07:00
|
|
|
}
|
|
|
|
}
|
2021-02-12 00:15:25 +00:00
|
|
|
|
|
|
|
None => {
|
2019-03-28 11:08:24 -07:00
|
|
|
self.eof = true;
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-28 21:15:26 -07:00
|
|
|
return if let Some(mut decoder) = self.decoder.take() {
|
2019-11-15 15:54:11 +06:00
|
|
|
match decoder.feed_eof() {
|
|
|
|
Ok(Some(res)) => Poll::Ready(Some(Ok(res))),
|
|
|
|
Ok(None) => Poll::Ready(None),
|
|
|
|
Err(err) => Poll::Ready(Some(Err(err.into()))),
|
|
|
|
}
|
2019-03-26 15:14:32 -07:00
|
|
|
} else {
|
2019-11-15 15:54:11 +06:00
|
|
|
Poll::Ready(None)
|
2019-03-26 15:14:32 -07:00
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Decoder state for one content encoding.
///
/// Every variant boxes a write-style decoder whose output is collected in a `Writer`.
enum ContentDecoder {
    #[cfg(feature = "compress-gzip")]
    Deflate(Box<ZlibDecoder<Writer>>),

    #[cfg(feature = "compress-gzip")]
    Gzip(Box<GzDecoder<Writer>>),

    #[cfg(feature = "compress-brotli")]
    Br(Box<BrotliDecoder<Writer>>),

    // `ZstdDecoder` takes a lifetime argument. It is spelled out as `'static` because
    // `Decoder::poll_next` moves the decoder into `spawn_blocking`, which requires
    // `FnOnce() -> R + Send + 'static`.
    #[cfg(feature = "compress-zstd")]
    Zstd(Box<ZstdDecoder<'static, Writer>>),
}
|
|
|
|
|
|
|
|
impl ContentDecoder {
|
|
|
|
fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
|
|
|
|
match self {
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-brotli")]
|
2019-12-07 16:55:41 +01:00
|
|
|
ContentDecoder::Br(ref mut decoder) => match decoder.flush() {
|
|
|
|
Ok(()) => {
|
|
|
|
let b = decoder.get_mut().take();
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-26 15:14:32 -07:00
|
|
|
if !b.is_empty() {
|
|
|
|
Ok(Some(b))
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => Err(e),
|
|
|
|
},
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-gzip")]
|
2019-03-26 15:14:32 -07:00
|
|
|
ContentDecoder::Gzip(ref mut decoder) => match decoder.try_finish() {
|
|
|
|
Ok(_) => {
|
|
|
|
let b = decoder.get_mut().take();
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-26 15:14:32 -07:00
|
|
|
if !b.is_empty() {
|
|
|
|
Ok(Some(b))
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => Err(e),
|
|
|
|
},
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-gzip")]
|
2019-03-26 15:14:32 -07:00
|
|
|
ContentDecoder::Deflate(ref mut decoder) => match decoder.try_finish() {
|
|
|
|
Ok(_) => {
|
|
|
|
let b = decoder.get_mut().take();
|
|
|
|
if !b.is_empty() {
|
|
|
|
Ok(Some(b))
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => Err(e),
|
|
|
|
},
|
2021-06-03 22:32:52 +02:00
|
|
|
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-zstd")]
|
2021-06-03 22:32:52 +02:00
|
|
|
ContentDecoder::Zstd(ref mut decoder) => match decoder.flush() {
|
|
|
|
Ok(_) => {
|
|
|
|
let b = decoder.get_mut().take();
|
|
|
|
if !b.is_empty() {
|
|
|
|
Ok(Some(b))
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => Err(e),
|
|
|
|
},
|
2019-03-26 15:14:32 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
|
|
|
|
match self {
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-brotli")]
|
2019-03-26 15:14:32 -07:00
|
|
|
ContentDecoder::Br(ref mut decoder) => match decoder.write_all(&data) {
|
|
|
|
Ok(_) => {
|
|
|
|
decoder.flush()?;
|
|
|
|
let b = decoder.get_mut().take();
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-26 15:14:32 -07:00
|
|
|
if !b.is_empty() {
|
|
|
|
Ok(Some(b))
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => Err(e),
|
|
|
|
},
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-gzip")]
|
2019-03-26 15:14:32 -07:00
|
|
|
ContentDecoder::Gzip(ref mut decoder) => match decoder.write_all(&data) {
|
|
|
|
Ok(_) => {
|
|
|
|
decoder.flush()?;
|
|
|
|
let b = decoder.get_mut().take();
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-26 15:14:32 -07:00
|
|
|
if !b.is_empty() {
|
|
|
|
Ok(Some(b))
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => Err(e),
|
|
|
|
},
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-gzip")]
|
2019-03-26 15:14:32 -07:00
|
|
|
ContentDecoder::Deflate(ref mut decoder) => match decoder.write_all(&data) {
|
|
|
|
Ok(_) => {
|
|
|
|
decoder.flush()?;
|
2021-02-12 00:15:25 +00:00
|
|
|
|
2019-03-26 15:14:32 -07:00
|
|
|
let b = decoder.get_mut().take();
|
|
|
|
if !b.is_empty() {
|
|
|
|
Ok(Some(b))
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => Err(e),
|
|
|
|
},
|
2021-06-03 22:32:52 +02:00
|
|
|
|
2021-06-19 21:21:13 +02:00
|
|
|
#[cfg(feature = "compress-zstd")]
|
2021-06-03 22:32:52 +02:00
|
|
|
ContentDecoder::Zstd(ref mut decoder) => match decoder.write_all(&data) {
|
|
|
|
Ok(_) => {
|
|
|
|
decoder.flush()?;
|
|
|
|
|
|
|
|
let b = decoder.get_mut().take();
|
|
|
|
if !b.is_empty() {
|
|
|
|
Ok(Some(b))
|
|
|
|
} else {
|
|
|
|
Ok(None)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => Err(e),
|
|
|
|
},
|
2019-03-26 15:14:32 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|