mirror of
https://github.com/fafhrd91/actix-web
synced 2025-07-20 08:09:56 +02:00
Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
e708f51156 | ||
|
cbb821148b | ||
|
d6b021e185 | ||
|
0adb7e8553 | ||
|
dbfa1f0ac8 | ||
|
11347e3c7d | ||
|
631fe72a46 | ||
|
f673dba759 |
@@ -1,5 +1,11 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
|
## 0.4.4 (2018-03-04)
|
||||||
|
|
||||||
|
* Allow to use Arc<Vec<u8>> as response/request body
|
||||||
|
|
||||||
|
* Fix handling of requests with an encoded body with a length > 8192 #93
|
||||||
|
|
||||||
## 0.4.3 (2018-03-03)
|
## 0.4.3 (2018-03-03)
|
||||||
|
|
||||||
* Fix request body read bug
|
* Fix request body read bug
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-web"
|
name = "actix-web"
|
||||||
version = "0.4.3"
|
version = "0.4.4"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Actix web is a small, pragmatic, extremely fast, web framework for Rust."
|
description = "Actix web is a small, pragmatic, extremely fast, web framework for Rust."
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
|
@@ -88,7 +88,7 @@ impl Handler<ClientCommand> for ChatClient {
|
|||||||
type Result = ();
|
type Result = ();
|
||||||
|
|
||||||
fn handle(&mut self, msg: ClientCommand, ctx: &mut Context<Self>) {
|
fn handle(&mut self, msg: ClientCommand, ctx: &mut Context<Self>) {
|
||||||
self.0.text(msg.0.as_str())
|
self.0.text(msg.0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
31
src/body.rs
31
src/body.rs
@@ -36,6 +36,8 @@ pub enum Binary {
|
|||||||
/// Shared string body
|
/// Shared string body
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
ArcSharedString(Arc<String>),
|
ArcSharedString(Arc<String>),
|
||||||
|
/// Shared vec body
|
||||||
|
SharedVec(Arc<Vec<u8>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Body {
|
impl Body {
|
||||||
@@ -115,6 +117,7 @@ impl Binary {
|
|||||||
Binary::Slice(slice) => slice.len(),
|
Binary::Slice(slice) => slice.len(),
|
||||||
Binary::SharedString(ref s) => s.len(),
|
Binary::SharedString(ref s) => s.len(),
|
||||||
Binary::ArcSharedString(ref s) => s.len(),
|
Binary::ArcSharedString(ref s) => s.len(),
|
||||||
|
Binary::SharedVec(ref s) => s.len(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,8 +137,9 @@ impl Clone for Binary {
|
|||||||
match *self {
|
match *self {
|
||||||
Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()),
|
Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()),
|
||||||
Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)),
|
Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)),
|
||||||
Binary::SharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
|
Binary::SharedString(ref s) => Binary::SharedString(s.clone()),
|
||||||
Binary::ArcSharedString(ref s) => Binary::Bytes(Bytes::from(s.as_str())),
|
Binary::ArcSharedString(ref s) => Binary::ArcSharedString(s.clone()),
|
||||||
|
Binary::SharedVec(ref s) => Binary::SharedVec(s.clone()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -147,6 +151,7 @@ impl Into<Bytes> for Binary {
|
|||||||
Binary::Slice(slice) => Bytes::from(slice),
|
Binary::Slice(slice) => Bytes::from(slice),
|
||||||
Binary::SharedString(s) => Bytes::from(s.as_str()),
|
Binary::SharedString(s) => Bytes::from(s.as_str()),
|
||||||
Binary::ArcSharedString(s) => Bytes::from(s.as_str()),
|
Binary::ArcSharedString(s) => Bytes::from(s.as_str()),
|
||||||
|
Binary::SharedVec(s) => Bytes::from(AsRef::<[u8]>::as_ref(s.as_ref())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -217,6 +222,18 @@ impl<'a> From<&'a Arc<String>> for Binary {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<Arc<Vec<u8>>> for Binary {
|
||||||
|
fn from(body: Arc<Vec<u8>>) -> Binary {
|
||||||
|
Binary::SharedVec(body)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> From<&'a Arc<Vec<u8>>> for Binary {
|
||||||
|
fn from(body: &'a Arc<Vec<u8>>) -> Binary {
|
||||||
|
Binary::SharedVec(Arc::clone(body))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl AsRef<[u8]> for Binary {
|
impl AsRef<[u8]> for Binary {
|
||||||
fn as_ref(&self) -> &[u8] {
|
fn as_ref(&self) -> &[u8] {
|
||||||
match *self {
|
match *self {
|
||||||
@@ -224,6 +241,7 @@ impl AsRef<[u8]> for Binary {
|
|||||||
Binary::Slice(slice) => slice,
|
Binary::Slice(slice) => slice,
|
||||||
Binary::SharedString(ref s) => s.as_bytes(),
|
Binary::SharedString(ref s) => s.as_bytes(),
|
||||||
Binary::ArcSharedString(ref s) => s.as_bytes(),
|
Binary::ArcSharedString(ref s) => s.as_bytes(),
|
||||||
|
Binary::SharedVec(ref s) => s.as_ref().as_ref(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -304,6 +322,15 @@ mod tests {
|
|||||||
assert_eq!(Binary::from(&b).as_ref(), "test".as_bytes());
|
assert_eq!(Binary::from(&b).as_ref(), "test".as_bytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_shared_vec() {
|
||||||
|
let b = Arc::new(Vec::from(&b"test"[..]));
|
||||||
|
assert_eq!(Binary::from(b.clone()).len(), 4);
|
||||||
|
assert_eq!(Binary::from(b.clone()).as_ref(), &b"test"[..]);
|
||||||
|
assert_eq!(Binary::from(&b).len(), 4);
|
||||||
|
assert_eq!(Binary::from(&b).as_ref(), &b"test"[..]);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_bytes_mut() {
|
fn test_bytes_mut() {
|
||||||
let b = BytesMut::from("test");
|
let b = BytesMut::from("test");
|
||||||
|
@@ -32,9 +32,10 @@ pub struct HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static {
|
|||||||
impl<T, H> HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static
|
impl<T, H> HttpChannel<T, H> where T: IoStream, H: HttpHandler + 'static
|
||||||
{
|
{
|
||||||
pub(crate) fn new(settings: Rc<WorkerSettings<H>>,
|
pub(crate) fn new(settings: Rc<WorkerSettings<H>>,
|
||||||
io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
|
mut io: T, peer: Option<SocketAddr>, http2: bool) -> HttpChannel<T, H>
|
||||||
{
|
{
|
||||||
settings.add_channel();
|
settings.add_channel();
|
||||||
|
let _ = io.set_nodelay(true);
|
||||||
|
|
||||||
if http2 {
|
if http2 {
|
||||||
HttpChannel {
|
HttpChannel {
|
||||||
|
@@ -11,7 +11,7 @@ use flate2::Compression;
|
|||||||
use flate2::read::GzDecoder;
|
use flate2::read::GzDecoder;
|
||||||
use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
|
use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
|
||||||
use brotli2::write::{BrotliDecoder, BrotliEncoder};
|
use brotli2::write::{BrotliDecoder, BrotliEncoder};
|
||||||
use bytes::{Bytes, BytesMut, BufMut, Writer};
|
use bytes::{Bytes, BytesMut, BufMut};
|
||||||
|
|
||||||
use headers::ContentEncoding;
|
use headers::ContentEncoding;
|
||||||
use body::{Body, Binary};
|
use body::{Body, Binary};
|
||||||
@@ -128,177 +128,6 @@ impl PayloadWriter for PayloadType {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) enum Decoder {
|
|
||||||
Deflate(Box<DeflateDecoder<Writer<BytesMut>>>),
|
|
||||||
Gzip(Option<Box<GzDecoder<Wrapper>>>),
|
|
||||||
Br(Box<BrotliDecoder<Writer<BytesMut>>>),
|
|
||||||
Identity,
|
|
||||||
}
|
|
||||||
|
|
||||||
// should go after write::GzDecoder get implemented
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub(crate) struct Wrapper {
|
|
||||||
pub buf: BytesMut,
|
|
||||||
pub eof: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl io::Read for Wrapper {
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
||||||
let len = cmp::min(buf.len(), self.buf.len());
|
|
||||||
buf[..len].copy_from_slice(&self.buf[..len]);
|
|
||||||
self.buf.split_to(len);
|
|
||||||
if len == 0 {
|
|
||||||
if self.eof {
|
|
||||||
Ok(0)
|
|
||||||
} else {
|
|
||||||
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Ok(len)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl io::Write for Wrapper {
|
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
|
||||||
self.buf.extend_from_slice(buf);
|
|
||||||
Ok(buf.len())
|
|
||||||
}
|
|
||||||
fn flush(&mut self) -> io::Result<()> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Payload stream with decompression support
|
|
||||||
pub(crate) struct PayloadStream {
|
|
||||||
decoder: Decoder,
|
|
||||||
dst: BytesMut,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PayloadStream {
|
|
||||||
pub fn new(enc: ContentEncoding) -> PayloadStream {
|
|
||||||
let dec = match enc {
|
|
||||||
ContentEncoding::Br => Decoder::Br(
|
|
||||||
Box::new(BrotliDecoder::new(BytesMut::with_capacity(8192).writer()))),
|
|
||||||
ContentEncoding::Deflate => Decoder::Deflate(
|
|
||||||
Box::new(DeflateDecoder::new(BytesMut::with_capacity(8192).writer()))),
|
|
||||||
ContentEncoding::Gzip => Decoder::Gzip(None),
|
|
||||||
_ => Decoder::Identity,
|
|
||||||
};
|
|
||||||
PayloadStream{ decoder: dec, dst: BytesMut::new() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PayloadStream {
|
|
||||||
|
|
||||||
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
|
|
||||||
match self.decoder {
|
|
||||||
Decoder::Br(ref mut decoder) => {
|
|
||||||
match decoder.finish() {
|
|
||||||
Ok(mut writer) => {
|
|
||||||
let b = writer.get_mut().take().freeze();
|
|
||||||
if !b.is_empty() {
|
|
||||||
Ok(Some(b))
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => Err(err),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Decoder::Gzip(ref mut decoder) => {
|
|
||||||
if let Some(ref mut decoder) = *decoder {
|
|
||||||
decoder.as_mut().get_mut().eof = true;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
self.dst.reserve(8192);
|
|
||||||
match decoder.read(unsafe{self.dst.bytes_mut()}) {
|
|
||||||
Ok(n) => {
|
|
||||||
if n == 0 {
|
|
||||||
return Ok(Some(self.dst.take().freeze()))
|
|
||||||
} else {
|
|
||||||
unsafe{self.dst.set_len(n)};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Decoder::Deflate(ref mut decoder) => {
|
|
||||||
match decoder.try_finish() {
|
|
||||||
Ok(_) => {
|
|
||||||
let b = decoder.get_mut().get_mut().take().freeze();
|
|
||||||
if !b.is_empty() {
|
|
||||||
Ok(Some(b))
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => Err(err),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Decoder::Identity => Ok(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
|
|
||||||
match self.decoder {
|
|
||||||
Decoder::Br(ref mut decoder) => {
|
|
||||||
match decoder.write(&data).and_then(|_| decoder.flush()) {
|
|
||||||
Ok(_) => {
|
|
||||||
let b = decoder.get_mut().get_mut().take().freeze();
|
|
||||||
if !b.is_empty() {
|
|
||||||
Ok(Some(b))
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => Err(err)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Decoder::Gzip(ref mut decoder) => {
|
|
||||||
if decoder.is_none() {
|
|
||||||
*decoder = Some(
|
|
||||||
Box::new(GzDecoder::new(
|
|
||||||
Wrapper{buf: BytesMut::from(data), eof: false})));
|
|
||||||
} else {
|
|
||||||
let _ = decoder.as_mut().unwrap().write(&data);
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
self.dst.reserve(8192);
|
|
||||||
match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) {
|
|
||||||
Ok(n) => {
|
|
||||||
if n == 0 {
|
|
||||||
return Ok(Some(self.dst.split_to(n).freeze()));
|
|
||||||
} else {
|
|
||||||
unsafe{self.dst.set_len(n)};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => return Err(e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Decoder::Deflate(ref mut decoder) => {
|
|
||||||
match decoder.write(&data).and_then(|_| decoder.flush()) {
|
|
||||||
Ok(_) => {
|
|
||||||
let b = decoder.get_mut().get_mut().take().freeze();
|
|
||||||
if !b.is_empty() {
|
|
||||||
Ok(Some(b))
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(e) => Err(e),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Decoder::Identity => Ok(Some(data)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Payload wrapper with content decompression support
|
/// Payload wrapper with content decompression support
|
||||||
pub(crate) struct EncodedPayload {
|
pub(crate) struct EncodedPayload {
|
||||||
@@ -357,6 +186,203 @@ impl PayloadWriter for EncodedPayload {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) enum Decoder {
|
||||||
|
Deflate(Box<DeflateDecoder<Writer>>),
|
||||||
|
Gzip(Option<Box<GzDecoder<Wrapper>>>),
|
||||||
|
Br(Box<BrotliDecoder<Writer>>),
|
||||||
|
Identity,
|
||||||
|
}
|
||||||
|
|
||||||
|
// should go after write::GzDecoder get implemented
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(crate) struct Wrapper {
|
||||||
|
pub buf: BytesMut,
|
||||||
|
pub eof: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl io::Read for Wrapper {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
|
let len = cmp::min(buf.len(), self.buf.len());
|
||||||
|
buf[..len].copy_from_slice(&self.buf[..len]);
|
||||||
|
self.buf.split_to(len);
|
||||||
|
if len == 0 {
|
||||||
|
if self.eof {
|
||||||
|
Ok(0)
|
||||||
|
} else {
|
||||||
|
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(len)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl io::Write for Wrapper {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
|
self.buf.extend_from_slice(buf);
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) struct Writer {
|
||||||
|
buf: BytesMut,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Writer {
|
||||||
|
fn new() -> Writer {
|
||||||
|
Writer{buf: BytesMut::with_capacity(8192)}
|
||||||
|
}
|
||||||
|
fn take(&mut self) -> Bytes {
|
||||||
|
self.buf.take().freeze()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl io::Write for Writer {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
|
self.buf.extend_from_slice(buf);
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
fn flush(&mut self) -> io::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Payload stream with decompression support
|
||||||
|
pub(crate) struct PayloadStream {
|
||||||
|
decoder: Decoder,
|
||||||
|
dst: BytesMut,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PayloadStream {
|
||||||
|
pub fn new(enc: ContentEncoding) -> PayloadStream {
|
||||||
|
let dec = match enc {
|
||||||
|
ContentEncoding::Br => Decoder::Br(
|
||||||
|
Box::new(BrotliDecoder::new(Writer::new()))),
|
||||||
|
ContentEncoding::Deflate => Decoder::Deflate(
|
||||||
|
Box::new(DeflateDecoder::new(Writer::new()))),
|
||||||
|
ContentEncoding::Gzip => Decoder::Gzip(None),
|
||||||
|
_ => Decoder::Identity,
|
||||||
|
};
|
||||||
|
PayloadStream{ decoder: dec, dst: BytesMut::new() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PayloadStream {
|
||||||
|
|
||||||
|
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
|
||||||
|
match self.decoder {
|
||||||
|
Decoder::Br(ref mut decoder) => {
|
||||||
|
match decoder.finish() {
|
||||||
|
Ok(mut writer) => {
|
||||||
|
let b = writer.take();
|
||||||
|
if !b.is_empty() {
|
||||||
|
Ok(Some(b))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Decoder::Gzip(ref mut decoder) => {
|
||||||
|
if let Some(ref mut decoder) = *decoder {
|
||||||
|
decoder.as_mut().get_mut().eof = true;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
self.dst.reserve(8192);
|
||||||
|
match decoder.read(unsafe{self.dst.bytes_mut()}) {
|
||||||
|
Ok(n) => {
|
||||||
|
if n == 0 {
|
||||||
|
return Ok(Some(self.dst.take().freeze()))
|
||||||
|
} else {
|
||||||
|
unsafe{self.dst.advance_mut(n)};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Decoder::Deflate(ref mut decoder) => {
|
||||||
|
match decoder.try_finish() {
|
||||||
|
Ok(_) => {
|
||||||
|
let b = decoder.get_mut().take();
|
||||||
|
if !b.is_empty() {
|
||||||
|
Ok(Some(b))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Decoder::Identity => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
|
||||||
|
match self.decoder {
|
||||||
|
Decoder::Br(ref mut decoder) => {
|
||||||
|
match decoder.write(&data).and_then(|_| decoder.flush()) {
|
||||||
|
Ok(_) => {
|
||||||
|
let b = decoder.get_mut().take();
|
||||||
|
if !b.is_empty() {
|
||||||
|
Ok(Some(b))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => Err(e)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Decoder::Gzip(ref mut decoder) => {
|
||||||
|
if decoder.is_none() {
|
||||||
|
*decoder = Some(
|
||||||
|
Box::new(GzDecoder::new(
|
||||||
|
Wrapper{buf: BytesMut::from(data), eof: false})));
|
||||||
|
} else {
|
||||||
|
let _ = decoder.as_mut().unwrap().write(&data);
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
self.dst.reserve(8192);
|
||||||
|
match decoder.as_mut().as_mut().unwrap().read(unsafe{self.dst.bytes_mut()}) {
|
||||||
|
Ok(n) => {
|
||||||
|
if n == 0 {
|
||||||
|
return Ok(Some(self.dst.take().freeze()));
|
||||||
|
} else {
|
||||||
|
unsafe{self.dst.advance_mut(n)};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Decoder::Deflate(ref mut decoder) => {
|
||||||
|
match decoder.write(&data).and_then(|_| decoder.flush()) {
|
||||||
|
Ok(_) => {
|
||||||
|
let b = decoder.get_mut().take();
|
||||||
|
if !b.is_empty() {
|
||||||
|
Ok(Some(b))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Decoder::Identity => Ok(Some(data)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) enum ContentEncoder {
|
pub(crate) enum ContentEncoder {
|
||||||
Deflate(DeflateEncoder<TransferEncoding>),
|
Deflate(DeflateEncoder<TransferEncoding>),
|
||||||
Gzip(GzEncoder<TransferEncoding>),
|
Gzip(GzEncoder<TransferEncoding>),
|
||||||
|
@@ -489,7 +489,7 @@ impl ClientWriter {
|
|||||||
|
|
||||||
/// Send text frame
|
/// Send text frame
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn text<T: Into<String>>(&mut self, text: T) {
|
pub fn text<T: Into<Binary>>(&mut self, text: T) {
|
||||||
self.write(Frame::message(text.into(), OpCode::Text, true, true));
|
self.write(Frame::message(text.into(), OpCode::Text, true, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -132,7 +132,7 @@ impl<A, S> WebsocketContext<A, S> where A: Actor<Context=Self> {
|
|||||||
|
|
||||||
/// Send text frame
|
/// Send text frame
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn text<T: Into<String>>(&mut self, text: T) {
|
pub fn text<T: Into<Binary>>(&mut self, text: T) {
|
||||||
self.write(Frame::message(text.into(), OpCode::Text, true, false));
|
self.write(Frame::message(text.into(), OpCode::Text, true, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -116,6 +116,32 @@ fn test_client_gzip_encoding() {
|
|||||||
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_client_gzip_encoding_large() {
|
||||||
|
let data = STR.repeat(10);
|
||||||
|
|
||||||
|
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
||||||
|
req.body()
|
||||||
|
.and_then(|bytes: Bytes| {
|
||||||
|
Ok(httpcodes::HTTPOk
|
||||||
|
.build()
|
||||||
|
.content_encoding(headers::ContentEncoding::Deflate)
|
||||||
|
.body(bytes))
|
||||||
|
}).responder()}
|
||||||
|
));
|
||||||
|
|
||||||
|
// client request
|
||||||
|
let request = srv.post()
|
||||||
|
.content_encoding(headers::ContentEncoding::Gzip)
|
||||||
|
.body(data.clone()).unwrap();
|
||||||
|
let response = srv.execute(request.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// read response
|
||||||
|
let bytes = srv.execute(response.body()).unwrap();
|
||||||
|
assert_eq!(bytes, Bytes::from(data));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_client_brotli_encoding() {
|
fn test_client_brotli_encoding() {
|
||||||
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
||||||
|
@@ -136,27 +136,32 @@ fn test_simple() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_headers() {
|
fn test_headers() {
|
||||||
|
let data = STR.repeat(10);
|
||||||
|
let srv_data = Arc::new(data.clone());
|
||||||
let mut srv = test::TestServer::new(
|
let mut srv = test::TestServer::new(
|
||||||
|app| app.handler(|_| {
|
move |app| {
|
||||||
let mut builder = httpcodes::HTTPOk.build();
|
let data = srv_data.clone();
|
||||||
for idx in 0..90 {
|
app.handler(move |_| {
|
||||||
builder.header(
|
let mut builder = httpcodes::HTTPOk.build();
|
||||||
format!("X-TEST-{}", idx).as_str(),
|
for idx in 0..90 {
|
||||||
"TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
builder.header(
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
format!("X-TEST-{}", idx).as_str(),
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
"TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ");
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
}
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST \
|
||||||
builder.body(STR)}));
|
TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST TEST ");
|
||||||
|
}
|
||||||
|
builder.body(data.as_ref())})
|
||||||
|
});
|
||||||
|
|
||||||
let request = srv.get().finish().unwrap();
|
let request = srv.get().finish().unwrap();
|
||||||
let response = srv.execute(request.send()).unwrap();
|
let response = srv.execute(request.send()).unwrap();
|
||||||
@@ -164,7 +169,7 @@ fn test_headers() {
|
|||||||
|
|
||||||
// read response
|
// read response
|
||||||
let bytes = srv.execute(response.body()).unwrap();
|
let bytes = srv.execute(response.body()).unwrap();
|
||||||
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
assert_eq!(bytes, Bytes::from(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -203,6 +208,33 @@ fn test_body_gzip() {
|
|||||||
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
|
assert_eq!(Bytes::from(dec), Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_body_gzip_large() {
|
||||||
|
let data = STR.repeat(10);
|
||||||
|
let srv_data = Arc::new(data.clone());
|
||||||
|
|
||||||
|
let mut srv = test::TestServer::new(
|
||||||
|
move |app| {
|
||||||
|
let data = srv_data.clone();
|
||||||
|
app.handler(
|
||||||
|
move |_| httpcodes::HTTPOk.build()
|
||||||
|
.content_encoding(headers::ContentEncoding::Gzip)
|
||||||
|
.body(data.as_ref()))});
|
||||||
|
|
||||||
|
let request = srv.get().disable_decompress().finish().unwrap();
|
||||||
|
let response = srv.execute(request.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// read response
|
||||||
|
let bytes = srv.execute(response.body()).unwrap();
|
||||||
|
|
||||||
|
// decode
|
||||||
|
let mut e = GzDecoder::new(&bytes[..]);
|
||||||
|
let mut dec = Vec::new();
|
||||||
|
e.read_to_end(&mut dec).unwrap();
|
||||||
|
assert_eq!(Bytes::from(dec), Bytes::from(data));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_body_chunked_implicit() {
|
fn test_body_chunked_implicit() {
|
||||||
let mut srv = test::TestServer::new(
|
let mut srv = test::TestServer::new(
|
||||||
@@ -430,6 +462,35 @@ fn test_gzip_encoding() {
|
|||||||
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_gzip_encoding_large() {
|
||||||
|
let data = STR.repeat(10);
|
||||||
|
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
||||||
|
req.body()
|
||||||
|
.and_then(|bytes: Bytes| {
|
||||||
|
Ok(httpcodes::HTTPOk
|
||||||
|
.build()
|
||||||
|
.content_encoding(headers::ContentEncoding::Identity)
|
||||||
|
.body(bytes))
|
||||||
|
}).responder()}
|
||||||
|
));
|
||||||
|
|
||||||
|
// client request
|
||||||
|
let mut e = GzEncoder::new(Vec::new(), Compression::default());
|
||||||
|
e.write_all(data.as_ref()).unwrap();
|
||||||
|
let enc = e.finish().unwrap();
|
||||||
|
|
||||||
|
let request = srv.post()
|
||||||
|
.header(header::CONTENT_ENCODING, "gzip")
|
||||||
|
.body(enc.clone()).unwrap();
|
||||||
|
let response = srv.execute(request.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// read response
|
||||||
|
let bytes = srv.execute(response.body()).unwrap();
|
||||||
|
assert_eq!(bytes, Bytes::from(data));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_deflate_encoding() {
|
fn test_deflate_encoding() {
|
||||||
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
||||||
@@ -458,6 +519,35 @@ fn test_deflate_encoding() {
|
|||||||
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_deflate_encoding_large() {
|
||||||
|
let data = STR.repeat(10);
|
||||||
|
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
||||||
|
req.body()
|
||||||
|
.and_then(|bytes: Bytes| {
|
||||||
|
Ok(httpcodes::HTTPOk
|
||||||
|
.build()
|
||||||
|
.content_encoding(headers::ContentEncoding::Identity)
|
||||||
|
.body(bytes))
|
||||||
|
}).responder()}
|
||||||
|
));
|
||||||
|
|
||||||
|
let mut e = DeflateEncoder::new(Vec::new(), Compression::default());
|
||||||
|
e.write_all(data.as_ref()).unwrap();
|
||||||
|
let enc = e.finish().unwrap();
|
||||||
|
|
||||||
|
// client request
|
||||||
|
let request = srv.post()
|
||||||
|
.header(header::CONTENT_ENCODING, "deflate")
|
||||||
|
.body(enc).unwrap();
|
||||||
|
let response = srv.execute(request.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// read response
|
||||||
|
let bytes = srv.execute(response.body()).unwrap();
|
||||||
|
assert_eq!(bytes, Bytes::from(data));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_brotli_encoding() {
|
fn test_brotli_encoding() {
|
||||||
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
||||||
@@ -486,6 +576,35 @@ fn test_brotli_encoding() {
|
|||||||
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_brotli_encoding_large() {
|
||||||
|
let data = STR.repeat(10);
|
||||||
|
let mut srv = test::TestServer::new(|app| app.handler(|req: HttpRequest| {
|
||||||
|
req.body()
|
||||||
|
.and_then(|bytes: Bytes| {
|
||||||
|
Ok(httpcodes::HTTPOk
|
||||||
|
.build()
|
||||||
|
.content_encoding(headers::ContentEncoding::Identity)
|
||||||
|
.body(bytes))
|
||||||
|
}).responder()}
|
||||||
|
));
|
||||||
|
|
||||||
|
let mut e = BrotliEncoder::new(Vec::new(), 5);
|
||||||
|
e.write_all(data.as_ref()).unwrap();
|
||||||
|
let enc = e.finish().unwrap();
|
||||||
|
|
||||||
|
// client request
|
||||||
|
let request = srv.post()
|
||||||
|
.header(header::CONTENT_ENCODING, "br")
|
||||||
|
.body(enc).unwrap();
|
||||||
|
let response = srv.execute(request.send()).unwrap();
|
||||||
|
assert!(response.status().is_success());
|
||||||
|
|
||||||
|
// read response
|
||||||
|
let bytes = srv.execute(response.body()).unwrap();
|
||||||
|
assert_eq!(bytes, Bytes::from(data));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_h2() {
|
fn test_h2() {
|
||||||
let srv = test::TestServer::new(|app| app.handler(|_|{
|
let srv = test::TestServer::new(|app| app.handler(|_|{
|
||||||
|
@@ -19,7 +19,7 @@ use futures::Future;
|
|||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
|
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
use actix_web::ws::{Message, WsClientError, WsClient, WsClientWriter};
|
use actix_web::ws;
|
||||||
|
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
@@ -71,21 +71,21 @@ fn main() {
|
|||||||
let perf = perf_counters.clone();
|
let perf = perf_counters.clone();
|
||||||
let addr = Arbiter::new(format!("test {}", t));
|
let addr = Arbiter::new(format!("test {}", t));
|
||||||
|
|
||||||
addr.send(actix::msgs::Execute::new(move || -> Result<(), ()> {
|
addr.do_send(actix::msgs::Execute::new(move || -> Result<(), ()> {
|
||||||
let mut reps = report;
|
let mut reps = report;
|
||||||
for _ in 0..concurrency {
|
for _ in 0..concurrency {
|
||||||
let pl2 = pl.clone();
|
let pl2 = pl.clone();
|
||||||
let perf2 = perf.clone();
|
let perf2 = perf.clone();
|
||||||
|
|
||||||
Arbiter::handle().spawn(
|
Arbiter::handle().spawn(
|
||||||
WsClient::new(&ws).connect().unwrap()
|
ws::Client::new(&ws).connect()
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
println!("Error: {}", e);
|
println!("Error: {}", e);
|
||||||
Arbiter::system().send(actix::msgs::SystemExit(0));
|
Arbiter::system().do_send(actix::msgs::SystemExit(0));
|
||||||
()
|
()
|
||||||
})
|
})
|
||||||
.map(move |(reader, writer)| {
|
.map(move |(reader, writer)| {
|
||||||
let addr: SyncAddress<_> = ChatClient::create(move |ctx| {
|
let addr: Addr<Syn, _> = ChatClient::create(move |ctx| {
|
||||||
ChatClient::add_stream(reader, ctx);
|
ChatClient::add_stream(reader, ctx);
|
||||||
ChatClient{conn: writer,
|
ChatClient{conn: writer,
|
||||||
payload: pl2,
|
payload: pl2,
|
||||||
@@ -114,7 +114,7 @@ fn parse_u64_default(input: Option<&str>, default: u64) -> u64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct ChatClient{
|
struct ChatClient{
|
||||||
conn: WsClientWriter,
|
conn: ws::ClientWriter,
|
||||||
payload: Arc<String>,
|
payload: Arc<String>,
|
||||||
ts: u64,
|
ts: u64,
|
||||||
bin: bool,
|
bin: bool,
|
||||||
@@ -133,9 +133,9 @@ impl Actor for ChatClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stopping(&mut self, _: &mut Context<Self>) -> bool {
|
fn stopping(&mut self, _: &mut Context<Self>) -> Running {
|
||||||
Arbiter::system().send(actix::msgs::SystemExit(0));
|
Arbiter::system().do_send(actix::msgs::SystemExit(0));
|
||||||
true
|
Running::Stop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -171,15 +171,15 @@ impl ChatClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Handle server websocket messages
|
/// Handle server websocket messages
|
||||||
impl StreamHandler<Message, WsClientError> for ChatClient {
|
impl StreamHandler<ws::Message, ws::ProtocolError> for ChatClient {
|
||||||
|
|
||||||
fn finished(&mut self, ctx: &mut Context<Self>) {
|
fn finished(&mut self, ctx: &mut Context<Self>) {
|
||||||
ctx.stop()
|
ctx.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle(&mut self, msg: Message, ctx: &mut Context<Self>) {
|
fn handle(&mut self, msg: ws::Message, ctx: &mut Context<Self>) {
|
||||||
match msg {
|
match msg {
|
||||||
Message::Text(txt) => {
|
ws::Message::Text(txt) => {
|
||||||
if txt == self.payload.as_ref().as_str() {
|
if txt == self.payload.as_ref().as_str() {
|
||||||
self.perf_counters.register_request();
|
self.perf_counters.register_request();
|
||||||
self.perf_counters.register_latency(time::precise_time_ns() - self.ts);
|
self.perf_counters.register_latency(time::precise_time_ns() - self.ts);
|
||||||
|
Reference in New Issue
Block a user