From 8fea1367c739224f6365185a6ec6b1c4efdd5a8f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 18 Nov 2018 13:48:42 -0800 Subject: [PATCH] re-introduce Body type, use Body as default body type for Response --- src/body.rs | 371 ++++++++++++++++++++--------------------- src/client/pipeline.rs | 4 +- src/client/request.rs | 9 +- src/error.rs | 5 +- src/h1/client.rs | 6 +- src/h1/codec.rs | 10 +- src/h1/dispatcher.rs | 18 +- src/h1/encoder.rs | 4 +- src/lib.rs | 5 +- src/response.rs | 149 ++++++++--------- src/service.rs | 4 +- src/test.rs | 7 +- src/ws/codec.rs | 5 +- src/ws/frame.rs | 5 +- tests/test_server.rs | 22 +-- tests/test_ws.rs | 20 +-- 16 files changed, 309 insertions(+), 335 deletions(-) diff --git a/src/body.rs b/src/body.rs index 3b4e0113..a44bc603 100644 --- a/src/body.rs +++ b/src/body.rs @@ -1,22 +1,19 @@ -use std::mem; -use std::sync::Arc; +use std::marker::PhantomData; +use std::{fmt, mem}; use bytes::{Bytes, BytesMut}; use futures::{Async, Poll, Stream}; use error::{Error, PayloadError}; -/// Type represent streaming body -pub type BodyStream = Box>; - /// Type represent streaming payload pub type PayloadStream = Box>; -#[derive(Debug)] +#[derive(Debug, PartialEq)] /// Different type of body pub enum BodyLength { None, - Zero, + Empty, Sized(usize), Sized64(u64), Chunked, @@ -32,7 +29,7 @@ pub trait MessageBody { impl MessageBody for () { fn length(&self) -> BodyLength { - BodyLength::Zero + BodyLength::Empty } fn poll_next(&mut self) -> Poll, Error> { @@ -40,150 +37,129 @@ impl MessageBody for () { } } -/// Represents various types of binary body. -/// `Content-Length` header is set to length of the body. -#[derive(Debug, PartialEq)] -pub enum Binary { - /// Bytes body +/// Represents various types of http message body. +pub enum Body { + /// Empty response. `Content-Length` header is not set. + None, + /// Zero sized response body. `Content-Length` header is set to `0`. + Empty, + /// Specific response body. Bytes(Bytes), - /// Static slice - Slice(&'static [u8]), - /// Shared string body - #[doc(hidden)] - SharedString(Arc), - /// Shared vec body - SharedVec(Arc>), + /// Generic message body. + Message(Box), } -impl Binary { - #[inline] - /// Returns `true` if body is empty - pub fn is_empty(&self) -> bool { - self.len() == 0 +impl Body { + /// Create body from slice (copy) + pub fn from_slice(s: &[u8]) -> Body { + Body::Bytes(Bytes::from(s)) } - #[inline] - /// Length of body in bytes - pub fn len(&self) -> usize { - match *self { - Binary::Bytes(ref bytes) => bytes.len(), - Binary::Slice(slice) => slice.len(), - Binary::SharedString(ref s) => s.len(), - Binary::SharedVec(ref s) => s.len(), - } - } - - /// Create binary body from slice - pub fn from_slice(s: &[u8]) -> Binary { - Binary::Bytes(Bytes::from(s)) - } - - /// Convert Binary to a Bytes instance - pub fn take(&mut self) -> Bytes { - mem::replace(self, Binary::Slice(b"")).into() + /// Create body from generic message body. + pub fn from_message(body: B) -> Body { + Body::Message(Box::new(body)) } } -impl Clone for Binary { - fn clone(&self) -> Binary { - match *self { - Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()), - Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)), - Binary::SharedString(ref s) => Binary::SharedString(s.clone()), - Binary::SharedVec(ref s) => Binary::SharedVec(s.clone()), - } - } -} - -impl Into for Binary { - fn into(self) -> Bytes { +impl MessageBody for Body { + fn length(&self) -> BodyLength { match self { - Binary::Bytes(bytes) => bytes, - Binary::Slice(slice) => Bytes::from(slice), - Binary::SharedString(s) => Bytes::from(s.as_str()), - Binary::SharedVec(s) => Bytes::from(AsRef::<[u8]>::as_ref(s.as_ref())), + Body::None => BodyLength::None, + Body::Empty => BodyLength::Empty, + Body::Bytes(ref bin) => BodyLength::Sized(bin.len()), + Body::Message(ref body) => body.length(), + } + } + + fn poll_next(&mut self) -> Poll, Error> { + match self { + Body::None => Ok(Async::Ready(None)), + Body::Empty => Ok(Async::Ready(None)), + Body::Bytes(ref mut bin) => { + if bin.len() == 0 { + Ok(Async::Ready(None)) + } else { + Ok(Async::Ready(Some(bin.slice_to(bin.len())))) + } + } + Body::Message(ref mut body) => body.poll_next(), } } } -impl From<&'static str> for Binary { - fn from(s: &'static str) -> Binary { - Binary::Slice(s.as_ref()) - } -} - -impl From<&'static [u8]> for Binary { - fn from(s: &'static [u8]) -> Binary { - Binary::Slice(s) - } -} - -impl From> for Binary { - fn from(vec: Vec) -> Binary { - Binary::Bytes(Bytes::from(vec)) - } -} - -impl From for Binary { - fn from(s: String) -> Binary { - Binary::Bytes(Bytes::from(s)) - } -} - -impl<'a> From<&'a String> for Binary { - fn from(s: &'a String) -> Binary { - Binary::Bytes(Bytes::from(AsRef::<[u8]>::as_ref(&s))) - } -} - -impl From for Binary { - fn from(s: Bytes) -> Binary { - Binary::Bytes(s) - } -} - -impl From for Binary { - fn from(s: BytesMut) -> Binary { - Binary::Bytes(s.freeze()) - } -} - -impl From> for Binary { - fn from(body: Arc) -> Binary { - Binary::SharedString(body) - } -} - -impl<'a> From<&'a Arc> for Binary { - fn from(body: &'a Arc) -> Binary { - Binary::SharedString(Arc::clone(body)) - } -} - -impl From>> for Binary { - fn from(body: Arc>) -> Binary { - Binary::SharedVec(body) - } -} - -impl<'a> From<&'a Arc>> for Binary { - fn from(body: &'a Arc>) -> Binary { - Binary::SharedVec(Arc::clone(body)) - } -} - -impl AsRef<[u8]> for Binary { - #[inline] - fn as_ref(&self) -> &[u8] { +impl PartialEq for Body { + fn eq(&self, other: &Body) -> bool { match *self { - Binary::Bytes(ref bytes) => bytes.as_ref(), - Binary::Slice(slice) => slice, - Binary::SharedString(ref s) => s.as_bytes(), - Binary::SharedVec(ref s) => s.as_ref().as_ref(), + Body::None => match *other { + Body::None => true, + _ => false, + }, + Body::Empty => match *other { + Body::Empty => true, + _ => false, + }, + Body::Bytes(ref b) => match *other { + Body::Bytes(ref b2) => b == b2, + _ => false, + }, + Body::Message(_) => false, } } } +impl fmt::Debug for Body { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Body::None => write!(f, "Body::None"), + Body::Empty => write!(f, "Body::Zero"), + Body::Bytes(ref b) => write!(f, "Body::Bytes({:?})", b), + Body::Message(_) => write!(f, "Body::Message(_)"), + } + } +} + +impl From<&'static str> for Body { + fn from(s: &'static str) -> Body { + Body::Bytes(Bytes::from_static(s.as_ref())) + } +} + +impl From<&'static [u8]> for Body { + fn from(s: &'static [u8]) -> Body { + Body::Bytes(Bytes::from_static(s.as_ref())) + } +} + +impl From> for Body { + fn from(vec: Vec) -> Body { + Body::Bytes(Bytes::from(vec)) + } +} + +impl From for Body { + fn from(s: String) -> Body { + s.into_bytes().into() + } +} + +impl<'a> From<&'a String> for Body { + fn from(s: &'a String) -> Body { + Body::Bytes(Bytes::from(AsRef::<[u8]>::as_ref(&s))) + } +} + +impl From for Body { + fn from(s: Bytes) -> Body { + Body::Bytes(s) + } +} + +impl From for Body { + fn from(s: BytesMut) -> Body { + Body::Bytes(s.freeze()) + } +} + impl MessageBody for Bytes { fn length(&self) -> BodyLength { BodyLength::Sized(self.len()) @@ -279,26 +255,62 @@ impl MessageBody for String { } } -#[doc(hidden)] -pub struct MessageBodyStream { +/// Type represent streaming body. +/// Response does not contain `content-length` header and appropriate transfer encoding is used. +pub struct BodyStream { stream: S, + _t: PhantomData, } -impl MessageBodyStream +impl BodyStream where - S: Stream, + S: Stream, + E: Into, { pub fn new(stream: S) -> Self { - MessageBodyStream { stream } + BodyStream { + stream, + _t: PhantomData, + } } } -impl MessageBody for MessageBodyStream +impl MessageBody for BodyStream +where + S: Stream, + E: Into, +{ + fn length(&self) -> BodyLength { + BodyLength::Chunked + } + + fn poll_next(&mut self) -> Poll, Error> { + self.stream.poll().map_err(|e| e.into()) + } +} + +/// Type represent streaming body. This body implementation should be used +/// if total size of stream is known. Data get sent as is without using transfer encoding. +pub struct SizedStream { + size: usize, + stream: S, +} + +impl SizedStream +where + S: Stream, +{ + pub fn new(size: usize, stream: S) -> Self { + SizedStream { size, stream } + } +} + +impl MessageBody for SizedStream where S: Stream, { fn length(&self) -> BodyLength { - BodyLength::Chunked + BodyLength::Sized(self.size) } fn poll_next(&mut self) -> Poll, Error> { @@ -310,78 +322,61 @@ where mod tests { use super::*; - #[test] - fn test_is_empty() { - assert_eq!(Binary::from("").is_empty(), true); - assert_eq!(Binary::from("test").is_empty(), false); + impl Body { + pub(crate) fn get_ref(&self) -> &[u8] { + match *self { + Body::Bytes(ref bin) => &bin, + _ => panic!(), + } + } } #[test] fn test_static_str() { - assert_eq!(Binary::from("test").len(), 4); - assert_eq!(Binary::from("test").as_ref(), b"test"); + assert_eq!(Body::from("").length(), BodyLength::Sized(0)); + assert_eq!(Body::from("test").length(), BodyLength::Sized(4)); + assert_eq!(Body::from("test").get_ref(), b"test"); } #[test] fn test_static_bytes() { - assert_eq!(Binary::from(b"test".as_ref()).len(), 4); - assert_eq!(Binary::from(b"test".as_ref()).as_ref(), b"test"); - assert_eq!(Binary::from_slice(b"test".as_ref()).len(), 4); - assert_eq!(Binary::from_slice(b"test".as_ref()).as_ref(), b"test"); + assert_eq!(Body::from(b"test".as_ref()).length(), BodyLength::Sized(4)); + assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test"); + assert_eq!( + Body::from_slice(b"test".as_ref()).length(), + BodyLength::Sized(4) + ); + assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test"); } #[test] fn test_vec() { - assert_eq!(Binary::from(Vec::from("test")).len(), 4); - assert_eq!(Binary::from(Vec::from("test")).as_ref(), b"test"); + assert_eq!(Body::from(Vec::from("test")).length(), BodyLength::Sized(4)); + assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test"); } #[test] fn test_bytes() { - assert_eq!(Binary::from(Bytes::from("test")).len(), 4); - assert_eq!(Binary::from(Bytes::from("test")).as_ref(), b"test"); - } - - #[test] - fn test_arc_string() { - let b = Arc::new("test".to_owned()); - assert_eq!(Binary::from(b.clone()).len(), 4); - assert_eq!(Binary::from(b.clone()).as_ref(), b"test"); - assert_eq!(Binary::from(&b).len(), 4); - assert_eq!(Binary::from(&b).as_ref(), b"test"); + assert_eq!( + Body::from(Bytes::from("test")).length(), + BodyLength::Sized(4) + ); + assert_eq!(Body::from(Bytes::from("test")).get_ref(), b"test"); } #[test] fn test_string() { let b = "test".to_owned(); - assert_eq!(Binary::from(b.clone()).len(), 4); - assert_eq!(Binary::from(b.clone()).as_ref(), b"test"); - assert_eq!(Binary::from(&b).len(), 4); - assert_eq!(Binary::from(&b).as_ref(), b"test"); - } - - #[test] - fn test_shared_vec() { - let b = Arc::new(Vec::from(&b"test"[..])); - assert_eq!(Binary::from(b.clone()).len(), 4); - assert_eq!(Binary::from(b.clone()).as_ref(), &b"test"[..]); - assert_eq!(Binary::from(&b).len(), 4); - assert_eq!(Binary::from(&b).as_ref(), &b"test"[..]); + assert_eq!(Body::from(b.clone()).length(), BodyLength::Sized(4)); + assert_eq!(Body::from(b.clone()).get_ref(), b"test"); + assert_eq!(Body::from(&b).length(), BodyLength::Sized(4)); + assert_eq!(Body::from(&b).get_ref(), b"test"); } #[test] fn test_bytes_mut() { let b = BytesMut::from("test"); - assert_eq!(Binary::from(b.clone()).len(), 4); - assert_eq!(Binary::from(b).as_ref(), b"test"); - } - - #[test] - fn test_binary_into() { - let bytes = Bytes::from_static(b"test"); - let b: Bytes = Binary::from("test").into(); - assert_eq!(b, bytes); - let b: Bytes = Binary::from(bytes.clone()).into(); - assert_eq!(b, bytes); + assert_eq!(Body::from(b.clone()).length(), BodyLength::Sized(4)); + assert_eq!(Body::from(b).get_ref(), b"test"); } } diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 56c22bd2..8be860ae 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -36,7 +36,9 @@ where .and_then(|framed| framed.send((head, len).into()).from_err()) // send request body .and_then(move |framed| match body.length() { - BodyLength::None | BodyLength::Zero => Either::A(ok(framed)), + BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => { + Either::A(ok(framed)) + } _ => Either::B(SendBody::new(body, framed)), }) // read response and init read body diff --git a/src/client/request.rs b/src/client/request.rs index f0b76ed0..dd418a6f 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -9,7 +9,7 @@ use futures::{Future, Stream}; use percent_encoding::{percent_encode, USERINFO_ENCODE_SET}; use urlcrate::Url; -use body::{MessageBody, MessageBodyStream}; +use body::{BodyStream, MessageBody}; use error::Error; use header::{self, Header, IntoHeaderValue}; use http::{ @@ -534,14 +534,15 @@ impl ClientRequestBuilder { /// Set an streaming body and generate `ClientRequest`. /// /// `ClientRequestBuilder` can not be used after this call. - pub fn stream( + pub fn stream( &mut self, stream: S, ) -> Result, HttpError> where - S: Stream, + S: Stream, + E: Into + 'static, { - self.body(MessageBodyStream::new(stream)) + self.body(BodyStream::new(stream)) } /// Set an empty body and generate `ClientRequest`. diff --git a/src/error.rs b/src/error.rs index 1f70396c..2e0c2382 100644 --- a/src/error.rs +++ b/src/error.rs @@ -20,6 +20,7 @@ use tokio_timer::Error as TimerError; // re-exports pub use cookie::ParseError as CookieParseError; +use body::Body; use response::{Response, ResponseParts}; /// A specialized [`Result`](https://doc.rust-lang.org/std/result/enum.Result.html) @@ -100,10 +101,10 @@ impl Error { } /// Converts error to a response instance and set error message as response body - pub fn response_with_message(self) -> Response { + pub fn response_with_message(self) -> Response { let message = format!("{}", self); let resp: Response = self.into(); - resp.set_body(message) + resp.set_body(Body::from(message)) } } diff --git a/src/h1/client.rs b/src/h1/client.rs index ace50466..2cb2fb2e 100644 --- a/src/h1/client.rs +++ b/src/h1/client.rs @@ -7,7 +7,7 @@ use tokio_codec::{Decoder, Encoder}; use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType}; use super::encoder::RequestEncoder; use super::{Message, MessageType}; -use body::{Binary, BodyLength}; +use body::BodyLength; use client::ClientResponse; use config::ServiceConfig; use error::{ParseError, PayloadError}; @@ -167,7 +167,7 @@ impl ClientCodecInner { BodyLength::Chunked => { buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n") } - BodyLength::Zero => { + BodyLength::Empty => { len_is_set = false; buffer.extend_from_slice(b"\r\n") } @@ -183,7 +183,7 @@ impl ClientCodecInner { TRANSFER_ENCODING => continue, CONTENT_LENGTH => match length { BodyLength::None => (), - BodyLength::Zero => len_is_set = true, + BodyLength::Empty => len_is_set = true, _ => continue, }, DATE => has_date = true, diff --git a/src/h1/codec.rs b/src/h1/codec.rs index 6bc20b18..b4a62a50 100644 --- a/src/h1/codec.rs +++ b/src/h1/codec.rs @@ -8,7 +8,7 @@ use tokio_codec::{Decoder, Encoder}; use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType}; use super::encoder::ResponseEncoder; use super::{Message, MessageType}; -use body::{Binary, BodyLength}; +use body::BodyLength; use config::ServiceConfig; use error::ParseError; use helpers; @@ -106,7 +106,7 @@ impl Codec { fn encode_response( &mut self, - mut msg: Response, + mut msg: Response<()>, buffer: &mut BytesMut, ) -> io::Result<()> { let ka = self.flags.contains(Flags::KEEPALIVE_ENABLED) && msg @@ -149,7 +149,7 @@ impl Codec { BodyLength::Chunked => { buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n") } - BodyLength::Zero => { + BodyLength::Empty => { len_is_set = false; buffer.extend_from_slice(b"\r\n") } @@ -174,7 +174,7 @@ impl Codec { TRANSFER_ENCODING => continue, CONTENT_LENGTH => match self.te.length { BodyLength::None => (), - BodyLength::Zero => { + BodyLength::Empty => { len_is_set = true; } _ => continue, @@ -268,7 +268,7 @@ impl Decoder for Codec { } impl Encoder for Codec { - type Item = Message; + type Item = Message>; type Error = io::Error; fn encode( diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index 508962b4..ff9d54e6 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -65,7 +65,7 @@ where enum DispatcherMessage { Item(Request), - Error(Response), + Error(Response<()>), } enum State { @@ -190,7 +190,7 @@ where fn send_response( &mut self, - message: Response, + message: Response<()>, body: B1, ) -> Result, DispatchError> { self.framed @@ -206,7 +206,7 @@ where .set(Flags::KEEPALIVE, self.framed.get_codec().keepalive()); self.flags.remove(Flags::FLUSHED); match body.length() { - BodyLength::None | BodyLength::Zero => Ok(State::None), + BodyLength::None | BodyLength::Empty => Ok(State::None), _ => Ok(State::SendPayload(body)), } } @@ -351,7 +351,7 @@ where ); self.flags.insert(Flags::DISCONNECTED); self.messages.push_back(DispatcherMessage::Error( - Response::InternalServerError().finish(), + Response::InternalServerError().finish().drop_body(), )); self.error = Some(DispatchError::InternalError); break; @@ -364,7 +364,7 @@ where error!("Internal server error: unexpected eof"); self.flags.insert(Flags::DISCONNECTED); self.messages.push_back(DispatcherMessage::Error( - Response::InternalServerError().finish(), + Response::InternalServerError().finish().drop_body(), )); self.error = Some(DispatchError::InternalError); break; @@ -389,7 +389,7 @@ where // Malformed requests should be responded with 400 self.messages.push_back(DispatcherMessage::Error( - Response::BadRequest().finish(), + Response::BadRequest().finish().drop_body(), )); self.flags.insert(Flags::DISCONNECTED); self.error = Some(e.into()); @@ -440,8 +440,10 @@ where // timeout on first request (slow request) return 408 trace!("Slow request timeout"); self.flags.insert(Flags::STARTED | Flags::DISCONNECTED); - let _ = self - .send_response(Response::RequestTimeout().finish(), ()); + let _ = self.send_response( + Response::RequestTimeout().finish().drop_body(), + (), + ); self.state = State::None; } } else if let Some(deadline) = self.config.keep_alive_expire() { diff --git a/src/h1/encoder.rs b/src/h1/encoder.rs index aee17c1f..fd52d731 100644 --- a/src/h1/encoder.rs +++ b/src/h1/encoder.rs @@ -8,7 +8,7 @@ use bytes::{Bytes, BytesMut}; use http::header::{HeaderValue, ACCEPT_ENCODING, CONTENT_LENGTH}; use http::{StatusCode, Version}; -use body::{Binary, BodyLength}; +use body::BodyLength; use header::ContentEncoding; use http::Method; use message::{RequestHead, ResponseHead}; @@ -52,7 +52,7 @@ impl ResponseEncoder { ) { self.head = head; let transfer = match length { - BodyLength::Zero => { + BodyLength::Empty => { match resp.status { StatusCode::NO_CONTENT | StatusCode::CONTINUE diff --git a/src/lib.rs b/src/lib.rs index 4b43cae4..615f0401 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -109,7 +109,7 @@ extern crate serde_derive; #[cfg(feature = "ssl")] extern crate openssl; -mod body; +pub mod body; pub mod client; mod config; mod extensions; @@ -129,7 +129,7 @@ pub mod h1; pub(crate) mod helpers; pub mod test; pub mod ws; -pub use body::{Binary, MessageBody}; +pub use body::{Body, MessageBody}; pub use error::{Error, ResponseError, Result}; pub use extensions::Extensions; pub use httpmessage::HttpMessage; @@ -150,7 +150,6 @@ pub mod dev { //! use actix_http::dev::*; //! ``` - pub use body::BodyStream; pub use httpmessage::{MessageBody, Readlines, UrlEncoded}; pub use json::JsonBody; pub use payload::{Payload, PayloadBuffer}; diff --git a/src/response.rs b/src/response.rs index b3e59982..a6f7c81c 100644 --- a/src/response.rs +++ b/src/response.rs @@ -12,9 +12,9 @@ use http::{Error as HttpError, HeaderMap, HttpTryFrom, StatusCode, Version}; use serde::Serialize; use serde_json; -use body::{MessageBody, MessageBodyStream}; +use body::{Body, BodyStream, MessageBody}; use error::Error; -use header::{ContentEncoding, Header, IntoHeaderValue}; +use header::{Header, IntoHeaderValue}; use message::{Head, MessageFlags, ResponseHead}; /// max write buffer size 64k @@ -32,9 +32,9 @@ pub enum ConnectionType { } /// An HTTP Response -pub struct Response(Box, B); +pub struct Response(Box, B); -impl Response<()> { +impl Response { /// Create http response builder with specific status. #[inline] pub fn build(status: StatusCode) -> ResponseBuilder { @@ -50,7 +50,7 @@ impl Response<()> { /// Constructs a response #[inline] pub fn new(status: StatusCode) -> Response { - ResponsePool::with_body(status, ()) + ResponsePool::with_body(status, Body::Empty) } /// Constructs an error response @@ -242,6 +242,11 @@ impl Response { Response(self.0, body) } + /// Drop request's body + pub fn drop_body(self) -> Response<()> { + Response(self.0, ()) + } + /// Set a body and return previous body value pub fn replace_body(self, body: B2) -> (Response, B) { (Response(self.0, body), self.1) @@ -252,7 +257,7 @@ impl Response { self.get_ref().response_size } - /// Set content encoding + /// Set response size pub(crate) fn set_response_size(&mut self, size: u64) { self.get_mut().response_size = size; } @@ -266,7 +271,7 @@ impl Response { } pub(crate) fn from_parts(parts: ResponseParts) -> Response { - Response(Box::new(InnerResponse::from_parts(parts)), ()) + Response(Box::new(InnerResponse::from_parts(parts)), Body::Empty) } } @@ -279,7 +284,6 @@ impl fmt::Debug for Response { self.get_ref().head.status, self.get_ref().head.reason.unwrap_or("") ); - let _ = writeln!(f, " encoding: {:?}", self.get_ref().encoding); let _ = writeln!(f, " headers:"); for (key, val) in self.get_ref().head.headers.iter() { let _ = writeln!(f, " {:?}: {:?}", key, val); @@ -396,20 +400,6 @@ impl ResponseBuilder { self } - /// Set content encoding. - /// - /// By default `ContentEncoding::Auto` is used, which automatically - /// negotiates content encoding based on request's `Accept-Encoding` - /// headers. To enforce specific encoding, use specific - /// ContentEncoding` value. - #[inline] - pub fn content_encoding(&mut self, enc: ContentEncoding) -> &mut Self { - if let Some(parts) = parts(&mut self.response, &self.err) { - parts.encoding = Some(enc); - } - self - } - /// Set connection type #[inline] #[doc(hidden)] @@ -558,7 +548,14 @@ impl ResponseBuilder { /// Set a body and generate `Response`. /// /// `ResponseBuilder` can not be used after this call. - pub fn body(&mut self, body: B) -> Response { + pub fn body>(&mut self, body: B) -> Response { + self.message_body(body.into()) + } + + /// Set a body and generate `Response`. + /// + /// `ResponseBuilder` can not be used after this call. + pub fn message_body(&mut self, body: B) -> Response { let mut error = if let Some(e) = self.err.take() { Some(Error::from(e)) } else { @@ -589,25 +586,25 @@ impl ResponseBuilder { /// Set a streaming body and generate `Response`. /// /// `ResponseBuilder` can not be used after this call. - pub fn streaming(&mut self, stream: S) -> Response + pub fn streaming(&mut self, stream: S) -> Response where S: Stream + 'static, - E: Into, + E: Into + 'static, { - self.body(MessageBodyStream::new(stream.map_err(|e| e.into()))) + self.body(Body::from_message(BodyStream::new(stream))) } /// Set a json body and generate `Response` /// /// `ResponseBuilder` can not be used after this call. - pub fn json(&mut self, value: T) -> Response { + pub fn json(&mut self, value: T) -> Response { self.json2(&value) } /// Set a json body and generate `Response` /// /// `ResponseBuilder` can not be used after this call. - pub fn json2(&mut self, value: &T) -> Response { + pub fn json2(&mut self, value: &T) -> Response { match serde_json::to_string(value) { Ok(body) => { let contains = if let Some(parts) = parts(&mut self.response, &self.err) @@ -620,12 +617,9 @@ impl ResponseBuilder { self.header(header::CONTENT_TYPE, "application/json"); } - self.body(body) - } - Err(e) => { - let mut res: Response = Error::from(e).into(); - res.replace_body(String::new()).0 + self.body(Body::from(body)) } + Err(e) => Error::from(e).into(), } } @@ -633,8 +627,8 @@ impl ResponseBuilder { /// Set an empty body and generate `Response` /// /// `ResponseBuilder` can not be used after this call. - pub fn finish(&mut self) -> Response<()> { - self.body(()) + pub fn finish(&mut self) -> Response { + self.body(Body::Empty) } /// This method construct new `ResponseBuilder` @@ -675,7 +669,7 @@ impl From for Response { } } -impl From<&'static str> for Response<&'static str> { +impl From<&'static str> for Response { fn from(val: &'static str) -> Self { Response::Ok() .content_type("text/plain; charset=utf-8") @@ -683,7 +677,7 @@ impl From<&'static str> for Response<&'static str> { } } -impl From<&'static [u8]> for Response<&'static [u8]> { +impl From<&'static [u8]> for Response { fn from(val: &'static [u8]) -> Self { Response::Ok() .content_type("application/octet-stream") @@ -691,7 +685,7 @@ impl From<&'static [u8]> for Response<&'static [u8]> { } } -impl From for Response { +impl From for Response { fn from(val: String) -> Self { Response::Ok() .content_type("text/plain; charset=utf-8") @@ -699,7 +693,15 @@ impl From for Response { } } -impl From for Response { +impl<'a> From<&'a String> for Response { + fn from(val: &'a String) -> Self { + Response::Ok() + .content_type("text/plain; charset=utf-8") + .body(val) + } +} + +impl From for Response { fn from(val: Bytes) -> Self { Response::Ok() .content_type("application/octet-stream") @@ -707,7 +709,7 @@ impl From for Response { } } -impl From for Response { +impl From for Response { fn from(val: BytesMut) -> Self { Response::Ok() .content_type("application/octet-stream") @@ -717,7 +719,6 @@ impl From for Response { struct InnerResponse { head: ResponseHead, - encoding: Option, connection_type: Option, write_capacity: usize, response_size: u64, @@ -727,7 +728,6 @@ struct InnerResponse { pub(crate) struct ResponseParts { head: ResponseHead, - encoding: Option, connection_type: Option, error: Option, } @@ -744,7 +744,6 @@ impl InnerResponse { flags: MessageFlags::empty(), }, pool, - encoding: None, connection_type: None, response_size: 0, write_capacity: MAX_WRITE_BUFFER_SIZE, @@ -756,7 +755,6 @@ impl InnerResponse { fn into_parts(self) -> ResponseParts { ResponseParts { head: self.head, - encoding: self.encoding, connection_type: self.connection_type, error: self.error, } @@ -765,7 +763,6 @@ impl InnerResponse { fn from_parts(parts: ResponseParts) -> InnerResponse { InnerResponse { head: parts.head, - encoding: parts.encoding, connection_type: parts.connection_type, response_size: 0, write_capacity: MAX_WRITE_BUFFER_SIZE, @@ -841,7 +838,6 @@ impl ResponsePool { let mut p = inner.pool.0.borrow_mut(); if p.len() < 128 { inner.head.clear(); - inner.encoding = None; inner.connection_type = None; inner.response_size = 0; inner.error = None; @@ -854,11 +850,10 @@ impl ResponsePool { #[cfg(test)] mod tests { use super::*; - use body::Binary; + use body::Body; use http; use http::header::{HeaderValue, CONTENT_TYPE, COOKIE}; - use header::ContentEncoding; // use test::TestRequest; #[test] @@ -953,24 +948,24 @@ mod tests { assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "text/plain") } - #[test] - fn test_content_encoding() { - let resp = Response::build(StatusCode::OK).finish(); - assert_eq!(resp.content_encoding(), None); + // #[test] + // fn test_content_encoding() { + // let resp = Response::build(StatusCode::OK).finish(); + // assert_eq!(resp.content_encoding(), None); - #[cfg(feature = "brotli")] - { - let resp = Response::build(StatusCode::OK) - .content_encoding(ContentEncoding::Br) - .finish(); - assert_eq!(resp.content_encoding(), Some(ContentEncoding::Br)); - } + // #[cfg(feature = "brotli")] + // { + // let resp = Response::build(StatusCode::OK) + // .content_encoding(ContentEncoding::Br) + // .finish(); + // assert_eq!(resp.content_encoding(), Some(ContentEncoding::Br)); + // } - let resp = Response::build(StatusCode::OK) - .content_encoding(ContentEncoding::Gzip) - .finish(); - assert_eq!(resp.content_encoding(), Some(ContentEncoding::Gzip)); - } + // let resp = Response::build(StatusCode::OK) + // .content_encoding(ContentEncoding::Gzip) + // .finish(); + // assert_eq!(resp.content_encoding(), Some(ContentEncoding::Gzip)); + // } #[test] fn test_json() { @@ -1020,15 +1015,6 @@ mod tests { ); } - impl Body { - pub(crate) fn bin_ref(&self) -> &Binary { - match *self { - Body::Binary(ref bin) => bin, - _ => panic!(), - } - } - } - #[test] fn test_into_response() { let resp: Response = "test".into(); @@ -1038,7 +1024,7 @@ mod tests { HeaderValue::from_static("text/plain; charset=utf-8") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(resp.body().bin_ref(), &Binary::from("test")); + assert_eq!(resp.body().get_ref(), b"test"); let resp: Response = b"test".as_ref().into(); assert_eq!(resp.status(), StatusCode::OK); @@ -1047,7 +1033,7 @@ mod tests { HeaderValue::from_static("application/octet-stream") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(resp.body().bin_ref(), &Binary::from(b"test".as_ref())); + assert_eq!(resp.body().get_ref(), b"test"); let resp: Response = "test".to_owned().into(); assert_eq!(resp.status(), StatusCode::OK); @@ -1056,7 +1042,7 @@ mod tests { HeaderValue::from_static("text/plain; charset=utf-8") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(resp.body().bin_ref(), &Binary::from("test".to_owned())); + assert_eq!(resp.body().get_ref(), b"test"); let resp: Response = (&"test".to_owned()).into(); assert_eq!(resp.status(), StatusCode::OK); @@ -1065,7 +1051,7 @@ mod tests { HeaderValue::from_static("text/plain; charset=utf-8") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(resp.body().bin_ref(), &Binary::from(&"test".to_owned())); + assert_eq!(resp.body().get_ref(), b"test"); let b = Bytes::from_static(b"test"); let resp: Response = b.into(); @@ -1075,10 +1061,7 @@ mod tests { HeaderValue::from_static("application/octet-stream") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!( - resp.body().bin_ref(), - &Binary::from(Bytes::from_static(b"test")) - ); + assert_eq!(resp.body().get_ref(), b"test"); let b = Bytes::from_static(b"test"); let resp: Response = b.into(); @@ -1088,7 +1071,7 @@ mod tests { HeaderValue::from_static("application/octet-stream") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(resp.body().bin_ref(), &Binary::from(BytesMut::from("test"))); + assert_eq!(resp.body().get_ref(), b"test"); let b = BytesMut::from("test"); let resp: Response = b.into(); @@ -1098,7 +1081,7 @@ mod tests { HeaderValue::from_static("application/octet-stream") ); assert_eq!(resp.status(), StatusCode::OK); - assert_eq!(resp.body().bin_ref(), &Binary::from(BytesMut::from("test"))); + assert_eq!(resp.body().get_ref(), b"test"); } #[test] diff --git a/src/service.rs b/src/service.rs index ac92a0f7..51a16b48 100644 --- a/src/service.rs +++ b/src/service.rs @@ -72,7 +72,7 @@ where } pub struct SendErrorFut { - res: Option>, + res: Option>>, framed: Option>, err: Option, _t: PhantomData, @@ -188,7 +188,7 @@ where } pub struct SendResponseFut { - res: Option>, + res: Option>>, body: Option, framed: Option>, } diff --git a/src/test.rs b/src/test.rs index 43974934..c6f258a7 100644 --- a/src/test.rs +++ b/src/test.rs @@ -4,6 +4,7 @@ use std::str::FromStr; use actix::System; +use bytes::Bytes; use cookie::Cookie; use futures::Future; use http::header::HeaderName; @@ -11,7 +12,6 @@ use http::{HeaderMap, HttpTryFrom, Method, Uri, Version}; use net2::TcpBuilder; use tokio::runtime::current_thread::Runtime; -use body::Binary; use header::{Header, IntoHeaderValue}; use payload::Payload; use request::Request; @@ -362,10 +362,9 @@ impl TestRequest { } /// Set request payload - pub fn set_payload>(mut self, data: B) -> Self { - let mut data = data.into(); + pub fn set_payload>(mut self, data: B) -> Self { let mut payload = Payload::empty(); - payload.unread_data(data.take()); + payload.unread_data(data.into()); self.payload = Some(payload); self } diff --git a/src/ws/codec.rs b/src/ws/codec.rs index 5bdc5819..10c50528 100644 --- a/src/ws/codec.rs +++ b/src/ws/codec.rs @@ -1,10 +1,9 @@ -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use tokio_codec::{Decoder, Encoder}; use super::frame::Parser; use super::proto::{CloseReason, OpCode}; use super::ProtocolError; -use body::Binary; /// `WebSocket` Message #[derive(Debug, PartialEq)] @@ -12,7 +11,7 @@ pub enum Message { /// Text message Text(String), /// Binary message - Binary(Binary), + Binary(Bytes), /// Ping message Ping(String), /// Pong message diff --git a/src/ws/frame.rs b/src/ws/frame.rs index ca5f0e89..56d28296 100644 --- a/src/ws/frame.rs +++ b/src/ws/frame.rs @@ -1,8 +1,7 @@ use byteorder::{ByteOrder, LittleEndian, NetworkEndian}; -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use rand; -use body::Binary; use ws::mask::apply_mask; use ws::proto::{CloseCode, CloseReason, OpCode}; use ws::ProtocolError; @@ -151,7 +150,7 @@ impl Parser { } /// Generate binary representation - pub fn write_message>( + pub fn write_message>( dst: &mut BytesMut, pl: B, op: OpCode, diff --git a/tests/test_server.rs b/tests/test_server.rs index c8de0290..c499cf63 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -15,7 +15,7 @@ use bytes::Bytes; use futures::future::{self, ok}; use futures::stream::once; -use actix_http::{h1, http, Body, KeepAlive, Request, Response}; +use actix_http::{body, h1, http, Body, Error, KeepAlive, Request, Response}; #[test] fn test_h1_v2() { @@ -349,11 +349,9 @@ fn test_body_length() { .bind("test", addr, move || { h1::H1Service::new(|_| { let body = once(Ok(Bytes::from_static(STR.as_ref()))); - ok::<_, ()>( - Response::Ok() - .content_length(STR.len() as u64) - .body(Body::Streaming(Box::new(body))), - ) + ok::<_, ()>(Response::Ok().body(Body::from_message( + body::SizedStream::new(STR.len(), body), + ))) }).map(|_| ()) }).unwrap() .run() @@ -379,12 +377,8 @@ fn test_body_chunked_explicit() { Server::new() .bind("test", addr, move || { h1::H1Service::new(|_| { - let body = once(Ok(Bytes::from_static(STR.as_ref()))); - ok::<_, ()>( - Response::Ok() - .chunked() - .body(Body::Streaming(Box::new(body))), - ) + let body = once::<_, Error>(Ok(Bytes::from_static(STR.as_ref()))); + ok::<_, ()>(Response::Ok().streaming(body)) }).map(|_| ()) }).unwrap() .run() @@ -412,8 +406,8 @@ fn test_body_chunked_implicit() { Server::new() .bind("test", addr, move || { h1::H1Service::new(|_| { - let body = once(Ok(Bytes::from_static(STR.as_ref()))); - ok::<_, ()>(Response::Ok().body(Body::Streaming(Box::new(body)))) + let body = once::<_, Error>(Ok(Bytes::from_static(STR.as_ref()))); + ok::<_, ()>(Response::Ok().streaming(body)) }).map(|_| ()) }).unwrap() .run() diff --git a/tests/test_ws.rs b/tests/test_ws.rs index 85770fa8..c246d5e4 100644 --- a/tests/test_ws.rs +++ b/tests/test_ws.rs @@ -18,7 +18,7 @@ use bytes::{Bytes, BytesMut}; use futures::future::{lazy, ok, Either}; use futures::{Future, IntoFuture, Sink, Stream}; -use actix_http::{h1, ws, ResponseError, ServiceConfig}; +use actix_http::{h1, ws, ResponseError, SendResponse, ServiceConfig}; fn ws_service(req: ws::Frame) -> impl Future { match req { @@ -55,20 +55,19 @@ fn test_simple() { match ws::verify_handshake(&req) { Err(e) => { // validation failed - let resp = e.error_response(); Either::A( - framed - .send(h1::Message::Item(resp)) + SendResponse::send(framed, e.error_response()) .map_err(|_| ()) .map(|_| ()), ) } - Ok(_) => Either::B( - // send response - framed - .send(h1::Message::Item( + Ok(_) => { + Either::B( + // send handshake response + SendResponse::send( + framed, ws::handshake_response(&req).finish(), - )).map_err(|_| ()) + ).map_err(|_| ()) .and_then(|framed| { // start websocket service let framed = @@ -76,7 +75,8 @@ fn test_simple() { ws::Transport::with(framed, ws_service) .map_err(|_| ()) }), - ), + ) + } } } else { panic!()