1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-07-20 08:09:56 +02:00

Compare commits

...

8 Commits

Author SHA1 Message Date
Nikolay Kim
e708f51156 prep release 2018-03-04 20:28:06 -08:00
Nikolay Kim
cbb821148b explicitly set tcp nodelay 2018-03-04 20:14:58 -08:00
Nikolay Kim
d6b021e185 Merge pull request #94 from messense/feature/str-repeat
Use str::repeat
2018-03-04 19:57:49 -08:00
messense
0adb7e8553 Use str::repeat 2018-03-05 09:54:58 +08:00
Nikolay Kim
dbfa1f0ac8 fix example 2018-03-04 10:44:41 -08:00
Nikolay Kim
11347e3c7d Allow to use Arc<Vec<u8>> as response/request body 2018-03-04 10:33:18 -08:00
Nikolay Kim
631fe72a46 websockets text() is more generic 2018-03-04 10:18:42 -08:00
Nikolay Kim
f673dba759 Fix handling of requests with an encoded body with a length > 8192 #93 2018-03-04 09:48:34 -08:00
11 changed files with 417 additions and 212 deletions

View File

@@ -1,5 +1,11 @@
# Changes # Changes
## 0.4.4 (2018-03-04)
* Allow to use Arc<Vec<u8>> as response/request body
* Fix handling of requests with an encoded body with a length > 8192 #93
## 0.4.3 (2018-03-03) ## 0.4.3 (2018-03-03)
* Fix request body read bug * Fix request body read bug

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "actix-web" name = "actix-web"
version = "0.4.3" version = "0.4.4"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix web is a small, pragmatic, extremely fast, web framework for Rust." description = "Actix web is a small, pragmatic, extremely fast, web framework for Rust."
readme = "README.md" readme = "README.md"

View File

@@ -88,7 +88,7 @@ impl Handler<ClientCommand> for ChatClient {
type Result = (); type Result = ();
fn handle(&mut self, msg: ClientCommand, ctx: &mut Context<Self>) { fn handle(&mut self, msg: ClientCommand, ctx: &mut Context<Self>) {
self.0.text(msg.0.as_str()) self.0.text(msg.0)
} }
} }

View File

