mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-20 16:19:58 +02:00
Compare commits
16 Commits
http-test-
...
http-v1.0.
Author | SHA1 | Date | |
---|---|---|---|
|
1d12ba9d5f | ||
|
8c54054844 | ||
|
1732ae8c79 | ||
|
3b860ebdc7 | ||
|
29ac6463e1 | ||
|
01613f334b | ||
|
30dcaf9da0 | ||
|
b0aa9395da | ||
|
a153374b61 | ||
|
a791aab418 | ||
|
cb705317b8 | ||
|
e8e0f98f96 | ||
|
c878f66d05 | ||
|
fac6dec3c9 | ||
|
232f71b3b5 | ||
|
8881c13e60 |
18
CHANGES.md
18
CHANGES.md
@@ -1,5 +1,23 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.0.0] - 2019-12-xx
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
* Move `BodyEncoding` to `dev` module #1220
|
||||||
|
|
||||||
|
## [2.0.0-alpha.6] - 2019-12-15
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Fixed compilation with default features off
|
||||||
|
|
||||||
|
## [2.0.0-alpha.5] - 2019-12-13
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
* Add test server, `test::start()` and `test::start_with()`
|
||||||
|
|
||||||
## [2.0.0-alpha.4] - 2019-12-08
|
## [2.0.0-alpha.4] - 2019-12-08
|
||||||
|
|
||||||
### Deleted
|
### Deleted
|
||||||
|
15
Cargo.toml
15
Cargo.toml
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-web"
|
name = "actix-web"
|
||||||
version = "2.0.0-alpha.5"
|
version = "2.0.0-alpha.6"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
|
description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -12,11 +12,10 @@ categories = ["network-programming", "asynchronous",
|
|||||||
"web-programming::http-server",
|
"web-programming::http-server",
|
||||||
"web-programming::websocket"]
|
"web-programming::websocket"]
|
||||||
license = "MIT/Apache-2.0"
|
license = "MIT/Apache-2.0"
|
||||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[package.metadata.docs.rs]
|
[package.metadata.docs.rs]
|
||||||
features = ["openssl", "compress", "secure-cookies", "client"]
|
features = ["openssl", "rustls", "compress", "secure-cookies"]
|
||||||
|
|
||||||
[badges]
|
[badges]
|
||||||
travis-ci = { repository = "actix/actix-web", branch = "master" }
|
travis-ci = { repository = "actix/actix-web", branch = "master" }
|
||||||
@@ -71,11 +70,11 @@ actix-macros = "0.1.0"
|
|||||||
actix-threadpool = "0.3.0"
|
actix-threadpool = "0.3.0"
|
||||||
actix-tls = "1.0.0"
|
actix-tls = "1.0.0"
|
||||||
|
|
||||||
actix-web-codegen = "0.2.0-alpha.2"
|
actix-web-codegen = "0.2.0"
|
||||||
actix-http = "1.0.0"
|
actix-http = "1.0.0"
|
||||||
awc = { version = "1.0.0", default-features = false }
|
awc = { version = "1.0.1", default-features = false }
|
||||||
|
|
||||||
bytes = "0.5.2"
|
bytes = "0.5.3"
|
||||||
derive_more = "0.99.2"
|
derive_more = "0.99.2"
|
||||||
encoding_rs = "0.8"
|
encoding_rs = "0.8"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
@@ -94,11 +93,11 @@ open-ssl = { version="0.10", package = "openssl", optional = true }
|
|||||||
rust-tls = { version = "0.16.0", package = "rustls", optional = true }
|
rust-tls = { version = "0.16.0", package = "rustls", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
# actix = "0.8.3"
|
actix = "0.9.0-alpha.2"
|
||||||
rand = "0.7"
|
rand = "0.7"
|
||||||
env_logger = "0.6"
|
env_logger = "0.6"
|
||||||
serde_derive = "1.0"
|
serde_derive = "1.0"
|
||||||
brotli = "3.3.0"
|
brotli2 = "0.3.2"
|
||||||
flate2 = "1.0.13"
|
flate2 = "1.0.13"
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-cors"
|
name = "actix-cors"
|
||||||
version = "0.2.0-alpha.3"
|
version = "0.2.0"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Cross-origin resource sharing (CORS) for Actix applications."
|
description = "Cross-origin resource sharing (CORS) for Actix applications."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -17,7 +17,7 @@ name = "actix_cors"
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = "2.0.0-alpha.3"
|
actix-web = "2.0.0-alpha.5"
|
||||||
actix-service = "1.0.0"
|
actix-service = "1.0.0"
|
||||||
derive_more = "0.99.2"
|
derive_more = "0.99.2"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
|
@@ -1,6 +1,10 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## [0.2.0-alpha.7] - 2019-12-07
|
## [0.2.0] - 2019-12-xx
|
||||||
|
|
||||||
|
* Fix BodyEncoding trait import #1220
|
||||||
|
|
||||||
|
## [0.2.0-alpha.1] - 2019-12-07
|
||||||
|
|
||||||
* Migrate to `std::future`
|
* Migrate to `std::future`
|
||||||
|
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-files"
|
name = "actix-files"
|
||||||
version = "0.2.0-alpha.3"
|
version = "0.2.0"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Static files support for actix web."
|
description = "Static files support for actix web."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -18,11 +18,11 @@ name = "actix_files"
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = { version = "2.0.0-alpha.3", default-features = false }
|
actix-web = { version = "2.0.0-alpha.6", default-features = false }
|
||||||
actix-http = "1.0.0-alpha.3"
|
actix-http = "1.0.0"
|
||||||
actix-service = "1.0.0"
|
actix-service = "1.0.0"
|
||||||
bitflags = "1"
|
bitflags = "1"
|
||||||
bytes = "0.5.2"
|
bytes = "0.5.3"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
derive_more = "0.99.2"
|
derive_more = "0.99.2"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
@@ -33,4 +33,4 @@ v_htmlescape = "0.4"
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "1.0.0"
|
actix-rt = "1.0.0"
|
||||||
actix-web = { version = "2.0.0-alpha.3", features=["openssl"] }
|
actix-web = { version = "2.0.0-alpha.6", features=["openssl"] }
|
||||||
|
@@ -12,11 +12,11 @@ use mime;
|
|||||||
use mime_guess::from_path;
|
use mime_guess::from_path;
|
||||||
|
|
||||||
use actix_http::body::SizedStream;
|
use actix_http::body::SizedStream;
|
||||||
|
use actix_web::dev::BodyEncoding;
|
||||||
use actix_web::http::header::{
|
use actix_web::http::header::{
|
||||||
self, Charset, ContentDisposition, DispositionParam, DispositionType, ExtendedValue,
|
self, Charset, ContentDisposition, DispositionParam, DispositionType, ExtendedValue,
|
||||||
};
|
};
|
||||||
use actix_web::http::{ContentEncoding, StatusCode};
|
use actix_web::http::{ContentEncoding, StatusCode};
|
||||||
use actix_web::middleware::BodyEncoding;
|
|
||||||
use actix_web::{Error, HttpMessage, HttpRequest, HttpResponse, Responder};
|
use actix_web::{Error, HttpMessage, HttpRequest, HttpResponse, Responder};
|
||||||
use futures::future::{ready, Ready};
|
use futures::future::{ready, Ready};
|
||||||
|
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-framed"
|
name = "actix-framed"
|
||||||
version = "0.3.0-alpha.1"
|
version = "0.3.0"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix framed app server"
|
description = "Actix framed app server"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -24,9 +24,9 @@ actix-codec = "0.2.0"
|
|||||||
actix-service = "1.0.0"
|
actix-service = "1.0.0"
|
||||||
actix-router = "0.2.0"
|
actix-router = "0.2.0"
|
||||||
actix-rt = "1.0.0"
|
actix-rt = "1.0.0"
|
||||||
actix-http = "1.0.0-alpha.3"
|
actix-http = "1.0.0"
|
||||||
|
|
||||||
bytes = "0.5.2"
|
bytes = "0.5.3"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
pin-project = "0.4.6"
|
pin-project = "0.4.6"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
@@ -34,5 +34,5 @@ log = "0.4"
|
|||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-server = "1.0.0"
|
actix-server = "1.0.0"
|
||||||
actix-connect = { version = "1.0.0", features=["openssl"] }
|
actix-connect = { version = "1.0.0", features=["openssl"] }
|
||||||
actix-http-test = { version = "1.0.0-alpha.3", features=["openssl"] }
|
actix-http-test = { version = "1.0.0", features=["openssl"] }
|
||||||
actix-utils = "1.0.0"
|
actix-utils = "1.0.3"
|
||||||
|
@@ -1,5 +1,13 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [1.0.1] - 2019-12-20
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
* Poll upgrade service's readiness from HTTP service handlers
|
||||||
|
|
||||||
|
* Replace brotli with brotli2 #1224
|
||||||
|
|
||||||
## [1.0.0] - 2019-12-13
|
## [1.0.0] - 2019-12-13
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-http"
|
name = "actix-http"
|
||||||
version = "1.0.0"
|
version = "1.0.1"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix http primitives"
|
description = "Actix http primitives"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -15,7 +15,7 @@ license = "MIT/Apache-2.0"
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[package.metadata.docs.rs]
|
[package.metadata.docs.rs]
|
||||||
features = ["openssl", "rustls", "fail", "compress", "secure-cookies"]
|
features = ["openssl", "rustls", "failure", "compress", "secure-cookies"]
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
name = "actix_http"
|
name = "actix_http"
|
||||||
@@ -31,7 +31,7 @@ openssl = ["actix-tls/openssl", "actix-connect/openssl"]
|
|||||||
rustls = ["actix-tls/rustls", "actix-connect/rustls"]
|
rustls = ["actix-tls/rustls", "actix-connect/rustls"]
|
||||||
|
|
||||||
# enable compressison support
|
# enable compressison support
|
||||||
compress = ["flate2", "brotli"]
|
compress = ["flate2", "brotli2"]
|
||||||
|
|
||||||
# failure integration. actix does not use failure anymore
|
# failure integration. actix does not use failure anymore
|
||||||
failure = ["fail-ure"]
|
failure = ["fail-ure"]
|
||||||
@@ -42,7 +42,7 @@ secure-cookies = ["ring"]
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
actix-service = "1.0.0"
|
actix-service = "1.0.0"
|
||||||
actix-codec = "0.2.0"
|
actix-codec = "0.2.0"
|
||||||
actix-connect = "1.0.0"
|
actix-connect = "1.0.1"
|
||||||
actix-utils = "1.0.3"
|
actix-utils = "1.0.3"
|
||||||
actix-rt = "1.0.0"
|
actix-rt = "1.0.0"
|
||||||
actix-threadpool = "0.3.1"
|
actix-threadpool = "0.3.1"
|
||||||
@@ -83,7 +83,7 @@ time = "0.1.42"
|
|||||||
ring = { version = "0.16.9", optional = true }
|
ring = { version = "0.16.9", optional = true }
|
||||||
|
|
||||||
# compression
|
# compression
|
||||||
brotli = { version = "3.3.0", optional = true }
|
brotli2 = { version="0.3.2", optional = true }
|
||||||
flate2 = { version = "1.0.13", optional = true }
|
flate2 = { version = "1.0.13", optional = true }
|
||||||
|
|
||||||
# optional deps
|
# optional deps
|
||||||
@@ -92,7 +92,7 @@ fail-ure = { version = "0.1.5", package="failure", optional = true }
|
|||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-server = "1.0.0"
|
actix-server = "1.0.0"
|
||||||
actix-connect = { version = "1.0.0", features=["openssl"] }
|
actix-connect = { version = "1.0.0", features=["openssl"] }
|
||||||
actix-http-test = { version = "1.0.0-alpha.3", features=["openssl"] }
|
actix-http-test = { version = "1.0.0", features=["openssl"] }
|
||||||
actix-tls = { version = "1.0.0", features=["openssl"] }
|
actix-tls = { version = "1.0.0", features=["openssl"] }
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
env_logger = "0.6"
|
env_logger = "0.6"
|
||||||
|
@@ -51,11 +51,7 @@ impl From<Utf8Error> for ParseError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error for ParseError {
|
impl Error for ParseError {}
|
||||||
fn description(&self) -> &str {
|
|
||||||
self.as_str()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn indexes_of(needle: &str, haystack: &str) -> Option<(usize, usize)> {
|
fn indexes_of(needle: &str, haystack: &str) -> Option<(usize, usize)> {
|
||||||
let haystack_start = haystack.as_ptr() as usize;
|
let haystack_start = haystack.as_ptr() as usize;
|
||||||
|
@@ -4,7 +4,7 @@ use std::pin::Pin;
|
|||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_threadpool::{run, CpuFuture};
|
use actix_threadpool::{run, CpuFuture};
|
||||||
use brotli::DecompressorWriter;
|
use brotli2::write::BrotliDecoder;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use flate2::write::{GzDecoder, ZlibDecoder};
|
use flate2::write::{GzDecoder, ZlibDecoder};
|
||||||
use futures_core::{ready, Stream};
|
use futures_core::{ready, Stream};
|
||||||
@@ -31,7 +31,7 @@ where
|
|||||||
pub fn new(stream: S, encoding: ContentEncoding) -> Decoder<S> {
|
pub fn new(stream: S, encoding: ContentEncoding) -> Decoder<S> {
|
||||||
let decoder = match encoding {
|
let decoder = match encoding {
|
||||||
ContentEncoding::Br => Some(ContentDecoder::Br(Box::new(
|
ContentEncoding::Br => Some(ContentDecoder::Br(Box::new(
|
||||||
DecompressorWriter::new(Writer::new(), 0),
|
BrotliDecoder::new(Writer::new()),
|
||||||
))),
|
))),
|
||||||
ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(
|
ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(
|
||||||
ZlibDecoder::new(Writer::new()),
|
ZlibDecoder::new(Writer::new()),
|
||||||
@@ -137,7 +137,7 @@ where
|
|||||||
enum ContentDecoder {
|
enum ContentDecoder {
|
||||||
Deflate(Box<ZlibDecoder<Writer>>),
|
Deflate(Box<ZlibDecoder<Writer>>),
|
||||||
Gzip(Box<GzDecoder<Writer>>),
|
Gzip(Box<GzDecoder<Writer>>),
|
||||||
Br(Box<DecompressorWriter<Writer>>),
|
Br(Box<BrotliDecoder<Writer>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ContentDecoder {
|
impl ContentDecoder {
|
||||||
|
@@ -5,7 +5,7 @@ use std::pin::Pin;
|
|||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_threadpool::{run, CpuFuture};
|
use actix_threadpool::{run, CpuFuture};
|
||||||
use brotli::CompressorWriter;
|
use brotli2::write::BrotliEncoder;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use flate2::write::{GzEncoder, ZlibEncoder};
|
use flate2::write::{GzEncoder, ZlibEncoder};
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
@@ -17,7 +17,7 @@ use crate::{Error, ResponseHead};
|
|||||||
|
|
||||||
use super::Writer;
|
use super::Writer;
|
||||||
|
|
||||||
const INPLACE: usize = 2049;
|
const INPLACE: usize = 1024;
|
||||||
|
|
||||||
pub struct Encoder<B> {
|
pub struct Encoder<B> {
|
||||||
eof: bool,
|
eof: bool,
|
||||||
@@ -174,7 +174,7 @@ fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
|
|||||||
enum ContentEncoder {
|
enum ContentEncoder {
|
||||||
Deflate(ZlibEncoder<Writer>),
|
Deflate(ZlibEncoder<Writer>),
|
||||||
Gzip(GzEncoder<Writer>),
|
Gzip(GzEncoder<Writer>),
|
||||||
Br(Box<CompressorWriter<Writer>>),
|
Br(BrotliEncoder<Writer>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ContentEncoder {
|
impl ContentEncoder {
|
||||||
@@ -188,9 +188,9 @@ impl ContentEncoder {
|
|||||||
Writer::new(),
|
Writer::new(),
|
||||||
flate2::Compression::fast(),
|
flate2::Compression::fast(),
|
||||||
))),
|
))),
|
||||||
ContentEncoding::Br => Some(ContentEncoder::Br(Box::new(
|
ContentEncoding::Br => {
|
||||||
CompressorWriter::new(Writer::new(), 0, 3, 0),
|
Some(ContentEncoder::Br(BrotliEncoder::new(Writer::new(), 3)))
|
||||||
))),
|
}
|
||||||
_ => None,
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -198,12 +198,7 @@ impl ContentEncoder {
|
|||||||
#[inline]
|
#[inline]
|
||||||
pub(crate) fn take(&mut self) -> Bytes {
|
pub(crate) fn take(&mut self) -> Bytes {
|
||||||
match *self {
|
match *self {
|
||||||
ContentEncoder::Br(ref mut encoder) => {
|
ContentEncoder::Br(ref mut encoder) => encoder.get_mut().take(),
|
||||||
let mut encoder_new =
|
|
||||||
Box::new(CompressorWriter::new(Writer::new(), 0, 3, 0));
|
|
||||||
std::mem::swap(encoder, &mut encoder_new);
|
|
||||||
encoder_new.into_inner().freeze()
|
|
||||||
}
|
|
||||||
ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(),
|
ContentEncoder::Deflate(ref mut encoder) => encoder.get_mut().take(),
|
||||||
ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(),
|
ContentEncoder::Gzip(ref mut encoder) => encoder.get_mut().take(),
|
||||||
}
|
}
|
||||||
@@ -211,7 +206,10 @@ impl ContentEncoder {
|
|||||||
|
|
||||||
fn finish(self) -> Result<Bytes, io::Error> {
|
fn finish(self) -> Result<Bytes, io::Error> {
|
||||||
match self {
|
match self {
|
||||||
ContentEncoder::Br(encoder) => Ok(encoder.into_inner().buf.freeze()),
|
ContentEncoder::Br(encoder) => match encoder.finish() {
|
||||||
|
Ok(writer) => Ok(writer.buf.freeze()),
|
||||||
|
Err(err) => Err(err),
|
||||||
|
},
|
||||||
ContentEncoder::Gzip(encoder) => match encoder.finish() {
|
ContentEncoder::Gzip(encoder) => match encoder.finish() {
|
||||||
Ok(writer) => Ok(writer.buf.freeze()),
|
Ok(writer) => Ok(writer.buf.freeze()),
|
||||||
Err(err) => Err(err),
|
Err(err) => Err(err),
|
||||||
|
@@ -19,12 +19,10 @@ impl Writer {
|
|||||||
buf: BytesMut::with_capacity(8192),
|
buf: BytesMut::with_capacity(8192),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn take(&mut self) -> Bytes {
|
fn take(&mut self) -> Bytes {
|
||||||
self.buf.split().freeze()
|
self.buf.split().freeze()
|
||||||
}
|
}
|
||||||
fn freeze(self) -> Bytes {
|
|
||||||
self.buf.freeze()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl io::Write for Writer {
|
impl io::Write for Writer {
|
||||||
@@ -32,6 +30,7 @@ impl io::Write for Writer {
|
|||||||
self.buf.extend_from_slice(buf);
|
self.buf.extend_from_slice(buf);
|
||||||
Ok(buf.len())
|
Ok(buf.len())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@@ -6,7 +6,9 @@ use std::str::Utf8Error;
|
|||||||
use std::string::FromUtf8Error;
|
use std::string::FromUtf8Error;
|
||||||
use std::{fmt, io, result};
|
use std::{fmt, io, result};
|
||||||
|
|
||||||
|
use actix_codec::{Decoder, Encoder};
|
||||||
pub use actix_threadpool::BlockingError;
|
pub use actix_threadpool::BlockingError;
|
||||||
|
use actix_utils::framed::DispatcherError as FramedDispatcherError;
|
||||||
use actix_utils::timeout::TimeoutError;
|
use actix_utils::timeout::TimeoutError;
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
use derive_more::{Display, From};
|
use derive_more::{Display, From};
|
||||||
@@ -114,10 +116,6 @@ impl fmt::Debug for Error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl std::error::Error for Error {
|
impl std::error::Error for Error {
|
||||||
fn description(&self) -> &str {
|
|
||||||
"actix-http::Error"
|
|
||||||
}
|
|
||||||
|
|
||||||
fn cause(&self) -> Option<&dyn std::error::Error> {
|
fn cause(&self) -> Option<&dyn std::error::Error> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@@ -467,6 +465,14 @@ impl ResponseError for ContentTypeError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<E, U: Encoder + Decoder> ResponseError for FramedDispatcherError<E, U>
|
||||||
|
where
|
||||||
|
E: fmt::Debug + fmt::Display,
|
||||||
|
<U as Encoder>::Error: fmt::Debug,
|
||||||
|
<U as Decoder>::Error: fmt::Debug,
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
/// Helper type that can wrap any error and generate custom response.
|
/// Helper type that can wrap any error and generate custom response.
|
||||||
///
|
///
|
||||||
/// In following example any `io::Error` will be converted into "BAD REQUEST"
|
/// In following example any `io::Error` will be converted into "BAD REQUEST"
|
||||||
@@ -966,7 +972,6 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use http::{Error as HttpError, StatusCode};
|
use http::{Error as HttpError, StatusCode};
|
||||||
use httparse;
|
use httparse;
|
||||||
use std::error::Error as StdError;
|
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -995,7 +1000,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_error_cause() {
|
fn test_error_cause() {
|
||||||
let orig = io::Error::new(io::ErrorKind::Other, "other");
|
let orig = io::Error::new(io::ErrorKind::Other, "other");
|
||||||
let desc = orig.description().to_owned();
|
let desc = orig.to_string();
|
||||||
let e = Error::from(orig);
|
let e = Error::from(orig);
|
||||||
assert_eq!(format!("{}", e.as_response_error()), desc);
|
assert_eq!(format!("{}", e.as_response_error()), desc);
|
||||||
}
|
}
|
||||||
@@ -1003,7 +1008,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_error_display() {
|
fn test_error_display() {
|
||||||
let orig = io::Error::new(io::ErrorKind::Other, "other");
|
let orig = io::Error::new(io::ErrorKind::Other, "other");
|
||||||
let desc = orig.description().to_owned();
|
let desc = orig.to_string();
|
||||||
let e = Error::from(orig);
|
let e = Error::from(orig);
|
||||||
assert_eq!(format!("{}", e), desc);
|
assert_eq!(format!("{}", e), desc);
|
||||||
}
|
}
|
||||||
@@ -1045,7 +1050,7 @@ mod tests {
|
|||||||
match ParseError::from($from) {
|
match ParseError::from($from) {
|
||||||
e @ $error => {
|
e @ $error => {
|
||||||
let desc = format!("{}", e);
|
let desc = format!("{}", e);
|
||||||
assert_eq!(desc, format!("IO error: {}", $from.description()));
|
assert_eq!(desc, format!("IO error: {}", $from));
|
||||||
}
|
}
|
||||||
_ => unreachable!("{:?}", $from),
|
_ => unreachable!("{:?}", $from),
|
||||||
}
|
}
|
||||||
|
@@ -5,7 +5,7 @@ use std::mem::MaybeUninit;
|
|||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
|
|
||||||
use actix_codec::Decoder;
|
use actix_codec::Decoder;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Buf, Bytes, BytesMut};
|
||||||
use http::header::{HeaderName, HeaderValue};
|
use http::header::{HeaderName, HeaderValue};
|
||||||
use http::{header, Method, StatusCode, Uri, Version};
|
use http::{header, Method, StatusCode, Uri, Version};
|
||||||
use httparse;
|
use httparse;
|
||||||
@@ -477,7 +477,7 @@ macro_rules! byte (
|
|||||||
($rdr:ident) => ({
|
($rdr:ident) => ({
|
||||||
if $rdr.len() > 0 {
|
if $rdr.len() > 0 {
|
||||||
let b = $rdr[0];
|
let b = $rdr[0];
|
||||||
let _ = $rdr.split_to(1);
|
$rdr.advance(1);
|
||||||
b
|
b
|
||||||
} else {
|
} else {
|
||||||
return Poll::Pending
|
return Poll::Pending
|
||||||
|
@@ -8,7 +8,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts};
|
|||||||
use actix_rt::time::{delay_until, Delay, Instant};
|
use actix_rt::time::{delay_until, Delay, Instant};
|
||||||
use actix_service::Service;
|
use actix_service::Service;
|
||||||
use bitflags::bitflags;
|
use bitflags::bitflags;
|
||||||
use bytes::BytesMut;
|
use bytes::{Buf, BytesMut};
|
||||||
use log::{error, trace};
|
use log::{error, trace};
|
||||||
|
|
||||||
use crate::body::{Body, BodySize, MessageBody, ResponseBody};
|
use crate::body::{Body, BodySize, MessageBody, ResponseBody};
|
||||||
@@ -66,7 +66,6 @@ where
|
|||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display,
|
||||||
{
|
{
|
||||||
Normal(InnerDispatcher<T, S, B, X, U>),
|
Normal(InnerDispatcher<T, S, B, X, U>),
|
||||||
UpgradeReadiness(InnerDispatcher<T, S, B, X, U>, Request),
|
|
||||||
Upgrade(U::Future),
|
Upgrade(U::Future),
|
||||||
None,
|
None,
|
||||||
}
|
}
|
||||||
@@ -313,7 +312,7 @@ where
|
|||||||
}
|
}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
if written > 0 {
|
if written > 0 {
|
||||||
let _ = self.write_buf.split_to(written);
|
self.write_buf.advance(written);
|
||||||
}
|
}
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
@@ -323,7 +322,7 @@ where
|
|||||||
if written == self.write_buf.len() {
|
if written == self.write_buf.len() {
|
||||||
unsafe { self.write_buf.set_len(0) }
|
unsafe { self.write_buf.set_len(0) }
|
||||||
} else {
|
} else {
|
||||||
let _ = self.write_buf.split_to(written);
|
self.write_buf.advance(written);
|
||||||
}
|
}
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
@@ -764,8 +763,16 @@ where
|
|||||||
if let DispatcherState::Normal(inner) =
|
if let DispatcherState::Normal(inner) =
|
||||||
std::mem::replace(&mut self.inner, DispatcherState::None)
|
std::mem::replace(&mut self.inner, DispatcherState::None)
|
||||||
{
|
{
|
||||||
self.inner =
|
let mut parts = FramedParts::with_read_buf(
|
||||||
DispatcherState::UpgradeReadiness(inner, req);
|
inner.io,
|
||||||
|
inner.codec,
|
||||||
|
inner.read_buf,
|
||||||
|
);
|
||||||
|
parts.write_buf = inner.write_buf;
|
||||||
|
let framed = Framed::from_parts(parts);
|
||||||
|
self.inner = DispatcherState::Upgrade(
|
||||||
|
inner.upgrade.unwrap().call((req, framed)),
|
||||||
|
);
|
||||||
return self.poll(cx);
|
return self.poll(cx);
|
||||||
} else {
|
} else {
|
||||||
panic!()
|
panic!()
|
||||||
@@ -815,35 +822,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DispatcherState::UpgradeReadiness(ref mut inner, _) => {
|
|
||||||
let upgrade = inner.upgrade.as_mut().unwrap();
|
|
||||||
match upgrade.poll_ready(cx) {
|
|
||||||
Poll::Ready(Ok(_)) => {
|
|
||||||
if let DispatcherState::UpgradeReadiness(inner, req) =
|
|
||||||
std::mem::replace(&mut self.inner, DispatcherState::None)
|
|
||||||
{
|
|
||||||
let mut parts = FramedParts::with_read_buf(
|
|
||||||
inner.io,
|
|
||||||
inner.codec,
|
|
||||||
inner.read_buf,
|
|
||||||
);
|
|
||||||
parts.write_buf = inner.write_buf;
|
|
||||||
let framed = Framed::from_parts(parts);
|
|
||||||
self.inner = DispatcherState::Upgrade(
|
|
||||||
inner.upgrade.unwrap().call((req, framed)),
|
|
||||||
);
|
|
||||||
self.poll(cx)
|
|
||||||
} else {
|
|
||||||
panic!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
Poll::Ready(Err(e)) => {
|
|
||||||
error!("Upgrade handler readiness check error: {}", e);
|
|
||||||
Poll::Ready(Err(DispatchError::Upgrade))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
DispatcherState::Upgrade(ref mut fut) => {
|
DispatcherState::Upgrade(ref mut fut) => {
|
||||||
unsafe { Pin::new_unchecked(fut) }.poll(cx).map_err(|e| {
|
unsafe { Pin::new_unchecked(fut) }.poll(cx).map_err(|e| {
|
||||||
error!("Upgrade handler error: {}", e);
|
error!("Upgrade handler error: {}", e);
|
||||||
|
@@ -72,7 +72,7 @@ where
|
|||||||
Request = (Request, Framed<TcpStream, Codec>),
|
Request = (Request, Framed<TcpStream, Codec>),
|
||||||
Response = (),
|
Response = (),
|
||||||
>,
|
>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
U::InitError: fmt::Debug,
|
U::InitError: fmt::Debug,
|
||||||
{
|
{
|
||||||
/// Create simple tcp stream service
|
/// Create simple tcp stream service
|
||||||
@@ -115,7 +115,7 @@ mod openssl {
|
|||||||
Request = (Request, Framed<SslStream<TcpStream>, Codec>),
|
Request = (Request, Framed<SslStream<TcpStream>, Codec>),
|
||||||
Response = (),
|
Response = (),
|
||||||
>,
|
>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
U::InitError: fmt::Debug,
|
U::InitError: fmt::Debug,
|
||||||
{
|
{
|
||||||
/// Create openssl based service
|
/// Create openssl based service
|
||||||
@@ -165,7 +165,7 @@ mod rustls {
|
|||||||
Request = (Request, Framed<TlsStream<TcpStream>, Codec>),
|
Request = (Request, Framed<TlsStream<TcpStream>, Codec>),
|
||||||
Response = (),
|
Response = (),
|
||||||
>,
|
>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
U::InitError: fmt::Debug,
|
U::InitError: fmt::Debug,
|
||||||
{
|
{
|
||||||
/// Create rustls based service
|
/// Create rustls based service
|
||||||
@@ -255,7 +255,7 @@ where
|
|||||||
X::Error: Into<Error>,
|
X::Error: Into<Error>,
|
||||||
X::InitError: fmt::Debug,
|
X::InitError: fmt::Debug,
|
||||||
U: ServiceFactory<Config = (), Request = (Request, Framed<T, Codec>), Response = ()>,
|
U: ServiceFactory<Config = (), Request = (Request, Framed<T, Codec>), Response = ()>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
U::InitError: fmt::Debug,
|
U::InitError: fmt::Debug,
|
||||||
{
|
{
|
||||||
type Config = ();
|
type Config = ();
|
||||||
@@ -412,7 +412,7 @@ where
|
|||||||
X: Service<Request = Request, Response = Request>,
|
X: Service<Request = Request, Response = Request>,
|
||||||
X::Error: Into<Error>,
|
X::Error: Into<Error>,
|
||||||
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
|
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
{
|
{
|
||||||
type Request = (T, Option<net::SocketAddr>);
|
type Request = (T, Option<net::SocketAddr>);
|
||||||
type Response = ();
|
type Response = ();
|
||||||
@@ -441,6 +441,19 @@ where
|
|||||||
.is_ready()
|
.is_ready()
|
||||||
&& ready;
|
&& ready;
|
||||||
|
|
||||||
|
let ready = if let Some(ref mut upg) = self.upgrade {
|
||||||
|
upg.poll_ready(cx)
|
||||||
|
.map_err(|e| {
|
||||||
|
let e = e.into();
|
||||||
|
log::error!("Http service readiness error: {:?}", e);
|
||||||
|
DispatchError::Service(e)
|
||||||
|
})?
|
||||||
|
.is_ready()
|
||||||
|
&& ready
|
||||||
|
} else {
|
||||||
|
ready
|
||||||
|
};
|
||||||
|
|
||||||
if ready {
|
if ready {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
} else {
|
} else {
|
||||||
|
@@ -169,7 +169,7 @@ where
|
|||||||
Request = (Request, Framed<TcpStream, h1::Codec>),
|
Request = (Request, Framed<TcpStream, h1::Codec>),
|
||||||
Response = (),
|
Response = (),
|
||||||
>,
|
>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
U::InitError: fmt::Debug,
|
U::InitError: fmt::Debug,
|
||||||
<U::Service as Service>::Future: 'static,
|
<U::Service as Service>::Future: 'static,
|
||||||
{
|
{
|
||||||
@@ -214,7 +214,7 @@ mod openssl {
|
|||||||
Request = (Request, Framed<SslStream<TcpStream>, h1::Codec>),
|
Request = (Request, Framed<SslStream<TcpStream>, h1::Codec>),
|
||||||
Response = (),
|
Response = (),
|
||||||
>,
|
>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
U::InitError: fmt::Debug,
|
U::InitError: fmt::Debug,
|
||||||
<U::Service as Service>::Future: 'static,
|
<U::Service as Service>::Future: 'static,
|
||||||
{
|
{
|
||||||
@@ -276,7 +276,7 @@ mod rustls {
|
|||||||
Request = (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
|
Request = (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
|
||||||
Response = (),
|
Response = (),
|
||||||
>,
|
>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
U::InitError: fmt::Debug,
|
U::InitError: fmt::Debug,
|
||||||
<U::Service as Service>::Future: 'static,
|
<U::Service as Service>::Future: 'static,
|
||||||
{
|
{
|
||||||
@@ -335,7 +335,7 @@ where
|
|||||||
Request = (Request, Framed<T, h1::Codec>),
|
Request = (Request, Framed<T, h1::Codec>),
|
||||||
Response = (),
|
Response = (),
|
||||||
>,
|
>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
U::InitError: fmt::Debug,
|
U::InitError: fmt::Debug,
|
||||||
<U::Service as Service>::Future: 'static,
|
<U::Service as Service>::Future: 'static,
|
||||||
{
|
{
|
||||||
@@ -493,7 +493,7 @@ where
|
|||||||
X: Service<Request = Request, Response = Request>,
|
X: Service<Request = Request, Response = Request>,
|
||||||
X::Error: Into<Error>,
|
X::Error: Into<Error>,
|
||||||
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
|
U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
|
||||||
U::Error: fmt::Display,
|
U::Error: fmt::Display + Into<Error>,
|
||||||
{
|
{
|
||||||
type Request = (T, Protocol, Option<net::SocketAddr>);
|
type Request = (T, Protocol, Option<net::SocketAddr>);
|
||||||
type Response = ();
|
type Response = ();
|
||||||
@@ -522,6 +522,19 @@ where
|
|||||||
.is_ready()
|
.is_ready()
|
||||||
&& ready;
|
&& ready;
|
||||||
|
|
||||||
|
let ready = if let Some(ref mut upg) = self.upgrade {
|
||||||
|
upg.poll_ready(cx)
|
||||||
|
.map_err(|e| {
|
||||||
|
let e = e.into();
|
||||||
|
log::error!("Http service readiness error: {:?}", e);
|
||||||
|
DispatchError::Service(e)
|
||||||
|
})?
|
||||||
|
.is_ready()
|
||||||
|
&& ready
|
||||||
|
} else {
|
||||||
|
ready
|
||||||
|
};
|
||||||
|
|
||||||
if ready {
|
if ready {
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
} else {
|
} else {
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
use bytes::{BufMut, BytesMut};
|
use bytes::{Buf, BufMut, BytesMut};
|
||||||
use log::debug;
|
use log::debug;
|
||||||
use rand;
|
use rand;
|
||||||
|
|
||||||
@@ -108,7 +108,7 @@ impl Parser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// remove prefix
|
// remove prefix
|
||||||
let _ = src.split_to(idx);
|
src.advance(idx);
|
||||||
|
|
||||||
// no need for body
|
// no need for body
|
||||||
if length == 0 {
|
if length == 0 {
|
||||||
|
@@ -1,24 +1,70 @@
|
|||||||
|
use std::cell::Cell;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
use actix_codec::{AsyncRead, AsyncWrite, Framed};
|
||||||
use actix_http::{body, h1, ws, Error, HttpService, Request, Response};
|
use actix_http::{body, h1, ws, Error, HttpService, Request, Response};
|
||||||
use actix_http_test::test_server;
|
use actix_http_test::test_server;
|
||||||
|
use actix_service::{fn_factory, Service};
|
||||||
use actix_utils::framed::Dispatcher;
|
use actix_utils::framed::Dispatcher;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::task::{Context, Poll};
|
||||||
|
use futures::{Future, SinkExt, StreamExt};
|
||||||
|
|
||||||
async fn ws_service<T: AsyncRead + AsyncWrite + Unpin>(
|
struct WsService<T>(Arc<Mutex<(PhantomData<T>, Cell<bool>)>>);
|
||||||
(req, mut framed): (Request, Framed<T, h1::Codec>),
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
let res = ws::handshake(req.head()).unwrap().message_body(());
|
|
||||||
|
|
||||||
framed
|
impl<T> WsService<T> {
|
||||||
.send((res, body::BodySize::None).into())
|
fn new() -> Self {
|
||||||
.await
|
WsService(Arc::new(Mutex::new((PhantomData, Cell::new(false)))))
|
||||||
.unwrap();
|
}
|
||||||
|
|
||||||
Dispatcher::new(framed.into_framed(ws::Codec::new()), service)
|
fn set_polled(&mut self) {
|
||||||
.await
|
*self.0.lock().unwrap().1.get_mut() = true;
|
||||||
.map_err(|_| panic!())
|
}
|
||||||
|
|
||||||
|
fn was_polled(&self) -> bool {
|
||||||
|
self.0.lock().unwrap().1.get()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Clone for WsService<T> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
WsService(self.0.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Service for WsService<T>
|
||||||
|
where
|
||||||
|
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
|
{
|
||||||
|
type Request = (Request, Framed<T, h1::Codec>);
|
||||||
|
type Response = ();
|
||||||
|
type Error = Error;
|
||||||
|
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.set_polled();
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&mut self, (req, mut framed): Self::Request) -> Self::Future {
|
||||||
|
let fut = async move {
|
||||||
|
let res = ws::handshake(req.head()).unwrap().message_body(());
|
||||||
|
|
||||||
|
framed
|
||||||
|
.send((res, body::BodySize::None).into())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Dispatcher::new(framed.into_framed(ws::Codec::new()), service)
|
||||||
|
.await
|
||||||
|
.map_err(|_| panic!())
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::pin(fut)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn service(msg: ws::Frame) -> Result<ws::Message, Error> {
|
async fn service(msg: ws::Frame) -> Result<ws::Message, Error> {
|
||||||
@@ -37,11 +83,16 @@ async fn service(msg: ws::Frame) -> Result<ws::Message, Error> {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_simple() {
|
async fn test_simple() {
|
||||||
let mut srv = test_server(|| {
|
let ws_service = WsService::new();
|
||||||
HttpService::build()
|
let mut srv = test_server({
|
||||||
.upgrade(actix_service::fn_service(ws_service))
|
let ws_service = ws_service.clone();
|
||||||
.finish(|_| future::ok::<_, ()>(Response::NotFound()))
|
move || {
|
||||||
.tcp()
|
let ws_service = ws_service.clone();
|
||||||
|
HttpService::build()
|
||||||
|
.upgrade(fn_factory(move || future::ok::<_, ()>(ws_service.clone())))
|
||||||
|
.finish(|_| future::ok::<_, ()>(Response::NotFound()))
|
||||||
|
.tcp()
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// client service
|
// client service
|
||||||
@@ -138,4 +189,6 @@ async fn test_simple() {
|
|||||||
item.unwrap().unwrap(),
|
item.unwrap().unwrap(),
|
||||||
ws::Frame::Close(Some(ws::CloseCode::Normal.into()))
|
ws::Frame::Close(Some(ws::CloseCode::Normal.into()))
|
||||||
);
|
);
|
||||||
|
|
||||||
|
assert!(ws_service.was_polled());
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-identity"
|
name = "actix-identity"
|
||||||
version = "0.2.0-alpha.3"
|
version = "0.2.0"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Identity service for actix web framework."
|
description = "Identity service for actix web framework."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -17,7 +17,7 @@ name = "actix_identity"
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = { version = "2.0.0-alpha.3", default-features = false, features = ["secure-cookies"] }
|
actix-web = { version = "2.0.0-alpha.5", default-features = false, features = ["secure-cookies"] }
|
||||||
actix-service = "1.0.0"
|
actix-service = "1.0.0"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
@@ -26,5 +26,5 @@ time = "0.1.42"
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "1.0.0"
|
actix-rt = "1.0.0"
|
||||||
actix-http = "1.0.0-alpha.3"
|
actix-http = "1.0.0"
|
||||||
bytes = "0.5.2"
|
bytes = "0.5.3"
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-multipart"
|
name = "actix-multipart"
|
||||||
version = "0.2.0-alpha.3"
|
version = "0.2.0"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Multipart support for actix web framework."
|
description = "Multipart support for actix web framework."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -9,8 +9,6 @@ homepage = "https://actix.rs"
|
|||||||
repository = "https://github.com/actix/actix-web.git"
|
repository = "https://github.com/actix/actix-web.git"
|
||||||
documentation = "https://docs.rs/actix-multipart/"
|
documentation = "https://docs.rs/actix-multipart/"
|
||||||
license = "MIT/Apache-2.0"
|
license = "MIT/Apache-2.0"
|
||||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
|
||||||
workspace = ".."
|
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
@@ -18,10 +16,10 @@ name = "actix_multipart"
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = { version = "2.0.0-alpha.3", default-features = false }
|
actix-web = { version = "2.0.0-alpha.5", default-features = false }
|
||||||
actix-service = "1.0.0"
|
actix-service = "1.0.0"
|
||||||
actix-utils = "1.0.0"
|
actix-utils = "1.0.3"
|
||||||
bytes = "0.5.2"
|
bytes = "0.5.3"
|
||||||
derive_more = "0.99.2"
|
derive_more = "0.99.2"
|
||||||
httparse = "1.3"
|
httparse = "1.3"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
@@ -32,4 +30,4 @@ twoway = "0.2"
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "1.0.0"
|
actix-rt = "1.0.0"
|
||||||
actix-http = "1.0.0-alpha.3"
|
actix-http = "1.0.0"
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-session"
|
name = "actix-session"
|
||||||
version = "0.3.0-alpha.3"
|
version = "0.3.0"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Session for actix web framework."
|
description = "Session for actix web framework."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -9,8 +9,6 @@ homepage = "https://actix.rs"
|
|||||||
repository = "https://github.com/actix/actix-web.git"
|
repository = "https://github.com/actix/actix-web.git"
|
||||||
documentation = "https://docs.rs/actix-session/"
|
documentation = "https://docs.rs/actix-session/"
|
||||||
license = "MIT/Apache-2.0"
|
license = "MIT/Apache-2.0"
|
||||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
|
||||||
workspace = ".."
|
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
@@ -24,9 +22,9 @@ default = ["cookie-session"]
|
|||||||
cookie-session = ["actix-web/secure-cookies"]
|
cookie-session = ["actix-web/secure-cookies"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = "2.0.0-alpha.3"
|
actix-web = "2.0.0-alpha.5"
|
||||||
actix-service = "1.0.0"
|
actix-service = "1.0.0"
|
||||||
bytes = "0.5.2"
|
bytes = "0.5.3"
|
||||||
derive_more = "0.99.2"
|
derive_more = "0.99.2"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
serde = "1.0"
|
serde = "1.0"
|
||||||
|
@@ -1,5 +1,9 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [2.0.0-alpha.1] - 2019-12-15
|
||||||
|
|
||||||
|
* Migrate to actix-web 2.0.0
|
||||||
|
|
||||||
## [1.0.4] - 2019-12-07
|
## [1.0.4] - 2019-12-07
|
||||||
|
|
||||||
* Allow comma-separated websocket subprotocols without spaces (#1172)
|
* Allow comma-separated websocket subprotocols without spaces (#1172)
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-web-actors"
|
name = "actix-web-actors"
|
||||||
version = "1.0.4"
|
version = "2.0.0-alpha.1"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix actors support for actix web framework."
|
description = "Actix actors support for actix web framework."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -9,8 +9,6 @@ homepage = "https://actix.rs"
|
|||||||
repository = "https://github.com/actix/actix-web.git"
|
repository = "https://github.com/actix/actix-web.git"
|
||||||
documentation = "https://docs.rs/actix-web-actors/"
|
documentation = "https://docs.rs/actix-web-actors/"
|
||||||
license = "MIT/Apache-2.0"
|
license = "MIT/Apache-2.0"
|
||||||
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
|
|
||||||
workspace = ".."
|
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
@@ -18,13 +16,14 @@ name = "actix_web_actors"
|
|||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix = "0.8.3"
|
actix = "0.9.0-alpha.1"
|
||||||
actix-web = "1.0.9"
|
actix-web = "2.0.0-alpha.5"
|
||||||
actix-http = "0.2.11"
|
actix-http = "1.0.0"
|
||||||
actix-codec = "0.1.2"
|
actix-codec = "0.2.0"
|
||||||
bytes = "0.4"
|
bytes = "0.5.2"
|
||||||
futures = "0.1.25"
|
futures = "0.3.1"
|
||||||
|
pin-project = "0.4.6"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
actix-rt = "1.0.0"
|
||||||
env_logger = "0.6"
|
env_logger = "0.6"
|
||||||
actix-http-test = { version = "0.2.4", features=["ssl"] }
|
|
||||||
|
@@ -1,4 +1,6 @@
|
|||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix::dev::{
|
use actix::dev::{
|
||||||
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope,
|
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, ToEnvelope,
|
||||||
@@ -7,10 +9,10 @@ use actix::fut::ActorFuture;
|
|||||||
use actix::{
|
use actix::{
|
||||||
Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle,
|
Actor, ActorContext, ActorState, Addr, AsyncContext, Handler, Message, SpawnHandle,
|
||||||
};
|
};
|
||||||
use actix_web::error::{Error, ErrorInternalServerError};
|
use actix_web::error::Error;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::sync::oneshot::Sender;
|
use futures::channel::oneshot::Sender;
|
||||||
use futures::{Async, Future, Poll, Stream};
|
use futures::{Future, Stream};
|
||||||
|
|
||||||
/// Execution context for http actors
|
/// Execution context for http actors
|
||||||
pub struct HttpContext<A>
|
pub struct HttpContext<A>
|
||||||
@@ -43,7 +45,7 @@ where
|
|||||||
#[inline]
|
#[inline]
|
||||||
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
|
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
|
||||||
where
|
where
|
||||||
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
|
F: ActorFuture<Output = (), Actor = A> + 'static,
|
||||||
{
|
{
|
||||||
self.inner.spawn(fut)
|
self.inner.spawn(fut)
|
||||||
}
|
}
|
||||||
@@ -51,7 +53,7 @@ where
|
|||||||
#[inline]
|
#[inline]
|
||||||
fn wait<F>(&mut self, fut: F)
|
fn wait<F>(&mut self, fut: F)
|
||||||
where
|
where
|
||||||
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
|
F: ActorFuture<Output = (), Actor = A> + 'static,
|
||||||
{
|
{
|
||||||
self.inner.wait(fut)
|
self.inner.wait(fut)
|
||||||
}
|
}
|
||||||
@@ -81,7 +83,7 @@ where
|
|||||||
{
|
{
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Create a new HTTP Context from a request and an actor
|
/// Create a new HTTP Context from a request and an actor
|
||||||
pub fn create(actor: A) -> impl Stream<Item = Bytes, Error = Error> {
|
pub fn create(actor: A) -> impl Stream<Item = Result<Bytes, Error>> {
|
||||||
let mb = Mailbox::default();
|
let mb = Mailbox::default();
|
||||||
let ctx = HttpContext {
|
let ctx = HttpContext {
|
||||||
inner: ContextParts::new(mb.sender_producer()),
|
inner: ContextParts::new(mb.sender_producer()),
|
||||||
@@ -91,7 +93,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new HTTP Context
|
/// Create a new HTTP Context
|
||||||
pub fn with_factory<F>(f: F) -> impl Stream<Item = Bytes, Error = Error>
|
pub fn with_factory<F>(f: F) -> impl Stream<Item = Result<Bytes, Error>>
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut Self) -> A + 'static,
|
F: FnOnce(&mut Self) -> A + 'static,
|
||||||
{
|
{
|
||||||
@@ -160,24 +162,23 @@ impl<A> Stream for HttpContextFut<A>
|
|||||||
where
|
where
|
||||||
A: Actor<Context = HttpContext<A>>,
|
A: Actor<Context = HttpContext<A>>,
|
||||||
{
|
{
|
||||||
type Item = Bytes;
|
type Item = Result<Bytes, Error>;
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Bytes>, Error> {
|
fn poll_next(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<Self::Item>> {
|
||||||
if self.fut.alive() {
|
if self.fut.alive() {
|
||||||
match self.fut.poll() {
|
let _ = Pin::new(&mut self.fut).poll(cx);
|
||||||
Ok(Async::NotReady) | Ok(Async::Ready(())) => (),
|
|
||||||
Err(_) => return Err(ErrorInternalServerError("error")),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// frames
|
// frames
|
||||||
if let Some(data) = self.fut.ctx().stream.pop_front() {
|
if let Some(data) = self.fut.ctx().stream.pop_front() {
|
||||||
Ok(Async::Ready(data))
|
Poll::Ready(data.map(|b| Ok(b)))
|
||||||
} else if self.fut.alive() {
|
} else if self.fut.alive() {
|
||||||
Ok(Async::NotReady)
|
Poll::Pending
|
||||||
} else {
|
} else {
|
||||||
Ok(Async::Ready(None))
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -199,9 +200,9 @@ mod tests {
|
|||||||
|
|
||||||
use actix::Actor;
|
use actix::Actor;
|
||||||
use actix_web::http::StatusCode;
|
use actix_web::http::StatusCode;
|
||||||
use actix_web::test::{block_on, call_service, init_service, TestRequest};
|
use actix_web::test::{call_service, init_service, read_body, TestRequest};
|
||||||
use actix_web::{web, App, HttpResponse};
|
use actix_web::{web, App, HttpResponse};
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::Bytes;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
@@ -223,31 +224,25 @@ mod tests {
|
|||||||
if self.count > 3 {
|
if self.count > 3 {
|
||||||
ctx.write_eof()
|
ctx.write_eof()
|
||||||
} else {
|
} else {
|
||||||
ctx.write(Bytes::from(format!("LINE-{}", self.count).as_bytes()));
|
ctx.write(Bytes::from(format!("LINE-{}", self.count)));
|
||||||
ctx.run_later(Duration::from_millis(100), |slf, ctx| slf.write(ctx));
|
ctx.run_later(Duration::from_millis(100), |slf, ctx| slf.write(ctx));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[actix_rt::test]
|
||||||
fn test_default_resource() {
|
async fn test_default_resource() {
|
||||||
let mut srv =
|
let mut srv =
|
||||||
init_service(App::new().service(web::resource("/test").to(|| {
|
init_service(App::new().service(web::resource("/test").to(|| {
|
||||||
HttpResponse::Ok().streaming(HttpContext::create(MyActor { count: 0 }))
|
HttpResponse::Ok().streaming(HttpContext::create(MyActor { count: 0 }))
|
||||||
})));
|
})))
|
||||||
|
.await;
|
||||||
|
|
||||||
let req = TestRequest::with_uri("/test").to_request();
|
let req = TestRequest::with_uri("/test").to_request();
|
||||||
let mut resp = call_service(&mut srv, req);
|
let resp = call_service(&mut srv, req).await;
|
||||||
assert_eq!(resp.status(), StatusCode::OK);
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
|
||||||
let body = block_on(resp.take_body().fold(
|
let body = read_body(resp).await;
|
||||||
BytesMut::new(),
|
assert_eq!(body, Bytes::from_static(b"LINE-1LINE-2LINE-3"));
|
||||||
move |mut body, chunk| {
|
|
||||||
body.extend_from_slice(&chunk);
|
|
||||||
Ok::<_, Error>(body)
|
|
||||||
},
|
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(body.freeze(), Bytes::from_static(b"LINE-1LINE-2LINE-3"));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,6 +1,8 @@
|
|||||||
//! Websocket integration
|
//! Websocket integration
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix::dev::{
|
use actix::dev::{
|
||||||
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler,
|
AsyncContextParts, ContextFut, ContextParts, Envelope, Mailbox, StreamHandler,
|
||||||
@@ -16,20 +18,20 @@ use actix_http::ws::{hash_key, Codec};
|
|||||||
pub use actix_http::ws::{
|
pub use actix_http::ws::{
|
||||||
CloseCode, CloseReason, Frame, HandshakeError, Message, ProtocolError,
|
CloseCode, CloseReason, Frame, HandshakeError, Message, ProtocolError,
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_web::dev::HttpResponseBuilder;
|
use actix_web::dev::HttpResponseBuilder;
|
||||||
use actix_web::error::{Error, ErrorInternalServerError, PayloadError};
|
use actix_web::error::{Error, PayloadError};
|
||||||
use actix_web::http::{header, Method, StatusCode};
|
use actix_web::http::{header, Method, StatusCode};
|
||||||
use actix_web::{HttpRequest, HttpResponse};
|
use actix_web::{HttpRequest, HttpResponse};
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures::sync::oneshot::Sender;
|
use futures::channel::oneshot::Sender;
|
||||||
use futures::{Async, Future, Poll, Stream};
|
use futures::{Future, Stream};
|
||||||
|
|
||||||
/// Do websocket handshake and start ws actor.
|
/// Do websocket handshake and start ws actor.
|
||||||
pub fn start<A, T>(actor: A, req: &HttpRequest, stream: T) -> Result<HttpResponse, Error>
|
pub fn start<A, T>(actor: A, req: &HttpRequest, stream: T) -> Result<HttpResponse, Error>
|
||||||
where
|
where
|
||||||
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
|
A: Actor<Context = WebsocketContext<A>>
|
||||||
T: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
+ StreamHandler<Result<Message, ProtocolError>>,
|
||||||
|
T: Stream<Item = Result<Bytes, PayloadError>> + 'static,
|
||||||
{
|
{
|
||||||
let mut res = handshake(req)?;
|
let mut res = handshake(req)?;
|
||||||
Ok(res.streaming(WebsocketContext::create(actor, stream)))
|
Ok(res.streaming(WebsocketContext::create(actor, stream)))
|
||||||
@@ -52,8 +54,9 @@ pub fn start_with_addr<A, T>(
|
|||||||
stream: T,
|
stream: T,
|
||||||
) -> Result<(Addr<A>, HttpResponse), Error>
|
) -> Result<(Addr<A>, HttpResponse), Error>
|
||||||
where
|
where
|
||||||
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
|
A: Actor<Context = WebsocketContext<A>>
|
||||||
T: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
+ StreamHandler<Result<Message, ProtocolError>>,
|
||||||
|
T: Stream<Item = Result<Bytes, PayloadError>> + 'static,
|
||||||
{
|
{
|
||||||
let mut res = handshake(req)?;
|
let mut res = handshake(req)?;
|
||||||
let (addr, out_stream) = WebsocketContext::create_with_addr(actor, stream);
|
let (addr, out_stream) = WebsocketContext::create_with_addr(actor, stream);
|
||||||
@@ -70,8 +73,9 @@ pub fn start_with_protocols<A, T>(
|
|||||||
stream: T,
|
stream: T,
|
||||||
) -> Result<HttpResponse, Error>
|
) -> Result<HttpResponse, Error>
|
||||||
where
|
where
|
||||||
A: Actor<Context = WebsocketContext<A>> + StreamHandler<Message, ProtocolError>,
|
A: Actor<Context = WebsocketContext<A>>
|
||||||
T: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
+ StreamHandler<Result<Message, ProtocolError>>,
|
||||||
|
T: Stream<Item = Result<Bytes, PayloadError>> + 'static,
|
||||||
{
|
{
|
||||||
let mut res = handshake_with_protocols(req, protocols)?;
|
let mut res = handshake_with_protocols(req, protocols)?;
|
||||||
Ok(res.streaming(WebsocketContext::create(actor, stream)))
|
Ok(res.streaming(WebsocketContext::create(actor, stream)))
|
||||||
@@ -202,14 +206,14 @@ where
|
|||||||
{
|
{
|
||||||
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
|
fn spawn<F>(&mut self, fut: F) -> SpawnHandle
|
||||||
where
|
where
|
||||||
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
|
F: ActorFuture<Output = (), Actor = A> + 'static,
|
||||||
{
|
{
|
||||||
self.inner.spawn(fut)
|
self.inner.spawn(fut)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn wait<F>(&mut self, fut: F)
|
fn wait<F>(&mut self, fut: F)
|
||||||
where
|
where
|
||||||
F: ActorFuture<Item = (), Error = (), Actor = A> + 'static,
|
F: ActorFuture<Output = (), Actor = A> + 'static,
|
||||||
{
|
{
|
||||||
self.inner.wait(fut)
|
self.inner.wait(fut)
|
||||||
}
|
}
|
||||||
@@ -238,10 +242,10 @@ where
|
|||||||
{
|
{
|
||||||
#[inline]
|
#[inline]
|
||||||
/// Create a new Websocket context from a request and an actor
|
/// Create a new Websocket context from a request and an actor
|
||||||
pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Bytes, Error = Error>
|
pub fn create<S>(actor: A, stream: S) -> impl Stream<Item = Result<Bytes, Error>>
|
||||||
where
|
where
|
||||||
A: StreamHandler<Message, ProtocolError>,
|
A: StreamHandler<Result<Message, ProtocolError>>,
|
||||||
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
|
||||||
{
|
{
|
||||||
let (_, stream) = WebsocketContext::create_with_addr(actor, stream);
|
let (_, stream) = WebsocketContext::create_with_addr(actor, stream);
|
||||||
stream
|
stream
|
||||||
@@ -256,10 +260,10 @@ where
|
|||||||
pub fn create_with_addr<S>(
|
pub fn create_with_addr<S>(
|
||||||
actor: A,
|
actor: A,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> (Addr<A>, impl Stream<Item = Bytes, Error = Error>)
|
) -> (Addr<A>, impl Stream<Item = Result<Bytes, Error>>)
|
||||||
where
|
where
|
||||||
A: StreamHandler<Message, ProtocolError>,
|
A: StreamHandler<Result<Message, ProtocolError>>,
|
||||||
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
|
||||||
{
|
{
|
||||||
let mb = Mailbox::default();
|
let mb = Mailbox::default();
|
||||||
let mut ctx = WebsocketContext {
|
let mut ctx = WebsocketContext {
|
||||||
@@ -279,10 +283,10 @@ where
|
|||||||
actor: A,
|
actor: A,
|
||||||
stream: S,
|
stream: S,
|
||||||
codec: Codec,
|
codec: Codec,
|
||||||
) -> impl Stream<Item = Bytes, Error = Error>
|
) -> impl Stream<Item = Result<Bytes, Error>>
|
||||||
where
|
where
|
||||||
A: StreamHandler<Message, ProtocolError>,
|
A: StreamHandler<Result<Message, ProtocolError>>,
|
||||||
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
|
||||||
{
|
{
|
||||||
let mb = Mailbox::default();
|
let mb = Mailbox::default();
|
||||||
let mut ctx = WebsocketContext {
|
let mut ctx = WebsocketContext {
|
||||||
@@ -298,11 +302,11 @@ where
|
|||||||
pub fn with_factory<S, F>(
|
pub fn with_factory<S, F>(
|
||||||
stream: S,
|
stream: S,
|
||||||
f: F,
|
f: F,
|
||||||
) -> impl Stream<Item = Bytes, Error = Error>
|
) -> impl Stream<Item = Result<Bytes, Error>>
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut Self) -> A + 'static,
|
F: FnOnce(&mut Self) -> A + 'static,
|
||||||
A: StreamHandler<Message, ProtocolError>,
|
A: StreamHandler<Result<Message, ProtocolError>>,
|
||||||
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
|
||||||
{
|
{
|
||||||
let mb = Mailbox::default();
|
let mb = Mailbox::default();
|
||||||
let mut ctx = WebsocketContext {
|
let mut ctx = WebsocketContext {
|
||||||
@@ -346,14 +350,14 @@ where
|
|||||||
|
|
||||||
/// Send ping frame
|
/// Send ping frame
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn ping(&mut self, message: &str) {
|
pub fn ping(&mut self, message: &[u8]) {
|
||||||
self.write_raw(Message::Ping(message.to_string()));
|
self.write_raw(Message::Ping(Bytes::copy_from_slice(message)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send pong frame
|
/// Send pong frame
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn pong(&mut self, message: &str) {
|
pub fn pong(&mut self, message: &[u8]) {
|
||||||
self.write_raw(Message::Pong(message.to_string()));
|
self.write_raw(Message::Pong(Bytes::copy_from_slice(message)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send close frame
|
/// Send close frame
|
||||||
@@ -415,30 +419,34 @@ impl<A> Stream for WebsocketContextFut<A>
|
|||||||
where
|
where
|
||||||
A: Actor<Context = WebsocketContext<A>>,
|
A: Actor<Context = WebsocketContext<A>>,
|
||||||
{
|
{
|
||||||
type Item = Bytes;
|
type Item = Result<Bytes, Error>;
|
||||||
type Error = Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Bytes>, Error> {
|
fn poll_next(
|
||||||
if self.fut.alive() && self.fut.poll().is_err() {
|
self: Pin<&mut Self>,
|
||||||
return Err(ErrorInternalServerError("error"));
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<Self::Item>> {
|
||||||
|
let this = self.get_mut();
|
||||||
|
|
||||||
|
if this.fut.alive() {
|
||||||
|
let _ = Pin::new(&mut this.fut).poll(cx);
|
||||||
}
|
}
|
||||||
|
|
||||||
// encode messages
|
// encode messages
|
||||||
while let Some(item) = self.fut.ctx().messages.pop_front() {
|
while let Some(item) = this.fut.ctx().messages.pop_front() {
|
||||||
if let Some(msg) = item {
|
if let Some(msg) = item {
|
||||||
self.encoder.encode(msg, &mut self.buf)?;
|
this.encoder.encode(msg, &mut this.buf)?;
|
||||||
} else {
|
} else {
|
||||||
self.closed = true;
|
this.closed = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !self.buf.is_empty() {
|
if !this.buf.is_empty() {
|
||||||
Ok(Async::Ready(Some(self.buf.take().freeze())))
|
Poll::Ready(Some(Ok(this.buf.split().freeze())))
|
||||||
} else if self.fut.alive() && !self.closed {
|
} else if this.fut.alive() && !this.closed {
|
||||||
Ok(Async::NotReady)
|
Poll::Pending
|
||||||
} else {
|
} else {
|
||||||
Ok(Async::Ready(None))
|
Poll::Ready(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -454,7 +462,9 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
struct WsStream<S> {
|
struct WsStream<S> {
|
||||||
|
#[pin]
|
||||||
stream: S,
|
stream: S,
|
||||||
decoder: Codec,
|
decoder: Codec,
|
||||||
buf: BytesMut,
|
buf: BytesMut,
|
||||||
@@ -463,7 +473,7 @@ struct WsStream<S> {
|
|||||||
|
|
||||||
impl<S> WsStream<S>
|
impl<S> WsStream<S>
|
||||||
where
|
where
|
||||||
S: Stream<Item = Bytes, Error = PayloadError>,
|
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||||
{
|
{
|
||||||
fn new(stream: S, codec: Codec) -> Self {
|
fn new(stream: S, codec: Codec) -> Self {
|
||||||
Self {
|
Self {
|
||||||
@@ -477,62 +487,64 @@ where
|
|||||||
|
|
||||||
impl<S> Stream for WsStream<S>
|
impl<S> Stream for WsStream<S>
|
||||||
where
|
where
|
||||||
S: Stream<Item = Bytes, Error = PayloadError>,
|
S: Stream<Item = Result<Bytes, PayloadError>>,
|
||||||
{
|
{
|
||||||
type Item = Message;
|
type Item = Result<Message, ProtocolError>;
|
||||||
type Error = ProtocolError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll_next(
|
||||||
if !self.closed {
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<Self::Item>> {
|
||||||
|
let mut this = self.as_mut().project();
|
||||||
|
|
||||||
|
if !*this.closed {
|
||||||
loop {
|
loop {
|
||||||
match self.stream.poll() {
|
this = self.as_mut().project();
|
||||||
Ok(Async::Ready(Some(chunk))) => {
|
match Pin::new(&mut this.stream).poll_next(cx) {
|
||||||
self.buf.extend_from_slice(&chunk[..]);
|
Poll::Ready(Some(Ok(chunk))) => {
|
||||||
|
this.buf.extend_from_slice(&chunk[..]);
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(None)) => {
|
Poll::Ready(None) => {
|
||||||
self.closed = true;
|
*this.closed = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => break,
|
Poll::Pending => break,
|
||||||
Err(e) => {
|
Poll::Ready(Some(Err(e))) => {
|
||||||
return Err(ProtocolError::Io(io::Error::new(
|
return Poll::Ready(Some(Err(ProtocolError::Io(
|
||||||
io::ErrorKind::Other,
|
io::Error::new(io::ErrorKind::Other, format!("{}", e)),
|
||||||
format!("{}", e),
|
))));
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.decoder.decode(&mut self.buf)? {
|
match this.decoder.decode(this.buf)? {
|
||||||
None => {
|
None => {
|
||||||
if self.closed {
|
if *this.closed {
|
||||||
Ok(Async::Ready(None))
|
Poll::Ready(None)
|
||||||
} else {
|
} else {
|
||||||
Ok(Async::NotReady)
|
Poll::Pending
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(frm) => {
|
Some(frm) => {
|
||||||
let msg = match frm {
|
let msg = match frm {
|
||||||
Frame::Text(data) => {
|
Frame::Text(data) => Message::Text(
|
||||||
if let Some(data) = data {
|
std::str::from_utf8(&data)
|
||||||
Message::Text(
|
.map_err(|e| {
|
||||||
std::str::from_utf8(&data)
|
ProtocolError::Io(io::Error::new(
|
||||||
.map_err(|_| ProtocolError::BadEncoding)?
|
io::ErrorKind::Other,
|
||||||
.to_string(),
|
format!("{}", e),
|
||||||
)
|
))
|
||||||
} else {
|
})?
|
||||||
Message::Text(String::new())
|
.to_string(),
|
||||||
}
|
|
||||||
}
|
|
||||||
Frame::Binary(data) => Message::Binary(
|
|
||||||
data.map(|b| b.freeze()).unwrap_or_else(Bytes::new),
|
|
||||||
),
|
),
|
||||||
|
Frame::Binary(data) => Message::Binary(data),
|
||||||
Frame::Ping(s) => Message::Ping(s),
|
Frame::Ping(s) => Message::Ping(s),
|
||||||
Frame::Pong(s) => Message::Pong(s),
|
Frame::Pong(s) => Message::Pong(s),
|
||||||
Frame::Close(reason) => Message::Close(reason),
|
Frame::Close(reason) => Message::Close(reason),
|
||||||
|
Frame::Continuation(item) => Message::Continuation(item),
|
||||||
};
|
};
|
||||||
Ok(Async::Ready(Some(msg)))
|
Poll::Ready(Some(Ok(msg)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,10 +1,8 @@
|
|||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
use actix_http::HttpService;
|
use actix_web::{test, web, App, HttpRequest};
|
||||||
use actix_http_test::TestServer;
|
|
||||||
use actix_web::{web, App, HttpRequest};
|
|
||||||
use actix_web_actors::*;
|
use actix_web_actors::*;
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::Bytes;
|
||||||
use futures::{Sink, Stream};
|
use futures::{SinkExt, StreamExt};
|
||||||
|
|
||||||
struct Ws;
|
struct Ws;
|
||||||
|
|
||||||
@@ -12,9 +10,13 @@ impl Actor for Ws {
|
|||||||
type Context = ws::WebsocketContext<Self>;
|
type Context = ws::WebsocketContext<Self>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
|
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Ws {
|
||||||
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
|
fn handle(
|
||||||
match msg {
|
&mut self,
|
||||||
|
msg: Result<ws::Message, ws::ProtocolError>,
|
||||||
|
ctx: &mut Self::Context,
|
||||||
|
) {
|
||||||
|
match msg.unwrap() {
|
||||||
ws::Message::Ping(msg) => ctx.pong(&msg),
|
ws::Message::Ping(msg) => ctx.pong(&msg),
|
||||||
ws::Message::Text(text) => ctx.text(text),
|
ws::Message::Text(text) => ctx.text(text),
|
||||||
ws::Message::Binary(bin) => ctx.binary(bin),
|
ws::Message::Binary(bin) => ctx.binary(bin),
|
||||||
@@ -24,45 +26,42 @@ impl StreamHandler<ws::Message, ws::ProtocolError> for Ws {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[actix_rt::test]
|
||||||
fn test_simple() {
|
async fn test_simple() {
|
||||||
let mut srv =
|
let mut srv = test::start(|| {
|
||||||
TestServer::new(|| {
|
App::new().service(web::resource("/").to(
|
||||||
HttpService::new(App::new().service(web::resource("/").to(
|
|req: HttpRequest, stream: web::Payload| {
|
||||||
|req: HttpRequest, stream: web::Payload| ws::start(Ws, &req, stream),
|
async move { ws::start(Ws, &req, stream) }
|
||||||
)))
|
},
|
||||||
});
|
))
|
||||||
|
});
|
||||||
|
|
||||||
// client service
|
// client service
|
||||||
let framed = srv.ws().unwrap();
|
let mut framed = srv.ws().await.unwrap();
|
||||||
let framed = srv
|
framed
|
||||||
.block_on(framed.send(ws::Message::Text("text".to_string())))
|
.send(ws::Message::Text("text".to_string()))
|
||||||
.unwrap();
|
.await
|
||||||
let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap();
|
|
||||||
assert_eq!(item, Some(ws::Frame::Text(Some(BytesMut::from("text")))));
|
|
||||||
|
|
||||||
let framed = srv
|
|
||||||
.block_on(framed.send(ws::Message::Binary("text".into())))
|
|
||||||
.unwrap();
|
|
||||||
let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap();
|
|
||||||
assert_eq!(
|
|
||||||
item,
|
|
||||||
Some(ws::Frame::Binary(Some(Bytes::from_static(b"text").into())))
|
|
||||||
);
|
|
||||||
|
|
||||||
let framed = srv
|
|
||||||
.block_on(framed.send(ws::Message::Ping("text".into())))
|
|
||||||
.unwrap();
|
|
||||||
let (item, framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap();
|
|
||||||
assert_eq!(item, Some(ws::Frame::Pong("text".to_string().into())));
|
|
||||||
|
|
||||||
let framed = srv
|
|
||||||
.block_on(framed.send(ws::Message::Close(Some(ws::CloseCode::Normal.into()))))
|
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let (item, _framed) = srv.block_on(framed.into_future()).map_err(|_| ()).unwrap();
|
let item = framed.next().await.unwrap().unwrap();
|
||||||
assert_eq!(
|
assert_eq!(item, ws::Frame::Text(Bytes::from_static(b"text")));
|
||||||
item,
|
|
||||||
Some(ws::Frame::Close(Some(ws::CloseCode::Normal.into())))
|
framed
|
||||||
);
|
.send(ws::Message::Binary("text".into()))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let item = framed.next().await.unwrap().unwrap();
|
||||||
|
assert_eq!(item, ws::Frame::Binary(Bytes::from_static(b"text").into()));
|
||||||
|
|
||||||
|
framed.send(ws::Message::Ping("text".into())).await.unwrap();
|
||||||
|
let item = framed.next().await.unwrap().unwrap();
|
||||||
|
assert_eq!(item, ws::Frame::Pong(Bytes::copy_from_slice(b"text")));
|
||||||
|
|
||||||
|
framed
|
||||||
|
.send(ws::Message::Close(Some(ws::CloseCode::Normal.into())))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let item = framed.next().await.unwrap().unwrap();
|
||||||
|
assert_eq!(item, ws::Frame::Close(Some(ws::CloseCode::Normal.into())));
|
||||||
}
|
}
|
||||||
|
@@ -1,5 +1,9 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [0.2.0] - 2019-12-13
|
||||||
|
|
||||||
|
* Generate code for actix-web 2.0
|
||||||
|
|
||||||
## [0.1.3] - 2019-10-14
|
## [0.1.3] - 2019-10-14
|
||||||
|
|
||||||
* Bump up `syn` & `quote` to 1.0
|
* Bump up `syn` & `quote` to 1.0
|
||||||
|
@@ -1,5 +1,10 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## [1.0.1] - 2019-12-15
|
||||||
|
|
||||||
|
* Fix compilation with default features off
|
||||||
|
|
||||||
|
|
||||||
## [1.0.0] - 2019-12-13
|
## [1.0.0] - 2019-12-13
|
||||||
|
|
||||||
* Release
|
* Release
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "awc"
|
name = "awc"
|
||||||
version = "1.0.0"
|
version = "1.0.1"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix http client."
|
description = "Actix http client."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
@@ -40,7 +40,7 @@ actix-http = "1.0.0"
|
|||||||
actix-rt = "1.0.0"
|
actix-rt = "1.0.0"
|
||||||
|
|
||||||
base64 = "0.11"
|
base64 = "0.11"
|
||||||
bytes = "0.5.2"
|
bytes = "0.5.3"
|
||||||
derive_more = "0.99.2"
|
derive_more = "0.99.2"
|
||||||
futures-core = "0.3.1"
|
futures-core = "0.3.1"
|
||||||
log =" 0.4"
|
log =" 0.4"
|
||||||
@@ -54,14 +54,14 @@ open-ssl = { version="0.10", package="openssl", optional = true }
|
|||||||
rust-tls = { version = "0.16.0", package="rustls", optional = true, features = ["dangerous_configuration"] }
|
rust-tls = { version = "0.16.0", package="rustls", optional = true, features = ["dangerous_configuration"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-connect = { version = "1.0.0", features=["openssl"] }
|
actix-connect = { version = "1.0.1", features=["openssl"] }
|
||||||
actix-web = { version = "2.0.0-alpha.5", features=["openssl"] }
|
actix-web = { version = "2.0.0-alpha.5", features=["openssl"] }
|
||||||
actix-http = { version = "1.0.0", features=["openssl"] }
|
actix-http = { version = "1.0.0", features=["openssl"] }
|
||||||
actix-http-test = { version = "1.0.0", features=["openssl"] }
|
actix-http-test = { version = "1.0.0", features=["openssl"] }
|
||||||
actix-utils = "1.0.3"
|
actix-utils = "1.0.3"
|
||||||
actix-server = "1.0.0"
|
actix-server = "1.0.0"
|
||||||
actix-tls = { version = "1.0.0", features=["openssl", "rustls"] }
|
actix-tls = { version = "1.0.0", features=["openssl", "rustls"] }
|
||||||
brotli = "3.3.0"
|
brotli2 = "0.3.2"
|
||||||
flate2 = "1.0.13"
|
flate2 = "1.0.13"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
env_logger = "0.6"
|
env_logger = "0.6"
|
||||||
|
@@ -7,15 +7,21 @@ use std::time::Duration;
|
|||||||
use actix_rt::time::{delay_for, Delay};
|
use actix_rt::time::{delay_for, Delay};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use derive_more::From;
|
use derive_more::From;
|
||||||
use futures_core::{ready, Future, Stream};
|
use futures_core::{Future, Stream};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
|
|
||||||
use actix_http::body::{Body, BodyStream};
|
use actix_http::body::{Body, BodyStream};
|
||||||
use actix_http::encoding::Decoder;
|
use actix_http::http::header::{self, IntoHeaderValue};
|
||||||
use actix_http::http::header::{self, ContentEncoding, IntoHeaderValue};
|
|
||||||
use actix_http::http::{Error as HttpError, HeaderMap, HeaderName};
|
use actix_http::http::{Error as HttpError, HeaderMap, HeaderName};
|
||||||
use actix_http::{Error, Payload, PayloadStream, RequestHead};
|
use actix_http::{Error, RequestHead};
|
||||||
|
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
|
use actix_http::encoding::Decoder;
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
|
use actix_http::http::header::ContentEncoding;
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
|
use actix_http::{Payload, PayloadStream};
|
||||||
|
|
||||||
use crate::error::{FreezeRequestError, InvalidUrl, SendRequestError};
|
use crate::error::{FreezeRequestError, InvalidUrl, SendRequestError};
|
||||||
use crate::response::ClientResponse;
|
use crate::response::ClientResponse;
|
||||||
@@ -67,6 +73,7 @@ impl SendClientRequest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
impl Future for SendClientRequest {
|
impl Future for SendClientRequest {
|
||||||
type Output =
|
type Output =
|
||||||
Result<ClientResponse<Decoder<Payload<PayloadStream>>>, SendRequestError>;
|
Result<ClientResponse<Decoder<Payload<PayloadStream>>>, SendRequestError>;
|
||||||
@@ -83,7 +90,7 @@ impl Future for SendClientRequest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let res = ready!(Pin::new(send).poll(cx)).map(|res| {
|
let res = futures_core::ready!(Pin::new(send).poll(cx)).map(|res| {
|
||||||
res.map_body(|head, payload| {
|
res.map_body(|head, payload| {
|
||||||
if *response_decompress {
|
if *response_decompress {
|
||||||
Payload::Stream(Decoder::from_headers(
|
Payload::Stream(Decoder::from_headers(
|
||||||
@@ -109,6 +116,30 @@ impl Future for SendClientRequest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "compress"))]
|
||||||
|
impl Future for SendClientRequest {
|
||||||
|
type Output = Result<ClientResponse, SendRequestError>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let this = self.get_mut();
|
||||||
|
match this {
|
||||||
|
SendClientRequest::Fut(send, delay, _) => {
|
||||||
|
if delay.is_some() {
|
||||||
|
match Pin::new(delay.as_mut().unwrap()).poll(cx) {
|
||||||
|
Poll::Pending => (),
|
||||||
|
_ => return Poll::Ready(Err(SendRequestError::Timeout)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Pin::new(send).poll(cx)
|
||||||
|
}
|
||||||
|
SendClientRequest::Err(ref mut e) => match e.take() {
|
||||||
|
Some(e) => Poll::Ready(Err(e)),
|
||||||
|
None => panic!("Attempting to call completed future"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<SendRequestError> for SendClientRequest {
|
impl From<SendRequestError> for SendClientRequest {
|
||||||
fn from(e: SendRequestError) -> Self {
|
fn from(e: SendRequestError) -> Self {
|
||||||
SendClientRequest::Err(Some(e))
|
SendClientRequest::Err(Some(e))
|
||||||
|
@@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use brotli::CompressorWriter;
|
use brotli2::write::BrotliEncoder;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use flate2::read::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
use flate2::write::GzEncoder;
|
use flate2::write::GzEncoder;
|
||||||
@@ -15,8 +15,9 @@ use rand::Rng;
|
|||||||
use actix_http::HttpService;
|
use actix_http::HttpService;
|
||||||
use actix_http_test::test_server;
|
use actix_http_test::test_server;
|
||||||
use actix_service::pipeline_factory;
|
use actix_service::pipeline_factory;
|
||||||
|
use actix_web::dev::BodyEncoding;
|
||||||
use actix_web::http::Cookie;
|
use actix_web::http::Cookie;
|
||||||
use actix_web::middleware::{BodyEncoding, Compress};
|
use actix_web::middleware::Compress;
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
http::header, test, web, App, Error, HttpMessage, HttpRequest, HttpResponse,
|
http::header, test, web, App, Error, HttpMessage, HttpRequest, HttpResponse,
|
||||||
};
|
};
|
||||||
@@ -499,9 +500,9 @@ async fn test_client_gzip_encoding_large_random() {
|
|||||||
async fn test_client_brotli_encoding() {
|
async fn test_client_brotli_encoding() {
|
||||||
let srv = test::start(|| {
|
let srv = test::start(|| {
|
||||||
App::new().service(web::resource("/").route(web::to(|data: Bytes| {
|
App::new().service(web::resource("/").route(web::to(|data: Bytes| {
|
||||||
let mut e = CompressorWriter::new(Vec::new(), 0, 5, 0);
|
let mut e = BrotliEncoder::new(Vec::new(), 5);
|
||||||
e.write_all(&data).unwrap();
|
e.write_all(&data).unwrap();
|
||||||
let data = e.into_inner();
|
let data = e.finish().unwrap();
|
||||||
HttpResponse::Ok()
|
HttpResponse::Ok()
|
||||||
.header("content-encoding", "br")
|
.header("content-encoding", "br")
|
||||||
.body(data)
|
.body(data)
|
||||||
@@ -526,9 +527,9 @@ async fn test_client_brotli_encoding_large_random() {
|
|||||||
|
|
||||||
let srv = test::start(|| {
|
let srv = test::start(|| {
|
||||||
App::new().service(web::resource("/").route(web::to(|data: Bytes| {
|
App::new().service(web::resource("/").route(web::to(|data: Bytes| {
|
||||||
let mut e = CompressorWriter::new(Vec::new(), 0, 5, 0);
|
let mut e = BrotliEncoder::new(Vec::new(), 5);
|
||||||
e.write_all(&data).unwrap();
|
e.write_all(&data).unwrap();
|
||||||
let data = e.into_inner();
|
let data = e.finish().unwrap();
|
||||||
HttpResponse::Ok()
|
HttpResponse::Ok()
|
||||||
.header("content-encoding", "br")
|
.header("content-encoding", "br")
|
||||||
.body(data)
|
.body(data)
|
||||||
|
@@ -6,7 +6,6 @@ use url::ParseError as UrlParseError;
|
|||||||
|
|
||||||
use crate::http::StatusCode;
|
use crate::http::StatusCode;
|
||||||
use crate::HttpResponse;
|
use crate::HttpResponse;
|
||||||
use serde_urlencoded::de;
|
|
||||||
|
|
||||||
/// Errors which can occur when attempting to generate resource uri.
|
/// Errors which can occur when attempting to generate resource uri.
|
||||||
#[derive(Debug, PartialEq, Display, From)]
|
#[derive(Debug, PartialEq, Display, From)]
|
||||||
@@ -97,7 +96,7 @@ impl ResponseError for JsonPayloadError {
|
|||||||
pub enum PathError {
|
pub enum PathError {
|
||||||
/// Deserialize error
|
/// Deserialize error
|
||||||
#[display(fmt = "Path deserialize error: {}", _0)]
|
#[display(fmt = "Path deserialize error: {}", _0)]
|
||||||
Deserialize(de::Error),
|
Deserialize(serde::de::value::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return `BadRequest` for `PathError`
|
/// Return `BadRequest` for `PathError`
|
||||||
@@ -112,7 +111,7 @@ impl ResponseError for PathError {
|
|||||||
pub enum QueryPayloadError {
|
pub enum QueryPayloadError {
|
||||||
/// Deserialize error
|
/// Deserialize error
|
||||||
#[display(fmt = "Query deserialize error: {}", _0)]
|
#[display(fmt = "Query deserialize error: {}", _0)]
|
||||||
Deserialize(de::Error),
|
Deserialize(serde::de::value::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return `BadRequest` for `QueryPayloadError`
|
/// Return `BadRequest` for `QueryPayloadError`
|
||||||
|
45
src/lib.rs
45
src/lib.rs
@@ -141,6 +141,7 @@ pub mod dev {
|
|||||||
pub use crate::types::readlines::Readlines;
|
pub use crate::types::readlines::Readlines;
|
||||||
|
|
||||||
pub use actix_http::body::{Body, BodySize, MessageBody, ResponseBody, SizedStream};
|
pub use actix_http::body::{Body, BodySize, MessageBody, ResponseBody, SizedStream};
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
pub use actix_http::encoding::Decoder as Decompress;
|
pub use actix_http::encoding::Decoder as Decompress;
|
||||||
pub use actix_http::ResponseBuilder as HttpResponseBuilder;
|
pub use actix_http::ResponseBuilder as HttpResponseBuilder;
|
||||||
pub use actix_http::{
|
pub use actix_http::{
|
||||||
@@ -157,6 +158,50 @@ pub mod dev {
|
|||||||
};
|
};
|
||||||
path
|
path
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use crate::http::header::ContentEncoding;
|
||||||
|
use actix_http::{Response, ResponseBuilder};
|
||||||
|
|
||||||
|
struct Enc(ContentEncoding);
|
||||||
|
|
||||||
|
/// Helper trait that allows to set specific encoding for response.
|
||||||
|
pub trait BodyEncoding {
|
||||||
|
/// Get content encoding
|
||||||
|
fn get_encoding(&self) -> Option<ContentEncoding>;
|
||||||
|
|
||||||
|
/// Set content encoding
|
||||||
|
fn encoding(&mut self, encoding: ContentEncoding) -> &mut Self;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BodyEncoding for ResponseBuilder {
|
||||||
|
fn get_encoding(&self) -> Option<ContentEncoding> {
|
||||||
|
if let Some(ref enc) = self.extensions().get::<Enc>() {
|
||||||
|
Some(enc.0)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encoding(&mut self, encoding: ContentEncoding) -> &mut Self {
|
||||||
|
self.extensions_mut().insert(Enc(encoding));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> BodyEncoding for Response<B> {
|
||||||
|
fn get_encoding(&self) -> Option<ContentEncoding> {
|
||||||
|
if let Some(ref enc) = self.extensions().get::<Enc>() {
|
||||||
|
Some(enc.0)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn encoding(&mut self, encoding: ContentEncoding) -> &mut Self {
|
||||||
|
self.extensions_mut().insert(Enc(encoding));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod client {
|
pub mod client {
|
||||||
|
@@ -9,34 +9,14 @@ use std::task::{Context, Poll};
|
|||||||
use actix_http::body::MessageBody;
|
use actix_http::body::MessageBody;
|
||||||
use actix_http::encoding::Encoder;
|
use actix_http::encoding::Encoder;
|
||||||
use actix_http::http::header::{ContentEncoding, ACCEPT_ENCODING};
|
use actix_http::http::header::{ContentEncoding, ACCEPT_ENCODING};
|
||||||
use actix_http::{Error, Response, ResponseBuilder};
|
use actix_http::Error;
|
||||||
use actix_service::{Service, Transform};
|
use actix_service::{Service, Transform};
|
||||||
use futures::future::{ok, Ready};
|
use futures::future::{ok, Ready};
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
|
|
||||||
|
use crate::dev::BodyEncoding;
|
||||||
use crate::service::{ServiceRequest, ServiceResponse};
|
use crate::service::{ServiceRequest, ServiceResponse};
|
||||||
|
|
||||||
struct Enc(ContentEncoding);
|
|
||||||
|
|
||||||
/// Helper trait that allows to set specific encoding for response.
|
|
||||||
pub trait BodyEncoding {
|
|
||||||
fn encoding(&mut self, encoding: ContentEncoding) -> &mut Self;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BodyEncoding for ResponseBuilder {
|
|
||||||
fn encoding(&mut self, encoding: ContentEncoding) -> &mut Self {
|
|
||||||
self.extensions_mut().insert(Enc(encoding));
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B> BodyEncoding for Response<B> {
|
|
||||||
fn encoding(&mut self, encoding: ContentEncoding) -> &mut Self {
|
|
||||||
self.extensions_mut().insert(Enc(encoding));
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
/// `Middleware` for compressing response body.
|
/// `Middleware` for compressing response body.
|
||||||
///
|
///
|
||||||
@@ -155,8 +135,8 @@ where
|
|||||||
|
|
||||||
match futures::ready!(this.fut.poll(cx)) {
|
match futures::ready!(this.fut.poll(cx)) {
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
let enc = if let Some(enc) = resp.response().extensions().get::<Enc>() {
|
let enc = if let Some(enc) = resp.response().get_encoding() {
|
||||||
enc.0
|
enc
|
||||||
} else {
|
} else {
|
||||||
*this.encoding
|
*this.encoding
|
||||||
};
|
};
|
||||||
|
@@ -1,6 +1,9 @@
|
|||||||
//! Middlewares
|
//! Middlewares
|
||||||
|
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
mod compress;
|
mod compress;
|
||||||
pub use self::compress::{BodyEncoding, Compress};
|
#[cfg(feature = "compress")]
|
||||||
|
pub use self::compress::Compress;
|
||||||
|
|
||||||
mod condition;
|
mod condition;
|
||||||
mod defaultheaders;
|
mod defaultheaders;
|
||||||
|
72
src/test.rs
72
src/test.rs
@@ -910,6 +910,7 @@ impl Drop for TestServer {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use actix_http::httpmessage::HttpMessage;
|
use actix_http::httpmessage::HttpMessage;
|
||||||
|
use futures::FutureExt;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
|
|
||||||
@@ -1095,41 +1096,46 @@ mod tests {
|
|||||||
assert!(res.status().is_success());
|
assert!(res.status().is_success());
|
||||||
}
|
}
|
||||||
|
|
||||||
// #[actix_rt::test]
|
#[actix_rt::test]
|
||||||
// fn test_actor() {
|
async fn test_actor() {
|
||||||
// use actix::Actor;
|
use actix::Actor;
|
||||||
|
|
||||||
// struct MyActor;
|
struct MyActor;
|
||||||
|
|
||||||
// struct Num(usize);
|
struct Num(usize);
|
||||||
// impl actix::Message for Num {
|
impl actix::Message for Num {
|
||||||
// type Result = usize;
|
type Result = usize;
|
||||||
// }
|
}
|
||||||
// impl actix::Actor for MyActor {
|
impl actix::Actor for MyActor {
|
||||||
// type Context = actix::Context<Self>;
|
type Context = actix::Context<Self>;
|
||||||
// }
|
}
|
||||||
// impl actix::Handler<Num> for MyActor {
|
impl actix::Handler<Num> for MyActor {
|
||||||
// type Result = usize;
|
type Result = usize;
|
||||||
// fn handle(&mut self, msg: Num, _: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: Num, _: &mut Self::Context) -> Self::Result {
|
||||||
// msg.0
|
msg.0
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
|
|
||||||
// let addr = run_on(|| MyActor.start());
|
let addr = MyActor.start();
|
||||||
// let mut app = init_service(App::new().service(
|
|
||||||
// web::resource("/index.html").to(move || {
|
|
||||||
// addr.send(Num(1)).from_err().and_then(|res| {
|
|
||||||
// if res == 1 {
|
|
||||||
// HttpResponse::Ok()
|
|
||||||
// } else {
|
|
||||||
// HttpResponse::BadRequest()
|
|
||||||
// }
|
|
||||||
// })
|
|
||||||
// }),
|
|
||||||
// ));
|
|
||||||
|
|
||||||
// let req = TestRequest::post().uri("/index.html").to_request();
|
let mut app = init_service(App::new().service(web::resource("/index.html").to(
|
||||||
// let res = block_fn(|| app.call(req)).unwrap();
|
move || {
|
||||||
// assert!(res.status().is_success());
|
addr.send(Num(1)).map(|res| match res {
|
||||||
// }
|
Ok(res) => {
|
||||||
|
if res == 1 {
|
||||||
|
Ok(HttpResponse::Ok())
|
||||||
|
} else {
|
||||||
|
Ok(HttpResponse::BadRequest())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => Err(err),
|
||||||
|
})
|
||||||
|
},
|
||||||
|
)))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let req = TestRequest::post().uri("/index.html").to_request();
|
||||||
|
let res = app.call(req).await.unwrap();
|
||||||
|
assert!(res.status().is_success());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -14,6 +14,7 @@ use futures::StreamExt;
|
|||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
use crate::dev::Decompress;
|
use crate::dev::Decompress;
|
||||||
use crate::error::UrlencodedError;
|
use crate::error::UrlencodedError;
|
||||||
use crate::extract::FromRequest;
|
use crate::extract::FromRequest;
|
||||||
@@ -240,7 +241,10 @@ impl Default for FormConfig {
|
|||||||
/// * content-length is greater than 32k
|
/// * content-length is greater than 32k
|
||||||
///
|
///
|
||||||
pub struct UrlEncoded<U> {
|
pub struct UrlEncoded<U> {
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
stream: Option<Decompress<Payload>>,
|
stream: Option<Decompress<Payload>>,
|
||||||
|
#[cfg(not(feature = "compress"))]
|
||||||
|
stream: Option<Payload>,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
length: Option<usize>,
|
length: Option<usize>,
|
||||||
encoding: &'static Encoding,
|
encoding: &'static Encoding,
|
||||||
@@ -273,7 +277,11 @@ impl<U> UrlEncoded<U> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
let payload = Decompress::from_headers(payload.take(), req.headers());
|
let payload = Decompress::from_headers(payload.take(), req.headers());
|
||||||
|
#[cfg(not(feature = "compress"))]
|
||||||
|
let payload = payload.take();
|
||||||
|
|
||||||
UrlEncoded {
|
UrlEncoded {
|
||||||
encoding,
|
encoding,
|
||||||
stream: Some(payload),
|
stream: Some(payload),
|
||||||
|
@@ -16,6 +16,7 @@ use serde_json;
|
|||||||
use actix_http::http::{header::CONTENT_LENGTH, StatusCode};
|
use actix_http::http::{header::CONTENT_LENGTH, StatusCode};
|
||||||
use actix_http::{HttpMessage, Payload, Response};
|
use actix_http::{HttpMessage, Payload, Response};
|
||||||
|
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
use crate::dev::Decompress;
|
use crate::dev::Decompress;
|
||||||
use crate::error::{Error, JsonPayloadError};
|
use crate::error::{Error, JsonPayloadError};
|
||||||
use crate::extract::FromRequest;
|
use crate::extract::FromRequest;
|
||||||
@@ -293,7 +294,10 @@ impl Default for JsonConfig {
|
|||||||
pub struct JsonBody<U> {
|
pub struct JsonBody<U> {
|
||||||
limit: usize,
|
limit: usize,
|
||||||
length: Option<usize>,
|
length: Option<usize>,
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
stream: Option<Decompress<Payload>>,
|
stream: Option<Decompress<Payload>>,
|
||||||
|
#[cfg(not(feature = "compress"))]
|
||||||
|
stream: Option<Payload>,
|
||||||
err: Option<JsonPayloadError>,
|
err: Option<JsonPayloadError>,
|
||||||
fut: Option<LocalBoxFuture<'static, Result<U, JsonPayloadError>>>,
|
fut: Option<LocalBoxFuture<'static, Result<U, JsonPayloadError>>>,
|
||||||
}
|
}
|
||||||
@@ -332,7 +336,11 @@ where
|
|||||||
.get(&CONTENT_LENGTH)
|
.get(&CONTENT_LENGTH)
|
||||||
.and_then(|l| l.to_str().ok())
|
.and_then(|l| l.to_str().ok())
|
||||||
.and_then(|s| s.parse::<usize>().ok());
|
.and_then(|s| s.parse::<usize>().ok());
|
||||||
|
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
let payload = Decompress::from_headers(payload.take(), req.headers());
|
let payload = Decompress::from_headers(payload.take(), req.headers());
|
||||||
|
#[cfg(not(feature = "compress"))]
|
||||||
|
let payload = payload.take();
|
||||||
|
|
||||||
JsonBody {
|
JsonBody {
|
||||||
limit: 262_144,
|
limit: 262_144,
|
||||||
|
@@ -301,7 +301,10 @@ impl Default for PayloadConfig {
|
|||||||
pub struct HttpMessageBody {
|
pub struct HttpMessageBody {
|
||||||
limit: usize,
|
limit: usize,
|
||||||
length: Option<usize>,
|
length: Option<usize>,
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
stream: Option<dev::Decompress<dev::Payload>>,
|
stream: Option<dev::Decompress<dev::Payload>>,
|
||||||
|
#[cfg(not(feature = "compress"))]
|
||||||
|
stream: Option<dev::Payload>,
|
||||||
err: Option<PayloadError>,
|
err: Option<PayloadError>,
|
||||||
fut: Option<LocalBoxFuture<'static, Result<Bytes, PayloadError>>>,
|
fut: Option<LocalBoxFuture<'static, Result<Bytes, PayloadError>>>,
|
||||||
}
|
}
|
||||||
@@ -322,8 +325,13 @@ impl HttpMessageBody {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "compress")]
|
||||||
|
let stream = Some(dev::Decompress::from_headers(payload.take(), req.headers()));
|
||||||
|
#[cfg(not(feature = "compress"))]
|
||||||
|
let stream = Some(payload.take());
|
||||||
|
|
||||||
HttpMessageBody {
|
HttpMessageBody {
|
||||||
stream: Some(dev::Decompress::from_headers(payload.take(), req.headers())),
|
stream,
|
||||||
limit: 262_144,
|
limit: 262_144,
|
||||||
length: len,
|
length: len,
|
||||||
fut: None,
|
fut: None,
|
||||||
|
@@ -55,5 +55,5 @@ time = "0.1"
|
|||||||
open-ssl = { version="0.10", package="openssl", optional = true }
|
open-ssl = { version="0.10", package="openssl", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-web = "2.0.0-alpha.4"
|
actix-web = "2.0.0-alpha.5"
|
||||||
actix-http = "1.0.0"
|
actix-http = "1.0.0"
|
||||||
|
@@ -1,19 +1,22 @@
|
|||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use actix_http::http::header::{
|
use actix_http::http::header::{
|
||||||
ContentEncoding, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH,
|
ContentEncoding, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH,
|
||||||
TRANSFER_ENCODING,
|
TRANSFER_ENCODING,
|
||||||
};
|
};
|
||||||
use brotli::{CompressorWriter, DecompressorWriter};
|
use brotli2::write::{BrotliDecoder, BrotliEncoder};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use flate2::read::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
use flate2::write::{GzEncoder, ZlibDecoder, ZlibEncoder};
|
use flate2::write::{GzEncoder, ZlibDecoder, ZlibEncoder};
|
||||||
use flate2::Compression;
|
use flate2::Compression;
|
||||||
use futures::{future::ok, stream::once};
|
use futures::{ready, Future};
|
||||||
use rand::{distributions::Alphanumeric, Rng};
|
use rand::{distributions::Alphanumeric, Rng};
|
||||||
|
|
||||||
use actix_web::middleware::{BodyEncoding, Compress};
|
use actix_web::dev::BodyEncoding;
|
||||||
use actix_web::{dev, http, test, web, App, Error, HttpResponse};
|
use actix_web::middleware::Compress;
|
||||||
|
use actix_web::{dev, test, web, App, Error, HttpResponse};
|
||||||
|
|
||||||
const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
|
const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
|
||||||
Hello World Hello World Hello World Hello World Hello World \
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
@@ -37,6 +40,42 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
|
|||||||
Hello World Hello World Hello World Hello World Hello World \
|
Hello World Hello World Hello World Hello World Hello World \
|
||||||
Hello World Hello World Hello World Hello World Hello World";
|
Hello World Hello World Hello World Hello World Hello World";
|
||||||
|
|
||||||
|
struct TestBody {
|
||||||
|
data: Bytes,
|
||||||
|
chunk_size: usize,
|
||||||
|
delay: actix_rt::time::Delay,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestBody {
|
||||||
|
fn new(data: Bytes, chunk_size: usize) -> Self {
|
||||||
|
TestBody {
|
||||||
|
data,
|
||||||
|
chunk_size,
|
||||||
|
delay: actix_rt::time::delay_for(std::time::Duration::from_millis(10)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl futures::Stream for TestBody {
|
||||||
|
type Item = Result<Bytes, Error>;
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<Self::Item>> {
|
||||||
|
ready!(Pin::new(&mut self.delay).poll(cx));
|
||||||
|
|
||||||
|
self.delay = actix_rt::time::delay_for(std::time::Duration::from_millis(10));
|
||||||
|
let chunk_size = std::cmp::min(self.chunk_size, self.data.len());
|
||||||
|
let chunk = self.data.split_to(chunk_size);
|
||||||
|
if chunk.is_empty() {
|
||||||
|
Poll::Ready(None)
|
||||||
|
} else {
|
||||||
|
Poll::Ready(Some(Ok(chunk)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_body() {
|
async fn test_body() {
|
||||||
let srv = test::start(|| {
|
let srv = test::start(|| {
|
||||||
@@ -247,7 +286,7 @@ async fn test_body_chunked_implicit() {
|
|||||||
.wrap(Compress::new(ContentEncoding::Gzip))
|
.wrap(Compress::new(ContentEncoding::Gzip))
|
||||||
.service(web::resource("/").route(web::get().to(move || {
|
.service(web::resource("/").route(web::get().to(move || {
|
||||||
HttpResponse::Ok()
|
HttpResponse::Ok()
|
||||||
.streaming(once(ok::<_, Error>(Bytes::from_static(STR.as_ref()))))
|
.streaming(TestBody::new(Bytes::from_static(STR.as_ref()), 24))
|
||||||
})))
|
})))
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -280,7 +319,7 @@ async fn test_body_br_streaming() {
|
|||||||
App::new().wrap(Compress::new(ContentEncoding::Br)).service(
|
App::new().wrap(Compress::new(ContentEncoding::Br)).service(
|
||||||
web::resource("/").route(web::to(move || {
|
web::resource("/").route(web::to(move || {
|
||||||
HttpResponse::Ok()
|
HttpResponse::Ok()
|
||||||
.streaming(once(ok::<_, Error>(Bytes::from_static(STR.as_ref()))))
|
.streaming(TestBody::new(Bytes::from_static(STR.as_ref()), 24))
|
||||||
})),
|
})),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
@@ -296,11 +335,13 @@ async fn test_body_br_streaming() {
|
|||||||
|
|
||||||
// read response
|
// read response
|
||||||
let bytes = response.body().await.unwrap();
|
let bytes = response.body().await.unwrap();
|
||||||
|
println!("TEST: {:?}", bytes.len());
|
||||||
|
|
||||||
// decode br
|
// decode br
|
||||||
let mut e = DecompressorWriter::new(Vec::with_capacity(2048), 0);
|
let mut e = BrotliDecoder::new(Vec::with_capacity(2048));
|
||||||
e.write_all(bytes.as_ref()).unwrap();
|
e.write_all(bytes.as_ref()).unwrap();
|
||||||
let dec = e.into_inner().unwrap();
|
let dec = e.finish().unwrap();
|
||||||
|
println!("T: {:?}", Bytes::copy_from_slice(&dec));
|
||||||
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
|
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -332,7 +373,7 @@ async fn test_no_chunking() {
|
|||||||
HttpResponse::Ok()
|
HttpResponse::Ok()
|
||||||
.no_chunking()
|
.no_chunking()
|
||||||
.content_length(STR.len() as u64)
|
.content_length(STR.len() as u64)
|
||||||
.streaming(once(ok::<_, Error>(Bytes::from_static(STR.as_ref()))))
|
.streaming(TestBody::new(Bytes::from_static(STR.as_ref()), 24))
|
||||||
})))
|
})))
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -396,9 +437,9 @@ async fn test_body_brotli() {
|
|||||||
let bytes = response.body().await.unwrap();
|
let bytes = response.body().await.unwrap();
|
||||||
|
|
||||||
// decode brotli
|
// decode brotli
|
||||||
let mut e = DecompressorWriter::new(Vec::with_capacity(2048), 0);
|
let mut e = BrotliDecoder::new(Vec::with_capacity(2048));
|
||||||
e.write_all(bytes.as_ref()).unwrap();
|
e.write_all(bytes.as_ref()).unwrap();
|
||||||
let dec = e.into_inner().unwrap();
|
let dec = e.finish().unwrap();
|
||||||
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
|
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -607,9 +648,9 @@ async fn test_brotli_encoding() {
|
|||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut e = CompressorWriter::new(Vec::new(), 0, 3, 0);
|
let mut e = BrotliEncoder::new(Vec::new(), 5);
|
||||||
e.write_all(STR.as_ref()).unwrap();
|
e.write_all(STR.as_ref()).unwrap();
|
||||||
let enc = e.into_inner();
|
let enc = e.finish().unwrap();
|
||||||
|
|
||||||
// client request
|
// client request
|
||||||
let request = srv
|
let request = srv
|
||||||
@@ -626,17 +667,24 @@ async fn test_brotli_encoding() {
|
|||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_brotli_encoding_large() {
|
async fn test_brotli_encoding_large() {
|
||||||
let data = STR.repeat(10);
|
let data = rand::thread_rng()
|
||||||
|
.sample_iter(&Alphanumeric)
|
||||||
|
.take(320_000)
|
||||||
|
.collect::<String>();
|
||||||
|
|
||||||
let srv = test::start_with(test::config().h1(), || {
|
let srv = test::start_with(test::config().h1(), || {
|
||||||
App::new().service(
|
App::new().service(
|
||||||
web::resource("/")
|
web::resource("/")
|
||||||
.route(web::to(move |body: Bytes| HttpResponse::Ok().body(body))),
|
.data(web::PayloadConfig::new(320_000))
|
||||||
|
.route(web::to(move |body: Bytes| {
|
||||||
|
HttpResponse::Ok().streaming(TestBody::new(body, 10240))
|
||||||
|
})),
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut e = CompressorWriter::new(Vec::new(), 0, 3, 0);
|
let mut e = BrotliEncoder::new(Vec::new(), 5);
|
||||||
e.write_all(data.as_ref()).unwrap();
|
e.write_all(data.as_ref()).unwrap();
|
||||||
let enc = e.into_inner();
|
let enc = e.finish().unwrap();
|
||||||
|
|
||||||
// client request
|
// client request
|
||||||
let request = srv
|
let request = srv
|
||||||
@@ -647,7 +695,7 @@ async fn test_brotli_encoding_large() {
|
|||||||
assert!(response.status().is_success());
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
// read response
|
// read response
|
||||||
let bytes = response.body().await.unwrap();
|
let bytes = response.body().limit(320_000).await.unwrap();
|
||||||
assert_eq!(bytes, Bytes::from(data));
|
assert_eq!(bytes, Bytes::from(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -668,20 +716,20 @@ async fn test_brotli_encoding_large_openssl() {
|
|||||||
let srv = test::start_with(test::config().openssl(builder.build()), move || {
|
let srv = test::start_with(test::config().openssl(builder.build()), move || {
|
||||||
App::new().service(web::resource("/").route(web::to(|bytes: Bytes| {
|
App::new().service(web::resource("/").route(web::to(|bytes: Bytes| {
|
||||||
HttpResponse::Ok()
|
HttpResponse::Ok()
|
||||||
.encoding(http::ContentEncoding::Identity)
|
.encoding(actix_web::http::ContentEncoding::Identity)
|
||||||
.body(bytes)
|
.body(bytes)
|
||||||
})))
|
})))
|
||||||
});
|
});
|
||||||
|
|
||||||
// body
|
// body
|
||||||
let mut e = CompressorWriter::new(Vec::new(), 0, 3, 0);
|
let mut e = BrotliEncoder::new(Vec::new(), 3);
|
||||||
e.write_all(data.as_ref()).unwrap();
|
e.write_all(data.as_ref()).unwrap();
|
||||||
let enc = e.into_inner();
|
let enc = e.finish().unwrap();
|
||||||
|
|
||||||
// client request
|
// client request
|
||||||
let mut response = srv
|
let mut response = srv
|
||||||
.post("/")
|
.post("/")
|
||||||
.header(http::header::CONTENT_ENCODING, "br")
|
.header(actix_web::http::header::CONTENT_ENCODING, "br")
|
||||||
.send_body(enc)
|
.send_body(enc)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
@@ -716,7 +764,7 @@ async fn test_reading_deflate_encoding_large_random_rustls() {
|
|||||||
let srv = test::start_with(test::config().rustls(config), || {
|
let srv = test::start_with(test::config().rustls(config), || {
|
||||||
App::new().service(web::resource("/").route(web::to(|bytes: Bytes| {
|
App::new().service(web::resource("/").route(web::to(|bytes: Bytes| {
|
||||||
HttpResponse::Ok()
|
HttpResponse::Ok()
|
||||||
.encoding(http::ContentEncoding::Identity)
|
.encoding(actix_web::http::ContentEncoding::Identity)
|
||||||
.body(bytes)
|
.body(bytes)
|
||||||
})))
|
})))
|
||||||
});
|
});
|
||||||
@@ -729,8 +777,8 @@ async fn test_reading_deflate_encoding_large_random_rustls() {
|
|||||||
// client request
|
// client request
|
||||||
let req = srv
|
let req = srv
|
||||||
.post("/")
|
.post("/")
|
||||||
.header(http::header::CONTENT_ENCODING, "deflate")
|
.header(actix_web::http::header::CONTENT_ENCODING, "deflate")
|
||||||
.send_body(enc);
|
.send_stream(TestBody::new(Bytes::from(enc), 1024));
|
||||||
|
|
||||||
let mut response = req.await.unwrap();
|
let mut response = req.await.unwrap();
|
||||||
assert!(response.status().is_success());
|
assert!(response.status().is_success());
|
||||||
@@ -741,93 +789,6 @@ async fn test_reading_deflate_encoding_large_random_rustls() {
|
|||||||
assert_eq!(bytes, Bytes::from(data));
|
assert_eq!(bytes, Bytes::from(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
// #[cfg(all(feature = "tls", feature = "ssl"))]
|
|
||||||
// #[test]
|
|
||||||
// fn test_reading_deflate_encoding_large_random_nativetls() {
|
|
||||||
// use native_tls::{Identity, TlsAcceptor};
|
|
||||||
// use openssl::ssl::{
|
|
||||||
// SslAcceptor, SslConnector, SslFiletype, SslMethod, SslVerifyMode,
|
|
||||||
// };
|
|
||||||
// use std::fs::File;
|
|
||||||
// use std::sync::mpsc;
|
|
||||||
|
|
||||||
// use actix::{Actor, System};
|
|
||||||
// let (tx, rx) = mpsc::channel();
|
|
||||||
|
|
||||||
// // load ssl keys
|
|
||||||
// let mut file = File::open("tests/identity.pfx").unwrap();
|
|
||||||
// let mut identity = vec![];
|
|
||||||
// file.read_to_end(&mut identity).unwrap();
|
|
||||||
// let identity = Identity::from_pkcs12(&identity, "1").unwrap();
|
|
||||||
// let acceptor = TlsAcceptor::new(identity).unwrap();
|
|
||||||
|
|
||||||
// // load ssl keys
|
|
||||||
// let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
|
||||||
// builder
|
|
||||||
// .set_private_key_file("tests/key.pem", SslFiletype::PEM)
|
|
||||||
// .unwrap();
|
|
||||||
// builder
|
|
||||||
// .set_certificate_chain_file("tests/cert.pem")
|
|
||||||
// .unwrap();
|
|
||||||
|
|
||||||
// let data = rand::thread_rng()
|
|
||||||
// .sample_iter(&Alphanumeric)
|
|
||||||
// .take(160_000)
|
|
||||||
// .collect::<String>();
|
|
||||||
|
|
||||||
// let addr = test::TestServer::unused_addr();
|
|
||||||
// thread::spawn(move || {
|
|
||||||
// System::run(move || {
|
|
||||||
// server::new(|| {
|
|
||||||
// App::new().handler("/", |req: &HttpRequest| {
|
|
||||||
// req.body()
|
|
||||||
// .and_then(|bytes: Bytes| {
|
|
||||||
// Ok(HttpResponse::Ok()
|
|
||||||
// .content_encoding(http::ContentEncoding::Identity)
|
|
||||||
// .body(bytes))
|
|
||||||
// })
|
|
||||||
// .responder()
|
|
||||||
// })
|
|
||||||
// })
|
|
||||||
// .bind_tls(addr, acceptor)
|
|
||||||
// .unwrap()
|
|
||||||
// .start();
|
|
||||||
// let _ = tx.send(System::current());
|
|
||||||
// });
|
|
||||||
// });
|
|
||||||
// let sys = rx.recv().unwrap();
|
|
||||||
|
|
||||||
// let mut rt = System::new("test");
|
|
||||||
|
|
||||||
// // client connector
|
|
||||||
// let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
|
|
||||||
// builder.set_verify(SslVerifyMode::NONE);
|
|
||||||
// let conn = client::ClientConnector::with_connector(builder.build()).start();
|
|
||||||
|
|
||||||
// // encode data
|
|
||||||
// let mut e = ZlibEncoder::new(Vec::new(), Compression::default());
|
|
||||||
// e.write_all(data.as_ref()).unwrap();
|
|
||||||
// let enc = e.finish().unwrap();
|
|
||||||
|
|
||||||
// // client request
|
|
||||||
// let request = client::ClientRequest::build()
|
|
||||||
// .uri(format!("https://{}/", addr))
|
|
||||||
// .method(http::Method::POST)
|
|
||||||
// .header(http::header::CONTENT_ENCODING, "deflate")
|
|
||||||
// .with_connector(conn)
|
|
||||||
// .body(enc)
|
|
||||||
// .unwrap();
|
|
||||||
// let response = rt.block_on(request.send()).unwrap();
|
|
||||||
// assert!(response.status().is_success());
|
|
||||||
|
|
||||||
// // read response
|
|
||||||
// let bytes = rt.block_on(response.body()).unwrap();
|
|
||||||
// assert_eq!(bytes.len(), data.len());
|
|
||||||
// assert_eq!(bytes, Bytes::from(data));
|
|
||||||
|
|
||||||
// let _ = sys.stop();
|
|
||||||
// }
|
|
||||||
|
|
||||||
// #[test]
|
// #[test]
|
||||||
// fn test_server_cookies() {
|
// fn test_server_cookies() {
|
||||||
// use actix_web::http;
|
// use actix_web::http;
|
||||||
|
Reference in New Issue
Block a user