1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 23:34:35 +01:00

re-introduce Body type, use Body as default body type for Response

This commit is contained in:
Nikolay Kim 2018-11-18 13:48:42 -08:00
parent 7fed50bcae
commit 8fea1367c7
16 changed files with 309 additions and 335 deletions

View File

@ -1,22 +1,19 @@
use std::mem; use std::marker::PhantomData;
use std::sync::Arc; use std::{fmt, mem};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use error::{Error, PayloadError}; use error::{Error, PayloadError};
/// Type represent streaming body
pub type BodyStream = Box<dyn Stream<Item = Bytes, Error = Error>>;
/// Type represent streaming payload /// Type represent streaming payload
pub type PayloadStream = Box<dyn Stream<Item = Bytes, Error = PayloadError>>; pub type PayloadStream = Box<dyn Stream<Item = Bytes, Error = PayloadError>>;
#[derive(Debug)] #[derive(Debug, PartialEq)]
/// Different type of body /// Different type of body
pub enum BodyLength { pub enum BodyLength {
None, None,
Zero, Empty,
Sized(usize), Sized(usize),
Sized64(u64), Sized64(u64),
Chunked, Chunked,
@ -32,7 +29,7 @@ pub trait MessageBody {
impl MessageBody for () { impl MessageBody for () {
fn length(&self) -> BodyLength { fn length(&self) -> BodyLength {
BodyLength::Zero BodyLength::Empty
} }
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> { fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
@ -40,150 +37,129 @@ impl MessageBody for () {
} }
} }
/// Represents various types of binary body. /// Represents various types of http message body.
/// `Content-Length` header is set to length of the body. pub enum Body {
#[derive(Debug, PartialEq)] /// Empty response. `Content-Length` header is not set.
pub enum Binary { None,
/// Bytes body /// Zero sized response body. `Content-Length` header is set to `0`.
Empty,
/// Specific response body.
Bytes(Bytes), Bytes(Bytes),
/// Static slice /// Generic message body.
Slice(&'static [u8]), Message(Box<dyn MessageBody>),
/// Shared string body
#[doc(hidden)]
SharedString(Arc<String>),
/// Shared vec body
SharedVec(Arc<Vec<u8>>),
} }
impl Binary { impl Body {
#[inline] /// Create body from slice (copy)
/// Returns `true` if body is empty pub fn from_slice(s: &[u8]) -> Body {
pub fn is_empty(&self) -> bool { Body::Bytes(Bytes::from(s))
self.len() == 0
} }
#[inline] /// Create body from generic message body.
/// Length of body in bytes pub fn from_message<B: MessageBody + 'static>(body: B) -> Body {
pub fn len(&self) -> usize { Body::Message(Box::new(body))
match *self {
Binary::Bytes(ref bytes) => bytes.len(),
Binary::Slice(slice) => slice.len(),
Binary::SharedString(ref s) => s.len(),
Binary::SharedVec(ref s) => s.len(),
}
}
/// Create binary body from slice
pub fn from_slice(s: &[u8]) -> Binary {
Binary::Bytes(Bytes::from(s))
}
/// Convert Binary to a Bytes instance
pub fn take(&mut self) -> Bytes {
mem::replace(self, Binary::Slice(b"")).into()
} }
} }
impl Clone for Binary { impl MessageBody for Body {
fn clone(&self) -> Binary { fn length(&self) -> BodyLength {
match *self {
Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()),
Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)),
Binary::SharedString(ref s) => Binary::SharedString(s.clone()),
Binary::SharedVec(ref s) => Binary::SharedVec(s.clone()),
}
}
}
impl Into<Bytes> for Binary {
fn into(self) -> Bytes {
match self { match self {
Binary::Bytes(bytes) => bytes, Body::None => BodyLength::None,
Binary::Slice(slice) => Bytes::from(slice), Body::Empty => BodyLength::Empty,
Binary::SharedString(s) => Bytes::from(s.as_str()), Body::Bytes(ref bin) => BodyLength::Sized(bin.len()),
Binary::SharedVec(s) => Bytes::from(AsRef::<[u8]>::as_ref(s.as_ref())), Body::Message(ref body) => body.length(),
}
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
match self {
Body::None => Ok(Async::Ready(None)),
Body::Empty => Ok(Async::Ready(None)),
Body::Bytes(ref mut bin) => {
if bin.len() == 0 {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(bin.slice_to(bin.len()))))
}
}
Body::Message(ref mut body) => body.poll_next(),
} }
} }
} }
impl From<&'static str> for Binary { impl PartialEq for Body {
fn from(s: &'static str) -> Binary { fn eq(&self, other: &Body) -> bool {
Binary::Slice(s.as_ref())
}
}
impl From<&'static [u8]> for Binary {
fn from(s: &'static [u8]) -> Binary {
Binary::Slice(s)
}
}
impl From<Vec<u8>> for Binary {
fn from(vec: Vec<u8>) -> Binary {
Binary::Bytes(Bytes::from(vec))
}
}
impl From<String> for Binary {
fn from(s: String) -> Binary {
Binary::Bytes(Bytes::from(s))
}
}
impl<'a> From<&'a String> for Binary {
fn from(s: &'a String) -> Binary {
Binary::Bytes(Bytes::from(AsRef::<[u8]>::as_ref(&s)))
}
}
impl From<Bytes> for Binary {
fn from(s: Bytes) -> Binary {
Binary::Bytes(s)
}
}
impl From<BytesMut> for Binary {
fn from(s: BytesMut) -> Binary {
Binary::Bytes(s.freeze())
}
}
impl From<Arc<String>> for Binary {
fn from(body: Arc<String>) -> Binary {
Binary::SharedString(body)
}
}
impl<'a> From<&'a Arc<String>> for Binary {
fn from(body: &'a Arc<String>) -> Binary {
Binary::SharedString(Arc::clone(body))
}
}
impl From<Arc<Vec<u8>>> for Binary {
fn from(body: Arc<Vec<u8>>) -> Binary {
Binary::SharedVec(body)
}
}
impl<'a> From<&'a Arc<Vec<u8>>> for Binary {
fn from(body: &'a Arc<Vec<u8>>) -> Binary {
Binary::SharedVec(Arc::clone(body))
}
}
impl AsRef<[u8]> for Binary {
#[inline]
fn as_ref(&self) -> &[u8] {
match *self { match *self {
Binary::Bytes(ref bytes) => bytes.as_ref(), Body::None => match *other {
Binary::Slice(slice) => slice, Body::None => true,
Binary::SharedString(ref s) => s.as_bytes(), _ => false,
Binary::SharedVec(ref s) => s.as_ref().as_ref(), },
Body::Empty => match *other {
Body::Empty => true,
_ => false,
},
Body::Bytes(ref b) => match *other {
Body::Bytes(ref b2) => b == b2,
_ => false,
},
Body::Message(_) => false,
} }
} }
} }
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Body::None => write!(f, "Body::None"),
Body::Empty => write!(f, "Body::Zero"),
Body::Bytes(ref b) => write!(f, "Body::Bytes({:?})", b),
Body::Message(_) => write!(f, "Body::Message(_)"),
}
}
}
impl From<&'static str> for Body {
fn from(s: &'static str) -> Body {
Body::Bytes(Bytes::from_static(s.as_ref()))
}
}
impl From<&'static [u8]> for Body {
fn from(s: &'static [u8]) -> Body {
Body::Bytes(Bytes::from_static(s.as_ref()))
}
}
impl From<Vec<u8>> for Body {
fn from(vec: Vec<u8>) -> Body {
Body::Bytes(Bytes::from(vec))
}
}
impl From<String> for Body {
fn from(s: String) -> Body {
s.into_bytes().into()
}
}
impl<'a> From<&'a String> for Body {
fn from(s: &'a String) -> Body {
Body::Bytes(Bytes::from(AsRef::<[u8]>::as_ref(&s)))
}
}
impl From<Bytes> for Body {
fn from(s: Bytes) -> Body {
Body::Bytes(s)
}
}
impl From<BytesMut> for Body {
fn from(s: BytesMut) -> Body {
Body::Bytes(s.freeze())
}
}
impl MessageBody for Bytes { impl MessageBody for Bytes {
fn length(&self) -> BodyLength { fn length(&self) -> BodyLength {
BodyLength::Sized(self.len()) BodyLength::Sized(self.len())
@ -279,26 +255,62 @@ impl MessageBody for String {
} }
} }
#[doc(hidden)] /// Type represent streaming body.
pub struct MessageBodyStream<S> { /// Response does not contain `content-length` header and appropriate transfer encoding is used.
pub struct BodyStream<S, E> {
stream: S, stream: S,
_t: PhantomData<E>,
} }
impl<S> MessageBodyStream<S> impl<S, E> BodyStream<S, E>
where where
S: Stream<Item = Bytes, Error = Error>, S: Stream<Item = Bytes, Error = E>,
E: Into<Error>,
{ {
pub fn new(stream: S) -> Self { pub fn new(stream: S) -> Self {
MessageBodyStream { stream } BodyStream {
stream,
_t: PhantomData,
}
} }
} }
impl<S> MessageBody for MessageBodyStream<S> impl<S, E> MessageBody for BodyStream<S, E>
where
S: Stream<Item = Bytes, Error = E>,
E: Into<Error>,
{
fn length(&self) -> BodyLength {
BodyLength::Chunked
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
self.stream.poll().map_err(|e| e.into())
}
}
/// Type represent streaming body. This body implementation should be used
/// if total size of stream is known. Data get sent as is without using transfer encoding.
pub struct SizedStream<S> {
size: usize,
stream: S,
}
impl<S> SizedStream<S>
where
S: Stream<Item = Bytes, Error = Error>,
{
pub fn new(size: usize, stream: S) -> Self {
SizedStream { size, stream }
}
}
impl<S> MessageBody for SizedStream<S>
where where
S: Stream<Item = Bytes, Error = Error>, S: Stream<Item = Bytes, Error = Error>,
{ {
fn length(&self) -> BodyLength { fn length(&self) -> BodyLength {
BodyLength::Chunked BodyLength::Sized(self.size)
} }
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> { fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
@ -310,78 +322,61 @@ where
mod tests { mod tests {
use super::*; use super::*;
#[test] impl Body {
fn test_is_empty() { pub(crate) fn get_ref(&self) -> &[u8] {
assert_eq!(Binary::from("").is_empty(), true); match *self {
assert_eq!(Binary::from("test").is_empty(), false); Body::Bytes(ref bin) => &bin,
_ => panic!(),
}
}
} }
#[test] #[test]
fn test_static_str() { fn test_static_str() {
assert_eq!(Binary::from("test").len(), 4); assert_eq!(Body::from("").length(), BodyLength::Sized(0));
assert_eq!(Binary::from("test").as_ref(), b"test"); assert_eq!(Body::from("test").length(), BodyLength::Sized(4));
assert_eq!(Body::from("test").get_ref(), b"test");
} }
#[test] #[test]
fn test_static_bytes() { fn test_static_bytes() {
assert_eq!(Binary::from(b"test".as_ref()).len(), 4); assert_eq!(Body::from(b"test".as_ref()).length(), BodyLength::Sized(4));
assert_eq!(Binary::from(b"test".as_ref()).as_ref(), b"test"); assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
assert_eq!(Binary::from_slice(b"test".as_ref()).len(), 4); assert_eq!(
assert_eq!(Binary::from_slice(b"test".as_ref()).as_ref(), b"test"); Body::from_slice(b"test".as_ref()).length(),
BodyLength::Sized(4)
);
assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");
} }
#[test] #[test]
fn test_vec() { fn test_vec() {
assert_eq!(Binary::from(Vec::from("test")).len(), 4); assert_eq!(Body::from(Vec::from("test")).length(), BodyLength::Sized(4));
assert_eq!(Binary::from(Vec::from("test")).as_ref(), b"test"); assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");
} }
#[test] #[test]
fn test_bytes() { fn test_bytes() {
assert_eq!(Binary::from(Bytes::from("test")).len(), 4); assert_eq!(
assert_eq!(Binary::from(Bytes::from("test")).as_ref(), b"test"); Body::from(Bytes::from("test")).length(),
} BodyLength::Sized(4)
);
#[test] assert_eq!(Body::from(Bytes::from("test")).get_ref(), b"test");
fn test_arc_string() {
let b = Arc::new("test".to_owned());
assert_eq!(Binary::from(b.clone()).len(), 4);
assert_eq!(Binary::from(b.clone()).as_ref(), b"test");
assert_eq!(Binary::from(&b).len(), 4);
assert_eq!(Binary::from(&b).as_ref(), b"test");
} }
#[test] #[test]
fn test_string() { fn test_string() {
let b = "test".to_owned(); let b = "test".to_owned();
assert_eq!(Binary::from(b.clone()).len(), 4); assert_eq!(Body::from(b.clone()).length(), BodyLength::Sized(4));
assert_eq!(Binary::from(b.clone()).as_ref(), b"test"); assert_eq!(Body::from(b.clone()).get_ref(), b"test");
assert_eq!(Binary::from(&b).len(), 4); assert_eq!(Body::from(&b).length(), BodyLength::Sized(4));
assert_eq!(Binary::from(&b).as_ref(), b"test"); assert_eq!(Body::from(&b).get_ref(), b"test");
}
#[test]
fn test_shared_vec() {
let b = Arc::new(Vec::from(&b"test"[..]));
assert_eq!(Binary::from(b.clone()).len(), 4);
assert_eq!(Binary::from(b.clone()).as_ref(), &b"test"[..]);
assert_eq!(Binary::from(&b).len(), 4);
assert_eq!(Binary::from(&b).as_ref(), &b"test"[..]);
} }
#[test] #[test]
fn test_bytes_mut() { fn test_bytes_mut() {
let b = BytesMut::from("test"); let b = BytesMut::from("test");
assert_eq!(Binary::from(b.clone()).len(), 4); assert_eq!(Body::from(b.clone()).length(), BodyLength::Sized(4));
assert_eq!(Binary::from(b).as_ref(), b"test"); assert_eq!(Body::from(b).get_ref(), b"test");
}
#[test]
fn test_binary_into() {
let bytes = Bytes::from_static(b"test");
let b: Bytes = Binary::from("test").into();
assert_eq!(b, bytes);
let b: Bytes = Binary::from(bytes.clone()).into();
assert_eq!(b, bytes);
} }
} }

View File

@ -36,7 +36,9 @@ where
.and_then(|framed| framed.send((head, len).into()).from_err()) .and_then(|framed| framed.send((head, len).into()).from_err())
// send request body // send request body
.and_then(move |framed| match body.length() { .and_then(move |framed| match body.length() {
BodyLength::None | BodyLength::Zero => Either::A(ok(framed)), BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => {
Either::A(ok(framed))
}
_ => Either::B(SendBody::new(body, framed)), _ => Either::B(SendBody::new(body, framed)),
}) })
// read response and init read body // read response and init read body

View File

@ -9,7 +9,7 @@ use futures::{Future, Stream};
use percent_encoding::{percent_encode, USERINFO_ENCODE_SET}; use percent_encoding::{percent_encode, USERINFO_ENCODE_SET};
use urlcrate::Url; use urlcrate::Url;
use body::{MessageBody, MessageBodyStream}; use body::{BodyStream, MessageBody};
use error::Error; use error::Error;
use header::{self, Header, IntoHeaderValue}; use header::{self, Header, IntoHeaderValue};
use http::{ use http::{
@ -534,14 +534,15 @@ impl ClientRequestBuilder {
/// Set an streaming body and generate `ClientRequest`. /// Set an streaming body and generate `ClientRequest`.
/// ///
/// `ClientRequestBuilder` can not be used after this call. /// `ClientRequestBuilder` can not be used after this call.
pub fn stream<S>( pub fn stream<S, E>(
&mut self, &mut self,
stream: S, stream: S,
) -> Result<ClientRequest<impl MessageBody>, HttpError> ) -> Result<ClientRequest<impl MessageBody>, HttpError>
where where
S: Stream<Item = Bytes, Error = Error>, S: Stream<Item = Bytes, Error = E>,
E: Into<Error> + 'static,
{ {
self.body(MessageBodyStream::new(stream)) self.body(BodyStream::new(stream))
} }
/// Set an empty body and generate `ClientRequest`. /// Set an empty body and generate `ClientRequest`.

View File

@ -20,6 +20,7 @@ use tokio_timer::Error as TimerError;
// re-exports // re-exports
pub use cookie::ParseError as CookieParseError; pub use cookie::ParseError as CookieParseError;
use body::Body;
use response::{Response, ResponseParts}; use response::{Response, ResponseParts};
/// A specialized [`Result`](https://doc.rust-lang.org/std/result/enum.Result.html) /// A specialized [`Result`](https://doc.rust-lang.org/std/result/enum.Result.html)
@ -100,10 +101,10 @@ impl Error {
} }
/// Converts error to a response instance and set error message as response body /// Converts error to a response instance and set error message as response body
pub fn response_with_message(self) -> Response<String> { pub fn response_with_message(self) -> Response {
let message = format!("{}", self); let message = format!("{}", self);
let resp: Response = self.into(); let resp: Response = self.into();
resp.set_body(message) resp.set_body(Body::from(message))
} }
} }

View File

@ -7,7 +7,7 @@ use tokio_codec::{Decoder, Encoder};
use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType}; use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType};
use super::encoder::RequestEncoder; use super::encoder::RequestEncoder;
use super::{Message, MessageType}; use super::{Message, MessageType};
use body::{Binary, BodyLength}; use body::BodyLength;
use client::ClientResponse; use client::ClientResponse;
use config::ServiceConfig; use config::ServiceConfig;
use error::{ParseError, PayloadError}; use error::{ParseError, PayloadError};
@ -167,7 +167,7 @@ impl ClientCodecInner {
BodyLength::Chunked => { BodyLength::Chunked => {
buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n") buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n")
} }
BodyLength::Zero => { BodyLength::Empty => {
len_is_set = false; len_is_set = false;
buffer.extend_from_slice(b"\r\n") buffer.extend_from_slice(b"\r\n")
} }
@ -183,7 +183,7 @@ impl ClientCodecInner {
TRANSFER_ENCODING => continue, TRANSFER_ENCODING => continue,
CONTENT_LENGTH => match length { CONTENT_LENGTH => match length {
BodyLength::None => (), BodyLength::None => (),
BodyLength::Zero => len_is_set = true, BodyLength::Empty => len_is_set = true,
_ => continue, _ => continue,
}, },
DATE => has_date = true, DATE => has_date = true,

View File

@ -8,7 +8,7 @@ use tokio_codec::{Decoder, Encoder};
use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType}; use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType};
use super::encoder::ResponseEncoder; use super::encoder::ResponseEncoder;
use super::{Message, MessageType}; use super::{Message, MessageType};
use body::{Binary, BodyLength}; use body::BodyLength;
use config::ServiceConfig; use config::ServiceConfig;
use error::ParseError; use error::ParseError;
use helpers; use helpers;
@ -106,7 +106,7 @@ impl Codec {
fn encode_response( fn encode_response(
&mut self, &mut self,
mut msg: Response, mut msg: Response<()>,
buffer: &mut BytesMut, buffer: &mut BytesMut,
) -> io::Result<()> { ) -> io::Result<()> {
let ka = self.flags.contains(Flags::KEEPALIVE_ENABLED) && msg let ka = self.flags.contains(Flags::KEEPALIVE_ENABLED) && msg
@ -149,7 +149,7 @@ impl Codec {
BodyLength::Chunked => { BodyLength::Chunked => {
buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n") buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n")
} }
BodyLength::Zero => { BodyLength::Empty => {
len_is_set = false; len_is_set = false;
buffer.extend_from_slice(b"\r\n") buffer.extend_from_slice(b"\r\n")
} }
@ -174,7 +174,7 @@ impl Codec {
TRANSFER_ENCODING => continue, TRANSFER_ENCODING => continue,
CONTENT_LENGTH => match self.te.length { CONTENT_LENGTH => match self.te.length {
BodyLength::None => (), BodyLength::None => (),
BodyLength::Zero => { BodyLength::Empty => {
len_is_set = true; len_is_set = true;
} }
_ => continue, _ => continue,
@ -268,7 +268,7 @@ impl Decoder for Codec {
} }
impl Encoder for Codec { impl Encoder for Codec {
type Item = Message<Response>; type Item = Message<Response<()>>;
type Error = io::Error; type Error = io::Error;
fn encode( fn encode(

View File

@ -65,7 +65,7 @@ where
enum DispatcherMessage { enum DispatcherMessage {
Item(Request), Item(Request),
Error(Response), Error(Response<()>),
} }
enum State<S: Service, B: MessageBody> { enum State<S: Service, B: MessageBody> {
@ -190,7 +190,7 @@ where
fn send_response<B1: MessageBody>( fn send_response<B1: MessageBody>(
&mut self, &mut self,
message: Response, message: Response<()>,
body: B1, body: B1,
) -> Result<State<S, B1>, DispatchError<S::Error>> { ) -> Result<State<S, B1>, DispatchError<S::Error>> {
self.framed self.framed
@ -206,7 +206,7 @@ where
.set(Flags::KEEPALIVE, self.framed.get_codec().keepalive()); .set(Flags::KEEPALIVE, self.framed.get_codec().keepalive());
self.flags.remove(Flags::FLUSHED); self.flags.remove(Flags::FLUSHED);
match body.length() { match body.length() {
BodyLength::None | BodyLength::Zero => Ok(State::None), BodyLength::None | BodyLength::Empty => Ok(State::None),
_ => Ok(State::SendPayload(body)), _ => Ok(State::SendPayload(body)),
} }
} }
@ -351,7 +351,7 @@ where
); );
self.flags.insert(Flags::DISCONNECTED); self.flags.insert(Flags::DISCONNECTED);
self.messages.push_back(DispatcherMessage::Error( self.messages.push_back(DispatcherMessage::Error(
Response::InternalServerError().finish(), Response::InternalServerError().finish().drop_body(),
)); ));
self.error = Some(DispatchError::InternalError); self.error = Some(DispatchError::InternalError);
break; break;
@ -364,7 +364,7 @@ where
error!("Internal server error: unexpected eof"); error!("Internal server error: unexpected eof");
self.flags.insert(Flags::DISCONNECTED); self.flags.insert(Flags::DISCONNECTED);
self.messages.push_back(DispatcherMessage::Error( self.messages.push_back(DispatcherMessage::Error(
Response::InternalServerError().finish(), Response::InternalServerError().finish().drop_body(),
)); ));
self.error = Some(DispatchError::InternalError); self.error = Some(DispatchError::InternalError);
break; break;
@ -389,7 +389,7 @@ where
// Malformed requests should be responded with 400 // Malformed requests should be responded with 400
self.messages.push_back(DispatcherMessage::Error( self.messages.push_back(DispatcherMessage::Error(
Response::BadRequest().finish(), Response::BadRequest().finish().drop_body(),
)); ));
self.flags.insert(Flags::DISCONNECTED); self.flags.insert(Flags::DISCONNECTED);
self.error = Some(e.into()); self.error = Some(e.into());
@ -440,8 +440,10 @@ where
// timeout on first request (slow request) return 408 // timeout on first request (slow request) return 408
trace!("Slow request timeout"); trace!("Slow request timeout");
self.flags.insert(Flags::STARTED | Flags::DISCONNECTED); self.flags.insert(Flags::STARTED | Flags::DISCONNECTED);
let _ = self let _ = self.send_response(
.send_response(Response::RequestTimeout().finish(), ()); Response::RequestTimeout().finish().drop_body(),
(),
);
self.state = State::None; self.state = State::None;
} }
} else if let Some(deadline) = self.config.keep_alive_expire() { } else if let Some(deadline) = self.config.keep_alive_expire() {

View File

@ -8,7 +8,7 @@ use bytes::{Bytes, BytesMut};
use http::header::{HeaderValue, ACCEPT_ENCODING, CONTENT_LENGTH}; use http::header::{HeaderValue, ACCEPT_ENCODING, CONTENT_LENGTH};
use http::{StatusCode, Version}; use http::{StatusCode, Version};
use body::{Binary, BodyLength}; use body::BodyLength;
use header::ContentEncoding; use header::ContentEncoding;
use http::Method; use http::Method;
use message::{RequestHead, ResponseHead}; use message::{RequestHead, ResponseHead};
@ -52,7 +52,7 @@ impl ResponseEncoder {
) { ) {
self.head = head; self.head = head;
let transfer = match length { let transfer = match length {
BodyLength::Zero => { BodyLength::Empty => {
match resp.status { match resp.status {
StatusCode::NO_CONTENT StatusCode::NO_CONTENT
| StatusCode::CONTINUE | StatusCode::CONTINUE

View File

@ -109,7 +109,7 @@ extern crate serde_derive;
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
extern crate openssl; extern crate openssl;
mod body; pub mod body;
pub mod client; pub mod client;
mod config; mod config;
mod extensions; mod extensions;
@ -129,7 +129,7 @@ pub mod h1;
pub(crate) mod helpers; pub(crate) mod helpers;
pub mod test; pub mod test;
pub mod ws; pub mod ws;
pub use body::{Binary, MessageBody}; pub use body::{Body, MessageBody};
pub use error::{Error, ResponseError, Result}; pub use error::{Error, ResponseError, Result};
pub use extensions::Extensions; pub use extensions::Extensions;
pub use httpmessage::HttpMessage; pub use httpmessage::HttpMessage;
@ -150,7 +150,6 @@ pub mod dev {
//! use actix_http::dev::*; //! use actix_http::dev::*;
//! ``` //! ```
pub use body::BodyStream;
pub use httpmessage::{MessageBody, Readlines, UrlEncoded}; pub use httpmessage::{MessageBody, Readlines, UrlEncoded};
pub use json::JsonBody; pub use json::JsonBody;
pub use payload::{Payload, PayloadBuffer}; pub use payload::{Payload, PayloadBuffer};

View File

@ -12,9 +12,9 @@ use http::{Error as HttpError, HeaderMap, HttpTryFrom, StatusCode, Version};
use serde::Serialize; use serde::Serialize;
use serde_json; use serde_json;
use body::{MessageBody, MessageBodyStream}; use body::{Body, BodyStream, MessageBody};
use error::Error; use error::Error;
use header::{ContentEncoding, Header, IntoHeaderValue}; use header::{Header, IntoHeaderValue};
use message::{Head, MessageFlags, ResponseHead}; use message::{Head, MessageFlags, ResponseHead};
/// max write buffer size 64k /// max write buffer size 64k
@ -32,9 +32,9 @@ pub enum ConnectionType {
} }
/// An HTTP Response /// An HTTP Response
pub struct Response<B: MessageBody = ()>(Box<InnerResponse>, B); pub struct Response<B: MessageBody = Body>(Box<InnerResponse>, B);
impl Response<()> { impl Response<Body> {
/// Create http response builder with specific status. /// Create http response builder with specific status.
#[inline] #[inline]
pub fn build(status: StatusCode) -> ResponseBuilder { pub fn build(status: StatusCode) -> ResponseBuilder {
@ -50,7 +50,7 @@ impl Response<()> {
/// Constructs a response /// Constructs a response
#[inline] #[inline]
pub fn new(status: StatusCode) -> Response { pub fn new(status: StatusCode) -> Response {
ResponsePool::with_body(status, ()) ResponsePool::with_body(status, Body::Empty)
} }
/// Constructs an error response /// Constructs an error response
@ -242,6 +242,11 @@ impl<B: MessageBody> Response<B> {
Response(self.0, body) Response(self.0, body)
} }
/// Drop request's body
pub fn drop_body(self) -> Response<()> {
Response(self.0, ())
}
/// Set a body and return previous body value /// Set a body and return previous body value
pub fn replace_body<B2: MessageBody>(self, body: B2) -> (Response<B2>, B) { pub fn replace_body<B2: MessageBody>(self, body: B2) -> (Response<B2>, B) {
(Response(self.0, body), self.1) (Response(self.0, body), self.1)
@ -252,7 +257,7 @@ impl<B: MessageBody> Response<B> {
self.get_ref().response_size self.get_ref().response_size
} }
/// Set content encoding /// Set response size
pub(crate) fn set_response_size(&mut self, size: u64) { pub(crate) fn set_response_size(&mut self, size: u64) {
self.get_mut().response_size = size; self.get_mut().response_size = size;
} }
@ -266,7 +271,7 @@ impl<B: MessageBody> Response<B> {
} }
pub(crate) fn from_parts(parts: ResponseParts) -> Response { pub(crate) fn from_parts(parts: ResponseParts) -> Response {
Response(Box::new(InnerResponse::from_parts(parts)), ()) Response(Box::new(InnerResponse::from_parts(parts)), Body::Empty)
} }
} }
@ -279,7 +284,6 @@ impl fmt::Debug for Response {
self.get_ref().head.status, self.get_ref().head.status,
self.get_ref().head.reason.unwrap_or("") self.get_ref().head.reason.unwrap_or("")
); );
let _ = writeln!(f, " encoding: {:?}", self.get_ref().encoding);
let _ = writeln!(f, " headers:"); let _ = writeln!(f, " headers:");
for (key, val) in self.get_ref().head.headers.iter() { for (key, val) in self.get_ref().head.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val); let _ = writeln!(f, " {:?}: {:?}", key, val);
@ -396,20 +400,6 @@ impl ResponseBuilder {
self self
} }
/// Set content encoding.
///
/// By default `ContentEncoding::Auto` is used, which automatically
/// negotiates content encoding based on request's `Accept-Encoding`
/// headers. To enforce specific encoding, use specific
/// ContentEncoding` value.
#[inline]
pub fn content_encoding(&mut self, enc: ContentEncoding) -> &mut Self {
if let Some(parts) = parts(&mut self.response, &self.err) {
parts.encoding = Some(enc);
}
self
}
/// Set connection type /// Set connection type
#[inline] #[inline]
#[doc(hidden)] #[doc(hidden)]
@ -558,7 +548,14 @@ impl ResponseBuilder {
/// Set a body and generate `Response`. /// Set a body and generate `Response`.
/// ///
/// `ResponseBuilder` can not be used after this call. /// `ResponseBuilder` can not be used after this call.
pub fn body<B: MessageBody>(&mut self, body: B) -> Response<B> { pub fn body<B: Into<Body>>(&mut self, body: B) -> Response {
self.message_body(body.into())
}
/// Set a body and generate `Response`.
///
/// `ResponseBuilder` can not be used after this call.
pub fn message_body<B: MessageBody>(&mut self, body: B) -> Response<B> {
let mut error = if let Some(e) = self.err.take() { let mut error = if let Some(e) = self.err.take() {
Some(Error::from(e)) Some(Error::from(e))
} else { } else {
@ -589,25 +586,25 @@ impl ResponseBuilder {
/// Set a streaming body and generate `Response`. /// Set a streaming body and generate `Response`.
/// ///
/// `ResponseBuilder` can not be used after this call. /// `ResponseBuilder` can not be used after this call.
pub fn streaming<S, E>(&mut self, stream: S) -> Response<impl MessageBody> pub fn streaming<S, E>(&mut self, stream: S) -> Response
where where
S: Stream<Item = Bytes, Error = E> + 'static, S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error>, E: Into<Error> + 'static,
{ {
self.body(MessageBodyStream::new(stream.map_err(|e| e.into()))) self.body(Body::from_message(BodyStream::new(stream)))
} }
/// Set a json body and generate `Response` /// Set a json body and generate `Response`
/// ///
/// `ResponseBuilder` can not be used after this call. /// `ResponseBuilder` can not be used after this call.
pub fn json<T: Serialize>(&mut self, value: T) -> Response<String> { pub fn json<T: Serialize>(&mut self, value: T) -> Response {
self.json2(&value) self.json2(&value)
} }
/// Set a json body and generate `Response` /// Set a json body and generate `Response`
/// ///
/// `ResponseBuilder` can not be used after this call. /// `ResponseBuilder` can not be used after this call.
pub fn json2<T: Serialize>(&mut self, value: &T) -> Response<String> { pub fn json2<T: Serialize>(&mut self, value: &T) -> Response {
match serde_json::to_string(value) { match serde_json::to_string(value) {
Ok(body) => { Ok(body) => {
let contains = if let Some(parts) = parts(&mut self.response, &self.err) let contains = if let Some(parts) = parts(&mut self.response, &self.err)
@ -620,12 +617,9 @@ impl ResponseBuilder {
self.header(header::CONTENT_TYPE, "application/json"); self.header(header::CONTENT_TYPE, "application/json");
} }
self.body(body) self.body(Body::from(body))
}
Err(e) => {
let mut res: Response = Error::from(e).into();
res.replace_body(String::new()).0
} }
Err(e) => Error::from(e).into(),
} }
} }
@ -633,8 +627,8 @@ impl ResponseBuilder {
/// Set an empty body and generate `Response` /// Set an empty body and generate `Response`
/// ///
/// `ResponseBuilder` can not be used after this call. /// `ResponseBuilder` can not be used after this call.
pub fn finish(&mut self) -> Response<()> { pub fn finish(&mut self) -> Response {
self.body(()) self.body(Body::Empty)
} }
/// This method construct new `ResponseBuilder` /// This method construct new `ResponseBuilder`
@ -675,7 +669,7 @@ impl From<ResponseBuilder> for Response {
} }
} }
impl From<&'static str> for Response<&'static str> { impl From<&'static str> for Response {
fn from(val: &'static str) -> Self { fn from(val: &'static str) -> Self {
Response::Ok() Response::Ok()
.content_type("text/plain; charset=utf-8") .content_type("text/plain; charset=utf-8")
@ -683,7 +677,7 @@ impl From<&'static str> for Response<&'static str> {
} }
} }
impl From<&'static [u8]> for Response<&'static [u8]> { impl From<&'static [u8]> for Response {
fn from(val: &'static [u8]) -> Self { fn from(val: &'static [u8]) -> Self {
Response::Ok() Response::Ok()
.content_type("application/octet-stream") .content_type("application/octet-stream")
@ -691,7 +685,7 @@ impl From<&'static [u8]> for Response<&'static [u8]> {
} }
} }
impl From<String> for Response<String> { impl From<String> for Response {
fn from(val: String) -> Self { fn from(val: String) -> Self {
Response::Ok() Response::Ok()
.content_type("text/plain; charset=utf-8") .content_type("text/plain; charset=utf-8")
@ -699,7 +693,15 @@ impl From<String> for Response<String> {
} }
} }
impl From<Bytes> for Response<Bytes> { impl<'a> From<&'a String> for Response {
fn from(val: &'a String) -> Self {
Response::Ok()
.content_type("text/plain; charset=utf-8")
.body(val)
}
}
impl From<Bytes> for Response {
fn from(val: Bytes) -> Self { fn from(val: Bytes) -> Self {
Response::Ok() Response::Ok()
.content_type("application/octet-stream") .content_type("application/octet-stream")
@ -707,7 +709,7 @@ impl From<Bytes> for Response<Bytes> {
} }
} }
impl From<BytesMut> for Response<BytesMut> { impl From<BytesMut> for Response {
fn from(val: BytesMut) -> Self { fn from(val: BytesMut) -> Self {
Response::Ok() Response::Ok()
.content_type("application/octet-stream") .content_type("application/octet-stream")
@ -717,7 +719,6 @@ impl From<BytesMut> for Response<BytesMut> {
struct InnerResponse { struct InnerResponse {
head: ResponseHead, head: ResponseHead,
encoding: Option<ContentEncoding>,
connection_type: Option<ConnectionType>, connection_type: Option<ConnectionType>,
write_capacity: usize, write_capacity: usize,
response_size: u64, response_size: u64,
@ -727,7 +728,6 @@ struct InnerResponse {
pub(crate) struct ResponseParts { pub(crate) struct ResponseParts {
head: ResponseHead, head: ResponseHead,
encoding: Option<ContentEncoding>,
connection_type: Option<ConnectionType>, connection_type: Option<ConnectionType>,
error: Option<Error>, error: Option<Error>,
} }
@ -744,7 +744,6 @@ impl InnerResponse {
flags: MessageFlags::empty(), flags: MessageFlags::empty(),
}, },
pool, pool,
encoding: None,
connection_type: None, connection_type: None,
response_size: 0, response_size: 0,
write_capacity: MAX_WRITE_BUFFER_SIZE, write_capacity: MAX_WRITE_BUFFER_SIZE,
@ -756,7 +755,6 @@ impl InnerResponse {
fn into_parts(self) -> ResponseParts { fn into_parts(self) -> ResponseParts {
ResponseParts { ResponseParts {
head: self.head, head: self.head,
encoding: self.encoding,
connection_type: self.connection_type, connection_type: self.connection_type,
error: self.error, error: self.error,
} }
@ -765,7 +763,6 @@ impl InnerResponse {
fn from_parts(parts: ResponseParts) -> InnerResponse { fn from_parts(parts: ResponseParts) -> InnerResponse {
InnerResponse { InnerResponse {
head: parts.head, head: parts.head,
encoding: parts.encoding,
connection_type: parts.connection_type, connection_type: parts.connection_type,
response_size: 0, response_size: 0,
write_capacity: MAX_WRITE_BUFFER_SIZE, write_capacity: MAX_WRITE_BUFFER_SIZE,
@ -841,7 +838,6 @@ impl ResponsePool {
let mut p = inner.pool.0.borrow_mut(); let mut p = inner.pool.0.borrow_mut();
if p.len() < 128 { if p.len() < 128 {
inner.head.clear(); inner.head.clear();
inner.encoding = None;
inner.connection_type = None; inner.connection_type = None;
inner.response_size = 0; inner.response_size = 0;
inner.error = None; inner.error = None;
@ -854,11 +850,10 @@ impl ResponsePool {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use body::Binary; use body::Body;
use http; use http;
use http::header::{HeaderValue, CONTENT_TYPE, COOKIE}; use http::header::{HeaderValue, CONTENT_TYPE, COOKIE};
use header::ContentEncoding;
// use test::TestRequest; // use test::TestRequest;
#[test] #[test]
@ -953,24 +948,24 @@ mod tests {
assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "text/plain") assert_eq!(resp.headers().get(CONTENT_TYPE).unwrap(), "text/plain")
} }
#[test] // #[test]
fn test_content_encoding() { // fn test_content_encoding() {
let resp = Response::build(StatusCode::OK).finish(); // let resp = Response::build(StatusCode::OK).finish();
assert_eq!(resp.content_encoding(), None); // assert_eq!(resp.content_encoding(), None);
#[cfg(feature = "brotli")] // #[cfg(feature = "brotli")]
{ // {
let resp = Response::build(StatusCode::OK) // let resp = Response::build(StatusCode::OK)
.content_encoding(ContentEncoding::Br) // .content_encoding(ContentEncoding::Br)
.finish(); // .finish();
assert_eq!(resp.content_encoding(), Some(ContentEncoding::Br)); // assert_eq!(resp.content_encoding(), Some(ContentEncoding::Br));
} // }
let resp = Response::build(StatusCode::OK) // let resp = Response::build(StatusCode::OK)
.content_encoding(ContentEncoding::Gzip) // .content_encoding(ContentEncoding::Gzip)
.finish(); // .finish();
assert_eq!(resp.content_encoding(), Some(ContentEncoding::Gzip)); // assert_eq!(resp.content_encoding(), Some(ContentEncoding::Gzip));
} // }
#[test] #[test]
fn test_json() { fn test_json() {
@ -1020,15 +1015,6 @@ mod tests {
); );
} }
impl Body {
pub(crate) fn bin_ref(&self) -> &Binary {
match *self {
Body::Binary(ref bin) => bin,
_ => panic!(),
}
}
}
#[test] #[test]
fn test_into_response() { fn test_into_response() {
let resp: Response = "test".into(); let resp: Response = "test".into();
@ -1038,7 +1024,7 @@ mod tests {
HeaderValue::from_static("text/plain; charset=utf-8") HeaderValue::from_static("text/plain; charset=utf-8")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().bin_ref(), &Binary::from("test")); assert_eq!(resp.body().get_ref(), b"test");
let resp: Response = b"test".as_ref().into(); let resp: Response = b"test".as_ref().into();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1047,7 +1033,7 @@ mod tests {
HeaderValue::from_static("application/octet-stream") HeaderValue::from_static("application/octet-stream")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().bin_ref(), &Binary::from(b"test".as_ref())); assert_eq!(resp.body().get_ref(), b"test");
let resp: Response = "test".to_owned().into(); let resp: Response = "test".to_owned().into();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1056,7 +1042,7 @@ mod tests {
HeaderValue::from_static("text/plain; charset=utf-8") HeaderValue::from_static("text/plain; charset=utf-8")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().bin_ref(), &Binary::from("test".to_owned())); assert_eq!(resp.body().get_ref(), b"test");
let resp: Response = (&"test".to_owned()).into(); let resp: Response = (&"test".to_owned()).into();
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
@ -1065,7 +1051,7 @@ mod tests {
HeaderValue::from_static("text/plain; charset=utf-8") HeaderValue::from_static("text/plain; charset=utf-8")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().bin_ref(), &Binary::from(&"test".to_owned())); assert_eq!(resp.body().get_ref(), b"test");
let b = Bytes::from_static(b"test"); let b = Bytes::from_static(b"test");
let resp: Response = b.into(); let resp: Response = b.into();
@ -1075,10 +1061,7 @@ mod tests {
HeaderValue::from_static("application/octet-stream") HeaderValue::from_static("application/octet-stream")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!( assert_eq!(resp.body().get_ref(), b"test");
resp.body().bin_ref(),
&Binary::from(Bytes::from_static(b"test"))
);
let b = Bytes::from_static(b"test"); let b = Bytes::from_static(b"test");
let resp: Response = b.into(); let resp: Response = b.into();
@ -1088,7 +1071,7 @@ mod tests {
HeaderValue::from_static("application/octet-stream") HeaderValue::from_static("application/octet-stream")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().bin_ref(), &Binary::from(BytesMut::from("test"))); assert_eq!(resp.body().get_ref(), b"test");
let b = BytesMut::from("test"); let b = BytesMut::from("test");
let resp: Response = b.into(); let resp: Response = b.into();
@ -1098,7 +1081,7 @@ mod tests {
HeaderValue::from_static("application/octet-stream") HeaderValue::from_static("application/octet-stream")
); );
assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.body().bin_ref(), &Binary::from(BytesMut::from("test"))); assert_eq!(resp.body().get_ref(), b"test");
} }
#[test] #[test]

View File

@ -72,7 +72,7 @@ where
} }
pub struct SendErrorFut<T, R, E> { pub struct SendErrorFut<T, R, E> {
res: Option<Message<Response>>, res: Option<Message<Response<()>>>,
framed: Option<Framed<T, Codec>>, framed: Option<Framed<T, Codec>>,
err: Option<E>, err: Option<E>,
_t: PhantomData<R>, _t: PhantomData<R>,
@ -188,7 +188,7 @@ where
} }
pub struct SendResponseFut<T, B> { pub struct SendResponseFut<T, B> {
res: Option<Message<Response>>, res: Option<Message<Response<()>>>,
body: Option<B>, body: Option<B>,
framed: Option<Framed<T, Codec>>, framed: Option<Framed<T, Codec>>,
} }

View File

@ -4,6 +4,7 @@ use std::str::FromStr;
use actix::System; use actix::System;
use bytes::Bytes;
use cookie::Cookie; use cookie::Cookie;
use futures::Future; use futures::Future;
use http::header::HeaderName; use http::header::HeaderName;
@ -11,7 +12,6 @@ use http::{HeaderMap, HttpTryFrom, Method, Uri, Version};
use net2::TcpBuilder; use net2::TcpBuilder;
use tokio::runtime::current_thread::Runtime; use tokio::runtime::current_thread::Runtime;
use body::Binary;
use header::{Header, IntoHeaderValue}; use header::{Header, IntoHeaderValue};
use payload::Payload; use payload::Payload;
use request::Request; use request::Request;
@ -362,10 +362,9 @@ impl TestRequest {
} }
/// Set request payload /// Set request payload
pub fn set_payload<B: Into<Binary>>(mut self, data: B) -> Self { pub fn set_payload<B: Into<Bytes>>(mut self, data: B) -> Self {
let mut data = data.into();
let mut payload = Payload::empty(); let mut payload = Payload::empty();
payload.unread_data(data.take()); payload.unread_data(data.into());
self.payload = Some(payload); self.payload = Some(payload);
self self
} }

View File

@ -1,10 +1,9 @@
use bytes::BytesMut; use bytes::{Bytes, BytesMut};
use tokio_codec::{Decoder, Encoder}; use tokio_codec::{Decoder, Encoder};
use super::frame::Parser; use super::frame::Parser;
use super::proto::{CloseReason, OpCode}; use super::proto::{CloseReason, OpCode};
use super::ProtocolError; use super::ProtocolError;
use body::Binary;
/// `WebSocket` Message /// `WebSocket` Message
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -12,7 +11,7 @@ pub enum Message {
/// Text message /// Text message
Text(String), Text(String),
/// Binary message /// Binary message
Binary(Binary), Binary(Bytes),
/// Ping message /// Ping message
Ping(String), Ping(String),
/// Pong message /// Pong message

View File

@ -1,8 +1,7 @@
use byteorder::{ByteOrder, LittleEndian, NetworkEndian}; use byteorder::{ByteOrder, LittleEndian, NetworkEndian};
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use rand; use rand;
use body::Binary;
use ws::mask::apply_mask; use ws::mask::apply_mask;
use ws::proto::{CloseCode, CloseReason, OpCode}; use ws::proto::{CloseCode, CloseReason, OpCode};
use ws::ProtocolError; use ws::ProtocolError;
@ -151,7 +150,7 @@ impl Parser {
} }
/// Generate binary representation /// Generate binary representation
pub fn write_message<B: Into<Binary>>( pub fn write_message<B: Into<Bytes>>(
dst: &mut BytesMut, dst: &mut BytesMut,
pl: B, pl: B,
op: OpCode, op: OpCode,

View File

@ -15,7 +15,7 @@ use bytes::Bytes;
use futures::future::{self, ok}; use futures::future::{self, ok};
use futures::stream::once; use futures::stream::once;
use actix_http::{h1, http, Body, KeepAlive, Request, Response}; use actix_http::{body, h1, http, Body, Error, KeepAlive, Request, Response};
#[test] #[test]
fn test_h1_v2() { fn test_h1_v2() {
@ -349,11 +349,9 @@ fn test_body_length() {
.bind("test", addr, move || { .bind("test", addr, move || {
h1::H1Service::new(|_| { h1::H1Service::new(|_| {
let body = once(Ok(Bytes::from_static(STR.as_ref()))); let body = once(Ok(Bytes::from_static(STR.as_ref())));
ok::<_, ()>( ok::<_, ()>(Response::Ok().body(Body::from_message(
Response::Ok() body::SizedStream::new(STR.len(), body),
.content_length(STR.len() as u64) )))
.body(Body::Streaming(Box::new(body))),
)
}).map(|_| ()) }).map(|_| ())
}).unwrap() }).unwrap()
.run() .run()
@ -379,12 +377,8 @@ fn test_body_chunked_explicit() {
Server::new() Server::new()
.bind("test", addr, move || { .bind("test", addr, move || {
h1::H1Service::new(|_| { h1::H1Service::new(|_| {
let body = once(Ok(Bytes::from_static(STR.as_ref()))); let body = once::<_, Error>(Ok(Bytes::from_static(STR.as_ref())));
ok::<_, ()>( ok::<_, ()>(Response::Ok().streaming(body))
Response::Ok()
.chunked()
.body(Body::Streaming(Box::new(body))),
)
}).map(|_| ()) }).map(|_| ())
}).unwrap() }).unwrap()
.run() .run()
@ -412,8 +406,8 @@ fn test_body_chunked_implicit() {
Server::new() Server::new()
.bind("test", addr, move || { .bind("test", addr, move || {
h1::H1Service::new(|_| { h1::H1Service::new(|_| {
let body = once(Ok(Bytes::from_static(STR.as_ref()))); let body = once::<_, Error>(Ok(Bytes::from_static(STR.as_ref())));
ok::<_, ()>(Response::Ok().body(Body::Streaming(Box::new(body)))) ok::<_, ()>(Response::Ok().streaming(body))
}).map(|_| ()) }).map(|_| ())
}).unwrap() }).unwrap()
.run() .run()

View File

@ -18,7 +18,7 @@ use bytes::{Bytes, BytesMut};
use futures::future::{lazy, ok, Either}; use futures::future::{lazy, ok, Either};
use futures::{Future, IntoFuture, Sink, Stream}; use futures::{Future, IntoFuture, Sink, Stream};
use actix_http::{h1, ws, ResponseError, ServiceConfig}; use actix_http::{h1, ws, ResponseError, SendResponse, ServiceConfig};
fn ws_service(req: ws::Frame) -> impl Future<Item = ws::Message, Error = io::Error> { fn ws_service(req: ws::Frame) -> impl Future<Item = ws::Message, Error = io::Error> {
match req { match req {
@ -55,20 +55,19 @@ fn test_simple() {
match ws::verify_handshake(&req) { match ws::verify_handshake(&req) {
Err(e) => { Err(e) => {
// validation failed // validation failed
let resp = e.error_response();
Either::A( Either::A(
framed SendResponse::send(framed, e.error_response())
.send(h1::Message::Item(resp))
.map_err(|_| ()) .map_err(|_| ())
.map(|_| ()), .map(|_| ()),
) )
} }
Ok(_) => Either::B( Ok(_) => {
// send response Either::B(
framed // send handshake response
.send(h1::Message::Item( SendResponse::send(
framed,
ws::handshake_response(&req).finish(), ws::handshake_response(&req).finish(),
)).map_err(|_| ()) ).map_err(|_| ())
.and_then(|framed| { .and_then(|framed| {
// start websocket service // start websocket service
let framed = let framed =
@ -76,7 +75,8 @@ fn test_simple() {
ws::Transport::with(framed, ws_service) ws::Transport::with(framed, ws_service)
.map_err(|_| ()) .map_err(|_| ())
}), }),
), )
}
} }
} else { } else {
panic!() panic!()