@@ -36,6 +36,8 @@ pub enum Binary {
/// Shared string body /// Shared string body
#[doc(hidden)] #[doc(hidden)]
ArcSharedString(Arc<String>), ArcSharedString(Arc<String>),
/// Shared vec body
SharedVec(Arc<Vec<u8>>),
} }
impl Body { impl Body {
@@ -115,6 +117,7 @@ impl Binary {
Binary::Slice(slice) => slice.len(), Binary::Slice(slice) => slice.len(),
Binary::SharedString(ref s) => s.len(), Binary::SharedString(ref s) => s.len(),
Binary::ArcSharedString(ref s) => s.len(), Binary::ArcSharedString(ref s) => s.len(),
Binary::SharedVec(ref s) => s.len(),
} }
} }
@@ -134,8 +137,9 @@ impl Clone for Binary {
match *self { match *self {
Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()), Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()),
Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)), Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)),
Binary::SharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())), Binary::SharedString(ref s) => Binary::SharedString(s.clone()),
Binary::ArcSharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())), Binary::ArcSharedString(ref s) => Binary::ArcSharedString(s.clone()),
Binary::SharedVec(ref s) => Binary::SharedVec(s.clone()),
} }
} }
} }
@@ -147,6 +151,7 @@ impl Into<Bytes> for Binary {
Binary::Slice(slice) => Bytes::from(slice), Binary::Slice(slice) => Bytes::from(slice),
Binary::SharedString(s) => Bytes::from(s.as_str()), Binary::SharedString(s) => Bytes::from(s.as_str()),
Binary::ArcSharedString(s) => Bytes::from(s.as_str()), Binary::ArcSharedString(s) => Bytes::from(s.as_str()),
Binary::SharedVec(s) => Bytes::from(AsRef::<[u8]>::as_ref(s.as_ref())),
} }
} }
} }
@@ -217,6 +222,18 @@ impl<'a> From<&'a Arc<String>> for Binary {
} }
} }
impl From<Arc<Vec<u8>>> for Binary {
fn from(body: Arc<Vec<u8>>) -> Binary {
Binary::SharedVec(body)
}
}
impl<'a> From<&'a Arc<Vec<u8>>> for Binary {
fn from(body: &'a Arc<Vec<u8>>) -> Binary {
Binary::SharedVec(Arc::clone(body))
}
}
impl AsRef<[u8]> for Binary { impl AsRef<[u8]> for Binary {
fn as_ref(&self) -> &[u8] { fn as_ref(&self) -> &[u8] {
match *self { match *self {
@@ -224,6 +241,7 @@ impl AsRef<[u8]> for Binary {
Binary::Slice(slice) => slice, Binary::Slice(slice) => slice,
Binary::SharedString(ref s) => s.as_bytes(), Binary::SharedString(ref s) => s.as_bytes(),
Binary::ArcSharedString(ref s) => s.as_bytes(), Binary::ArcSharedString(ref s) => s.as_bytes(),
Binary::SharedVec(ref s) => s.as_ref().as_ref(),
} }
} }
} }
@@ -304,6 +322,15 @@ mod tests {
assert_eq!(Binary::from(&b).as_ref(), "test".as_bytes()); assert_eq!(Binary::from(&b).as_ref(), "test".as_bytes());
} }
#[test]
fn test_shared_vec() {
let b = Arc::new(Vec::from(&b"test"[..]));
assert_eq!(Binary::from(b.clone()).len(), 4);
assert_eq!(Binary::from(b.clone()).as_ref(), &b"test"[..]);
assert_eq!(Binary::from(&b).len(), 4);
assert_eq!(Binary::from(&b).as_ref(), &b"test"[..]);
}
#[test] #[test]
fn test_bytes_mut() { fn test_bytes_mut() {
let b = BytesMut::from("test"); let b = BytesMut::from("test");

View File

@@ -32,9 +32,10 @@ pub struct HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static {
impl<T, H> HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static impl<T, H> HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static
{ {
pub(crate) fn new(settings: Rc<WorkerSettings<H>>, pub(crate) fn new(settings: Rc<WorkerSettings<H>>,
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H> mut io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
{ {
settings.add_channel(); settings.add_channel();
let _ = io.set_nodelay(true);
if http2 { if http2 {
HttpChannel { HttpChannel {

View File

@@ -11,7 +11,7 @@ use flate2::Compression;
use flate2::read::GzDecoder; use flate2::read::GzDecoder;
use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder}; use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
use brotli2::write::{BrotliDecoder, BrotliEncoder}; use brotli2::write::{BrotliDecoder, BrotliEncoder};
use bytes::{Bytes, BytesMut, BufMut, Writer}; use bytes::{Bytes, BytesMut, BufMut};
use headers::ContentEncoding; use headers::ContentEncoding;
use body::{Body, Binary}; use body::{Body, Binary};
@@ -128,177 +128,6 @@ impl PayloadWriter for PayloadType {
} }
} }
pub(crate) enum Decoder {
Deflate(Box<DeflateDecoder<Writer<BytesMut>>>),
Gzip(Option<Box<GzDecoder<Wrapper>>>),
Br(Box<BrotliDecoder<Writer<BytesMut>>>),
Identity,
}
// should go after write::GzDecoder get implemented
#[derive(Debug)]
pub(crate) struct Wrapper {
pub buf: BytesMut,
pub eof: bool,
}
impl io::Read for Wrapper {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = cmp::min(buf.len(), self.buf.len());
buf[..len].copy_from_slice(&self.buf[..len]);
self.buf.split_to(len);
if len == 0 {
if self.eof {
Ok(0)
} else {
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
}
} else {
Ok(len)
}
}
}
impl io::Write for Wrapper {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
/// Payload stream with decompression support
pub(crate) struct PayloadStream {
decoder: Decoder,
dst: BytesMut,
}
impl PayloadStream {
pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc {
ContentEncoding::Br => Decoder::Br(
Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))),
ContentEncoding::Deflate => Decoder::Deflate(
Box::new(DeflateDecoder::new(BytesMut::with_capacity(8192).writer()))),
ContentEncoding::Gzip => Decoder::Gzip(None),
_ => Decoder::Identity,
};
PayloadStream{ decoder: dec, dst: BytesMut::new() }
}
}
impl PayloadStream {
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
match self.decoder {
Decoder::Br(ref mut decoder) => {
match decoder.finish() {
Ok(mut writer) => {
let b = writer.get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(err) => Err(err),
}
},
Decoder::Gzip(ref mut decoder) => {
if let Some(ref mut decoder) = *decoder {
decoder.as_mut().get_mut().eof = true;
loop {
self.dst.reserve(8192);
match decoder.read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => {
if n == 0 {
return Ok(Some(self.dst.take().freeze()))
} else {
unsafe{self.dst.set_len(n)};
}
}
Err(err) => return Err(err),
}
}
} else {
Ok(None)
}
},
Decoder::Deflate(ref mut decoder) => {
match decoder.try_finish() {
Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(err) => Err(err),
}
},
Decoder::Identity => Ok(None),
}
}
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
match self.decoder {
Decoder::Br(ref mut decoder) => {
match decoder.write(&data).and_then(|_| decoder.flush()) {
Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(err) => Err(err)
}
},
Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() {
*decoder = Some(
Box::new(GzDecoder::new(
Wrapper{buf: BytesMut::from(data), eof: false})));
} else {
let _ = decoder.as_mut().unwrap().write(&data);
}
loop {
self.dst.reserve(8192);
match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => {
if n == 0 {
return Ok(Some(self.dst.split_to(n).freeze()));
} else {
unsafe{self.dst.set_len(n)};
}
}
Err(e) => return Err(e),
}
}
},
Decoder::Deflate(ref mut decoder) => {
match decoder.write(&data).and_then(|_| decoder.flush()) {
Ok(_) => {
let b = decoder.get_mut().get_mut().take().freeze();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(e) => Err(e),
}
},
Decoder::Identity => Ok(Some(data)),
}
}
}
/// Payload wrapper with content decompression support /// Payload wrapper with content decompression support
pub(crate) struct EncodedPayload { pub(crate) struct EncodedPayload {
@@ -357,6 +186,203 @@ impl PayloadWriter for EncodedPayload {
} }
} }
pub(crate) enum Decoder {
Deflate(Box<DeflateDecoder<Writer>>),
Gzip(Option<Box<GzDecoder<Wrapper>>>),
Br(Box<BrotliDecoder<Writer>>),
Identity,
}
// should go after write::GzDecoder get implemented
#[derive(Debug)]
pub(crate) struct Wrapper {
pub buf: BytesMut,
pub eof: bool,
}
impl io::Read for Wrapper {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = cmp::min(buf.len(), self.buf.len());
buf[..len].copy_from_slice(&self.buf[..len]);
self.buf.split_to(len);
if len == 0 {
if self.eof {
Ok(0)
} else {
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
}
} else {
Ok(len)
}
}
}
impl io::Write for Wrapper {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub(crate) struct Writer {
buf: BytesMut,
}
impl Writer {
fn new() -> Writer {
Writer{buf: BytesMut::with_capacity(8192)}
}
fn take(&mut self) -> Bytes {
self.buf.take().freeze()
}
}
impl io::Write for Writer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
/// Payload stream with decompression support
pub(crate) struct PayloadStream {
decoder: Decoder,
dst: BytesMut,
}
impl PayloadStream {
pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc {
ContentEncoding::Br => Decoder::Br(
Box::new(BrotliDecoder::new(Writer::new()))),
ContentEncoding::Deflate => Decoder::Deflate(
Box::new(DeflateDecoder::new(Writer::new()))),
ContentEncoding::Gzip => Decoder::Gzip(None),
_ => Decoder::Identity,
};
PayloadStream{ decoder: dec, dst: BytesMut::new() }
}
}
impl PayloadStream {
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
match self.decoder {
Decoder::Br(ref mut decoder) => {
match decoder.finish() {
Ok(mut writer) => {
let b = writer.take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(e) => Err(e),
}
},
Decoder::Gzip(ref mut decoder) => {
if let Some(ref mut decoder) = *decoder {
decoder.as_mut().get_mut().eof = true;
loop {
self.dst.reserve(8192);
match decoder.read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => {
if n == 0 {
return Ok(Some(self.dst.take().freeze()))
} else {
unsafe{self.dst.advance_mut(n)};
}
}
Err(e) => return Err(e),
}
}
} else {
Ok(None)
}
},
Decoder::Deflate(ref mut decoder) => {
match decoder.try_finish() {
Ok(_) => {
let b = decoder.get_mut().take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(e) => Err(e),
}
},
Decoder::Identity => Ok(None),
}
}
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
match self.decoder {
Decoder::Br(ref mut decoder) => {
match decoder.write(&data).and_then(|_| decoder.flush()) {
Ok(_) => {
let b = decoder.get_mut().take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(e) => Err(e)
}
},
Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() {
*decoder = Some(
Box::new(GzDecoder::new(
Wrapper{buf: BytesMut::from(data), eof: false})));
} else {
let _ = decoder.as_mut().unwrap().write(&data);
}
loop {
self.dst.reserve(8192);
match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) {
Ok(n) => {
if n == 0 {
return Ok(Some(self.dst.take().freeze()));
} else {
unsafe{self.dst.advance_mut(n)};
}
}
Err(e) => {
return Err(e)
}
}
}
},
Decoder::Deflate(ref mut decoder) => {
match decoder.write(&data).and_then(|_| decoder.flush()) {
Ok(_) => {
let b = decoder.get_mut().take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
},
Err(e) => Err(e),
}
},
Decoder::Identity => Ok(Some(data)),
}
}
}
pub(crate) enum ContentEncoder { pub(crate) enum ContentEncoder {
Deflate(DeflateEncoder<TransferEncoding>), Deflate(DeflateEncoder<TransferEncoding>),
Gzip(GzEncoder<TransferEncoding>), Gzip(GzEncoder<TransferEncoding>),

View File

@@ -489,7 +489,7 @@ impl ClientWriter {
/// Send text frame /// Send text frame
#[inline] #[inline]
pub fn text<T: Into<String>>(&mut self, text: T) { pub fn text<T: Into<Binary>>(&mut self, text: T) {
self.write(Frame::message(text.into(), OpCode::Text, true, true)); self.write(Frame::message(text.into(), OpCode::Text, true, true));
} }

View File

@@ -132,7 +132,7 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
/// Send text frame /// Send text frame
#[inline] #[inline]
pub fn text<T: Into<String>>(&mut self, text: T) { pub fn text<T: Into<Binary>>(&mut self, text: T) {
self.write(Frame::message(text.into(), OpCode::Text, true, false)); self.write(Frame::message(text.into(), OpCode::Text, true, false));
} }

View File

@@ -116,6 +116,32 @@ fn test_client_gzip_encoding() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref())); assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
} }
#[test]
fn test_client_gzip_encoding_large() {
let data = STR.repeat(10);
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Deflate)
.body(bytes))
}).responder()}
));
// client request
let request = srv.post()
.content_encoding(headers::ContentEncoding::Gzip)
.body(data.clone()).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from(data));
}
#[test] #[test]
fn test_client_brotli_encoding() { fn test_client_brotli_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {

View File

@@ -136,27 +136,32 @@ fn test_simple() {
#[test] #[test]
fn test_headers() { fn test_headers() {
let data = STR.repeat(10);
let srv_data = Arc::new(data.clone());
let mut srv = test::TestServer::new( let mut srv = test::TestServer::new(
|app| app.handler(|_| { move |app| {
let mut builder = httpcodes::HTTPOk.build(); let data = srv_data.clone();
for idx in 0..90 { app.handler(move |_| {
builder.header( let mut builder = httpcodes::HTTPOk.build();
format!("X-TEST-{}", idx).as_str(), for idx in 0..90 {
"TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ builder.header(
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ format!("X-TEST-{}", idx).as_str(),
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ "TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \ TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST "); TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
} TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
builder.body(STR)})); TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ");
}
builder.body(data.as_ref())})
});
let request = srv.get().finish().unwrap(); let request = srv.get().finish().unwrap();
let response = srv.execute(request.send()).unwrap(); let response = srv.execute(request.send()).unwrap();
@@ -164,7 +169,7 @@ fn test_headers() {
// read response // read response
let bytes = srv.execute(response.body()).unwrap(); let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from_static(STR.as_ref())); assert_eq!(bytes, Bytes::from(data));
} }
#[test] #[test]
@@ -203,6 +208,33 @@ fn test_body_gzip() {
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref())); assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
} }
#[test]
fn test_body_gzip_large() {
let data = STR.repeat(10);
let srv_data = Arc::new(data.clone());
let mut srv = test::TestServer::new(
move |app| {
let data = srv_data.clone();
app.handler(
move |_| httpcodes::HTTPOk.build()
.content_encoding(headers::ContentEncoding::Gzip)
.body(data.as_ref()))});
let request = srv.get().disable_decompress().finish().unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
// decode
let mut e = GzDecoder::new(&bytes[..]);
let mut dec = Vec::new();
e.read_to_end(&mut dec).unwrap();
assert_eq!(Bytes::from(dec), Bytes::from(data));
}
#[test] #[test]
fn test_body_chunked_implicit() { fn test_body_chunked_implicit() {
let mut srv = test::TestServer::new( let mut srv = test::TestServer::new(
@@ -430,6 +462,35 @@ fn test_gzip_encoding() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref())); assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
} }
#[test]
fn test_gzip_encoding_large() {
let data = STR.repeat(10);
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Identity)
.body(bytes))
}).responder()}
));
// client request
let mut e = GzEncoder::new(Vec::new(), Compression::default());
e.write_all(data.as_ref()).unwrap();
let enc = e.finish().unwrap();
let request = srv.post()
.header(header::CONTENT_ENCODING, "gzip")
.body(enc.clone()).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from(data));
}
#[test] #[test]
fn test_deflate_encoding() { fn test_deflate_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
@@ -458,6 +519,35 @@ fn test_deflate_encoding() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref())); assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
} }
#[test]
fn test_deflate_encoding_large() {
let data = STR.repeat(10);
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Identity)
.body(bytes))
}).responder()}
));
let mut e = DeflateEncoder::new(Vec::new(), Compression::default());
e.write_all(data.as_ref()).unwrap();
let enc = e.finish().unwrap();
// client request
let request = srv.post()
.header(header::CONTENT_ENCODING, "deflate")
.body(enc).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from(data));
}
#[test] #[test]
fn test_brotli_encoding() { fn test_brotli_encoding() {
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| { let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
@@ -486,6 +576,35 @@ fn test_brotli_encoding() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref())); assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
} }
#[test]
fn test_brotli_encoding_large() {
let data = STR.repeat(10);
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
req.body()
.and_then(|bytes: Bytes| {
Ok(httpcodes::HTTPOk
.build()
.content_encoding(headers::ContentEncoding::Identity)
.body(bytes))
}).responder()}
));
let mut e = BrotliEncoder::new(Vec::new(), 5);
e.write_all(data.as_ref()).unwrap();
let enc = e.finish().unwrap();
// client request
let request = srv.post()
.header(header::CONTENT_ENCODING, "br")
.body(enc).unwrap();
let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success());
// read response
let bytes = srv.execute(response.body()).unwrap();
assert_eq!(bytes, Bytes::from(data));
}
#[test] #[test]
fn test_h2() { fn test_h2() {
let srv = test::TestServer::new(|app| app.handler(|_|{ let srv = test::TestServer::new(|app| app.handler(|_|{

View File

@@ -19,7 +19,7 @@ use futures::Future;
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use actix::prelude::*; use actix::prelude::*;
use actix_web::ws::{Message, WsClientError, WsClient, WsClientWriter}; use actix_web::ws;
fn main() { fn main() {
@@ -71,21 +71,21 @@ fn main() {
let perf = perf_counters.clone(); let perf = perf_counters.clone();
let addr = Arbiter::new(format!("test {}", t)); let addr = Arbiter::new(format!("test {}", t));
addr.send(actix::msgs::Execute::new(move || -> Result<(), ()> { addr.do_send(actix::msgs::Execute::new(move || -> Result<(), ()> {
let mut reps = report; let mut reps = report;
for _ in 0..concurrency { for _ in 0..concurrency {
let pl2 = pl.clone(); let pl2 = pl.clone();
let perf2 = perf.clone(); let perf2 = perf.clone();
Arbiter::handle().spawn( Arbiter::handle().spawn(
WsClient::new(&ws).connect().unwrap() ws::Client::new(&ws).connect()
.map_err(|e| { .map_err(|e| {
println!("Error: {}", e); println!("Error: {}", e);
Arbiter::system().send(actix::msgs::SystemExit(0)); Arbiter::system().do_send(actix::msgs::SystemExit(0));
() ()
}) })
.map(move |(reader, writer)| { .map(move |(reader, writer)| {
let addr: SyncAddress<_> = ChatClient::create(move |ctx| { let addr: Addr<Syn, _> = ChatClient::create(move |ctx| {
ChatClient::add_stream(reader, ctx); ChatClient::add_stream(reader, ctx);
ChatClient{conn: writer, ChatClient{conn: writer,
payload: pl2, payload: pl2,
@@ -114,7 +114,7 @@ fn parse_u64_default(input: Option<&str>, default: u64) -> u64 {
} }
struct ChatClient{ struct ChatClient{
conn: WsClientWriter, conn: ws::ClientWriter,
payload: Arc<String>, payload: Arc<String>,
ts: u64, ts: u64,
bin: bool, bin: bool,
@@ -133,9 +133,9 @@ impl Actor for ChatClient {
} }
} }
fn stopping(&mut self, _: &mut Context<Self>) -> bool { fn stopping(&mut self, _: &mut Context<Self>) -> Running {
Arbiter::system().send(actix::msgs::SystemExit(0)); Arbiter::system().do_send(actix::msgs::SystemExit(0));
true Running::Stop
} }
} }
@@ -171,15 +171,15 @@ impl ChatClient {
} }
/// Handle server websocket messages /// Handle server websocket messages
impl StreamHandler<Message, WsClientError> for ChatClient { impl StreamHandler<ws::Message, ws::ProtocolError> for ChatClient {
fn finished(&mut self, ctx: &mut Context<Self>) { fn finished(&mut self, ctx: &mut Context<Self>) {
ctx.stop() ctx.stop()
} }
fn handle(&mut self, msg: Message, ctx: &mut Context<Self>) { fn handle(&mut self, msg: ws::Message, ctx: &mut Context<Self>) {
match msg { match msg {
Message::Text(txt) => { ws::Message::Text(txt) => {
if txt == self.payload.as_ref().as_str() { if txt == self.payload.as_ref().as_str() {
self.perf_counters.register_request(); self.perf_counters.register_request();
self.perf_counters.register_latency(time::precise_time_ns() - self.ts); self.perf_counters.register_latency(time::precise_time_ns() - self.ts);