From 34792934168897d0559cadb31720ad27a5114a34 Mon Sep 17 00:00:00 2001 From: Arthur Le Moigne Date: Thu, 3 Jun 2021 22:32:52 +0200 Subject: [PATCH] Add zstd ContentEncoding support (#2244) Co-authored-by: Igor Aleksanov Co-authored-by: Rob Ede --- Cargo.toml | 1 + actix-http/CHANGES.md | 3 + actix-http/Cargo.toml | 3 +- actix-http/src/encoding/decoder.rs | 36 ++++++ actix-http/src/encoding/encoder.rs | 20 +++ .../src/header/shared/content_encoding.rs | 7 + tests/test_server.rs | 120 ++++++++++++++++++ 7 files changed, 189 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5aa302333..6893067d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,6 +101,7 @@ brotli2 = "0.3.2" criterion = "0.3" env_logger = "0.8" flate2 = "1.0.13" +zstd = "0.7" rand = "0.8" rcgen = "0.8" serde_derive = "1.0" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index ec7d8ee3b..fbf9f8e99 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -9,6 +9,7 @@ * Re-export `ContentEncoding` and `ConnectionType` at the crate root. [#2171] * `Response::into_body` that consumes response and returns body type. [#2201] * `impl Default` for `Response`. [#2201] +* Add zstd support for `ContentEncoding`. [#2244] ### Changed * The `MessageBody` trait now has an associated `Error` type. [#2183] @@ -35,6 +36,8 @@ [#2201]: https://github.com/actix/actix-web/pull/2201 [#2205]: https://github.com/actix/actix-web/pull/2205 [#2215]: https://github.com/actix/actix-web/pull/2215 +[#2244]: https://github.com/actix/actix-web/pull/2244 + ## 3.0.0-beta.6 - 2021-04-17 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 1f7df39a6..809be868d 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -32,7 +32,7 @@ openssl = ["actix-tls/openssl"] rustls = ["actix-tls/rustls"] # enable compression support -compress = ["flate2", "brotli2"] +compress = ["flate2", "brotli2", "zstd"] # trust-dns as client dns resolver trust-dns = ["trust-dns-resolver"] @@ -76,6 +76,7 @@ tokio = { version = "1.2", features = ["sync"] } # compression brotli2 = { version="0.3.2", optional = true } flate2 = { version = "1.0.13", optional = true } +zstd = { version = "0.7", optional = true } trust-dns-resolver = { version = "0.20.0", optional = true } diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index f0abae865..58981e82e 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -12,6 +12,7 @@ use brotli2::write::BrotliDecoder; use bytes::Bytes; use flate2::write::{GzDecoder, ZlibDecoder}; use futures_core::{ready, Stream}; +use zstd::stream::write::Decoder as ZstdDecoder; use crate::{ encoding::Writer, @@ -45,6 +46,12 @@ where ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new( GzDecoder::new(Writer::new()), ))), + ContentEncoding::Zstd => Some(ContentDecoder::Zstd(Box::new( + ZstdDecoder::new(Writer::new()).expect( + "Failed to create zstd decoder. This is a bug. \ + Please report it to the actix-web repository.", + ), + ))), _ => None, }; @@ -144,6 +151,9 @@ enum ContentDecoder { Deflate(Box>), Gzip(Box>), Br(Box>), + // We need explicit 'static lifetime here because ZstdDecoder need lifetime + // argument, and we use `spawn_blocking` in `Decoder::poll_next` that require `FnOnce() -> R + Send + 'static` + Zstd(Box>), } impl ContentDecoder { @@ -186,6 +196,18 @@ impl ContentDecoder { } Err(e) => Err(e), }, + + ContentDecoder::Zstd(ref mut decoder) => match decoder.flush() { + Ok(_) => { + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, } } @@ -232,6 +254,20 @@ impl ContentDecoder { } Err(e) => Err(e), }, + + ContentDecoder::Zstd(ref mut decoder) => match decoder.write_all(&data) { + Ok(_) => { + decoder.flush()?; + + let b = decoder.get_mut().take(); + if !b.is_empty() { + Ok(Some(b)) + } else { + Ok(None) + } + } + Err(e) => Err(e), + }, } } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index b8bc8b68d..6adde9be2 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -15,6 +15,7 @@ use derive_more::Display; use flate2::write::{GzEncoder, ZlibEncoder}; use futures_core::ready; use pin_project::pin_project; +use zstd::stream::write::Encoder as ZstdEncoder; use crate::{ body::{Body, BodySize, BoxAnyBody, MessageBody, ResponseBody}, @@ -237,6 +238,9 @@ enum ContentEncoder { Deflate(ZlibEncoder), Gzip(GzEncoder), Br(BrotliEncoder), + // We need explicit 'static lifetime here because ZstdEncoder need lifetime + // argument, and we use `spawn_blocking` in `Encoder::poll_next` that require `FnOnce() -> R + Send + 'static` + Zstd(ZstdEncoder<'static, Writer>), } impl ContentEncoder { @@ -253,6 +257,10 @@ impl ContentEncoder { ContentEncoding::Br => { Some(ContentEncoder::Br(BrotliEncoder::new(Writer::new(), 3))) } + ContentEncoding::Zstd => { + let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?; + Some(ContentEncoder::Zstd(encoder)) + } _ => None, } } @@ -263,6 +271,7 @@ impl ContentEncoder { ContentEncoder::Br(ref mut encoder) => encoder.get_mut().take(), ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(), ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(), + ContentEncoder::Zstd(ref mut encoder) => encoder.get_mut().take(), } } @@ -280,6 +289,10 @@ impl ContentEncoder { Ok(writer) => Ok(writer.buf.freeze()), Err(err) => Err(err), }, + ContentEncoder::Zstd(encoder) => match encoder.finish() { + Ok(writer) => Ok(writer.buf.freeze()), + Err(err) => Err(err), + }, } } @@ -306,6 +319,13 @@ impl ContentEncoder { Err(err) } }, + ContentEncoder::Zstd(ref mut encoder) => match encoder.write_all(data) { + Ok(_) => Ok(()), + Err(err) => { + trace!("Error decoding ztsd encoding: {}", err); + Err(err) + } + }, } } } diff --git a/actix-http/src/header/shared/content_encoding.rs b/actix-http/src/header/shared/content_encoding.rs index b93d66101..b9c1d2795 100644 --- a/actix-http/src/header/shared/content_encoding.rs +++ b/actix-http/src/header/shared/content_encoding.rs @@ -23,6 +23,9 @@ pub enum ContentEncoding { /// Gzip algorithm. Gzip, + // Zstd algorithm. + Zstd, + /// Indicates the identity function (i.e. no compression, nor modification). Identity, } @@ -41,6 +44,7 @@ impl ContentEncoding { ContentEncoding::Br => "br", ContentEncoding::Gzip => "gzip", ContentEncoding::Deflate => "deflate", + ContentEncoding::Zstd => "zstd", ContentEncoding::Identity | ContentEncoding::Auto => "identity", } } @@ -53,6 +57,7 @@ impl ContentEncoding { ContentEncoding::Gzip => 1.0, ContentEncoding::Deflate => 0.9, ContentEncoding::Identity | ContentEncoding::Auto => 0.1, + ContentEncoding::Zstd => 0.0, } } } @@ -81,6 +86,8 @@ impl From<&str> for ContentEncoding { ContentEncoding::Gzip } else if val.eq_ignore_ascii_case("deflate") { ContentEncoding::Deflate + } else if val.eq_ignore_ascii_case("zstd") { + ContentEncoding::Zstd } else { ContentEncoding::default() } diff --git a/tests/test_server.rs b/tests/test_server.rs index c341aa0ce..520eb5ce2 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -29,6 +29,7 @@ use openssl::{ x509::X509, }; use rand::{distributions::Alphanumeric, Rng}; +use zstd::stream::{read::Decoder as ZstdDecoder, write::Encoder as ZstdEncoder}; use actix_web::dev::BodyEncoding; use actix_web::middleware::{Compress, NormalizePath, TrailingSlash}; @@ -476,6 +477,125 @@ async fn test_body_brotli() { assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); } +#[actix_rt::test] +async fn test_body_zstd() { + let srv = actix_test::start_with(actix_test::config().h1(), || { + App::new() + .wrap(Compress::new(ContentEncoding::Zstd)) + .service(web::resource("/").route(web::to(move || HttpResponse::Ok().body(STR)))) + }); + + // client request + let mut response = srv + .get("/") + .append_header((ACCEPT_ENCODING, "zstd")) + .no_decompress() + .send() + .await + .unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = response.body().await.unwrap(); + + // decode + let mut e = ZstdDecoder::new(&bytes[..]).unwrap(); + let mut dec = Vec::new(); + e.read_to_end(&mut dec).unwrap(); + assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); +} + +#[actix_rt::test] +async fn test_body_zstd_streaming() { + let srv = actix_test::start_with(actix_test::config().h1(), || { + App::new() + .wrap(Compress::new(ContentEncoding::Zstd)) + .service(web::resource("/").route(web::to(move || { + HttpResponse::Ok() + .streaming(TestBody::new(Bytes::from_static(STR.as_ref()), 24)) + }))) + }); + + // client request + let mut response = srv + .get("/") + .append_header((ACCEPT_ENCODING, "zstd")) + .no_decompress() + .send() + .await + .unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = response.body().await.unwrap(); + + // decode + let mut e = ZstdDecoder::new(&bytes[..]).unwrap(); + let mut dec = Vec::new(); + e.read_to_end(&mut dec).unwrap(); + assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); +} + +#[actix_rt::test] +async fn test_zstd_encoding() { + let srv = actix_test::start_with(actix_test::config().h1(), || { + App::new().service( + web::resource("/").route(web::to(move |body: Bytes| HttpResponse::Ok().body(body))), + ) + }); + + let mut e = ZstdEncoder::new(Vec::new(), 5).unwrap(); + e.write_all(STR.as_ref()).unwrap(); + let enc = e.finish().unwrap(); + + // client request + let request = srv + .post("/") + .append_header((CONTENT_ENCODING, "zstd")) + .send_body(enc.clone()); + let mut response = request.await.unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = response.body().await.unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); +} + +#[actix_rt::test] +async fn test_zstd_encoding_large() { + let data = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(320_000) + .map(char::from) + .collect::(); + + let srv = actix_test::start_with(actix_test::config().h1(), || { + App::new().service( + web::resource("/") + .app_data(web::PayloadConfig::new(320_000)) + .route(web::to(move |body: Bytes| { + HttpResponse::Ok().streaming(TestBody::new(body, 10240)) + })), + ) + }); + + let mut e = ZstdEncoder::new(Vec::new(), 5).unwrap(); + e.write_all(data.as_ref()).unwrap(); + let enc = e.finish().unwrap(); + + // client request + let request = srv + .post("/") + .append_header((CONTENT_ENCODING, "zstd")) + .send_body(enc.clone()); + let mut response = request.await.unwrap(); + assert!(response.status().is_success()); + + // read response + let bytes = response.body().limit(320_000).await.unwrap(); + assert_eq!(bytes, Bytes::from(data)); +} + #[actix_rt::test] async fn test_encoding() { let srv = actix_test::start_with(actix_test::config().h1(), || {