1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-30 18:34:36 +01:00

make payload generic

This commit is contained in:
Nikolay Kim 2019-02-12 11:07:42 -08:00
parent 32021532c3
commit a41459bf69
10 changed files with 127 additions and 123 deletions

View File

@ -4,10 +4,7 @@ use std::{fmt, mem};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use crate::error::{Error, PayloadError}; use crate::error::Error;
/// Type represent streaming payload
pub type PayloadStream = Box<dyn Stream<Item = Bytes, Error = PayloadError>>;
#[derive(Debug, PartialEq, Copy, Clone)] #[derive(Debug, PartialEq, Copy, Clone)]
/// Different type of body /// Different type of body

View File

@ -9,10 +9,11 @@ use super::connection::{ConnectionLifetime, ConnectionType, IoConnection};
use super::error::{ConnectorError, SendRequestError}; use super::error::{ConnectorError, SendRequestError};
use super::pool::Acquired; use super::pool::Acquired;
use super::response::ClientResponse; use super::response::ClientResponse;
use crate::body::{BodyLength, MessageBody, PayloadStream}; use crate::body::{BodyLength, MessageBody};
use crate::error::PayloadError; use crate::error::PayloadError;
use crate::h1; use crate::h1;
use crate::message::RequestHead; use crate::message::RequestHead;
use crate::payload::PayloadStream;
pub(crate) fn send_request<T, B>( pub(crate) fn send_request<T, B>(
io: T, io: T,
@ -57,7 +58,7 @@ where
release_connection(framed, force_close) release_connection(framed, force_close)
} }
_ => { _ => {
res.set_payload(Payload::stream(framed)); res.set_payload(Payload::stream(framed).into());
} }
} }
ok(res) ok(res)

View File

@ -10,7 +10,6 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCOD
use http::{request::Request, HttpTryFrom, Version}; use http::{request::Request, HttpTryFrom, Version};
use crate::body::{BodyLength, MessageBody}; use crate::body::{BodyLength, MessageBody};
use crate::h2::Payload;
use crate::message::{RequestHead, ResponseHead}; use crate::message::{RequestHead, ResponseHead};
use super::connection::{ConnectionType, IoConnection}; use super::connection::{ConnectionType, IoConnection};
@ -111,7 +110,7 @@ where
Ok(ClientResponse { Ok(ClientResponse {
head, head,
payload: RefCell::new(Some(Box::new(Payload::new(body)))), payload: RefCell::new(body.into()),
}) })
}) })
.from_err() .from_err()

View File

@ -1,20 +1,19 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::fmt; use std::{fmt, mem};
use bytes::Bytes; use bytes::Bytes;
use futures::{Async, Poll, Stream}; use futures::{Poll, Stream};
use http::{HeaderMap, StatusCode, Version}; use http::{HeaderMap, StatusCode, Version};
use crate::body::PayloadStream;
use crate::error::PayloadError; use crate::error::PayloadError;
use crate::httpmessage::HttpMessage; use crate::httpmessage::HttpMessage;
use crate::message::{Head, ResponseHead}; use crate::message::{Head, ResponseHead};
use crate::payload::{Payload, PayloadStream};
/// Client Response /// Client Response
#[derive(Default)]
pub struct ClientResponse { pub struct ClientResponse {
pub(crate) head: ResponseHead, pub(crate) head: ResponseHead,
pub(crate) payload: RefCell<Option<PayloadStream>>, pub(crate) payload: RefCell<Payload>,
} }
impl HttpMessage for ClientResponse { impl HttpMessage for ClientResponse {
@ -25,8 +24,8 @@ impl HttpMessage for ClientResponse {
} }
#[inline] #[inline]
fn payload(&self) -> Option<Self::Stream> { fn payload(&self) -> Payload<Self::Stream> {
self.payload.borrow_mut().take() mem::replace(&mut *self.payload.borrow_mut(), Payload::None)
} }
} }
@ -35,7 +34,7 @@ impl ClientResponse {
pub fn new() -> ClientResponse { pub fn new() -> ClientResponse {
ClientResponse { ClientResponse {
head: ResponseHead::default(), head: ResponseHead::default(),
payload: RefCell::new(None), payload: RefCell::new(Payload::None),
} }
} }
@ -80,8 +79,8 @@ impl ClientResponse {
} }
/// Set response payload /// Set response payload
pub fn set_payload(&mut self, payload: PayloadStream) { pub fn set_payload(&mut self, payload: Payload) {
*self.payload.get_mut() = Some(payload); *self.payload.get_mut() = payload;
} }
} }
@ -90,11 +89,7 @@ impl Stream for ClientResponse {
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(ref mut payload) = self.payload.get_mut() { self.payload.get_mut().poll()
payload.poll()
} else {
Ok(Async::Ready(None))
}
} }
} }

View File

@ -316,7 +316,7 @@ where
match self.framed.get_codec().message_type() { match self.framed.get_codec().message_type() {
MessageType::Payload => { MessageType::Payload => {
let (ps, pl) = Payload::create(false); let (ps, pl) = Payload::create(false);
req = req.set_payload(pl); req = req.set_payload(crate::Payload::H1(pl));
self.payload = Some(ps); self.payload = Some(ps);
} }
MessageType::Stream => { MessageType::Stream => {

View File

@ -12,7 +12,7 @@ use log::error;
use crate::body::MessageBody; use crate::body::MessageBody;
use crate::config::{KeepAlive, ServiceConfig}; use crate::config::{KeepAlive, ServiceConfig};
use crate::error::{DispatchError, ParseError}; use crate::error::{DispatchError, ParseError};
use crate::payload::Payload; use crate::payload::PayloadStream;
use crate::request::Request; use crate::request::Request;
use crate::response::Response; use crate::response::Response;
@ -29,7 +29,7 @@ pub struct H1Service<T, S, B> {
impl<T, S, B> H1Service<T, S, B> impl<T, S, B> H1Service<T, S, B>
where where
S: NewService<Request = Request<Payload>>, S: NewService<Request = Request<PayloadStream>>,
S::Error: Debug, S::Error: Debug,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
S::Service: 'static, S::Service: 'static,

View File

@ -1,3 +1,5 @@
use std::{mem, str};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use encoding::all::UTF_8; use encoding::all::UTF_8;
use encoding::label::encoding_from_whatwg_label; use encoding::label::encoding_from_whatwg_label;
@ -8,13 +10,13 @@ use http::{header, HeaderMap};
use mime::Mime; use mime::Mime;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde_urlencoded; use serde_urlencoded;
use std::str;
use crate::error::{ use crate::error::{
ContentTypeError, ParseError, PayloadError, ReadlinesError, UrlencodedError, ContentTypeError, ParseError, PayloadError, ReadlinesError, UrlencodedError,
}; };
use crate::header::Header; use crate::header::Header;
use crate::json::JsonBody; use crate::json::JsonBody;
use crate::payload::Payload;
/// Trait that implements general purpose operations on http messages /// Trait that implements general purpose operations on http messages
pub trait HttpMessage: Sized { pub trait HttpMessage: Sized {
@ -25,7 +27,7 @@ pub trait HttpMessage: Sized {
fn headers(&self) -> &HeaderMap; fn headers(&self) -> &HeaderMap;
/// Message payload stream /// Message payload stream
fn payload(&self) -> Option<Self::Stream>; fn payload(&self) -> Payload<Self::Stream>;
#[doc(hidden)] #[doc(hidden)]
/// Get a header /// Get a header
@ -210,7 +212,7 @@ pub trait HttpMessage: Sized {
/// Stream to read request line by line. /// Stream to read request line by line.
pub struct Readlines<T: HttpMessage> { pub struct Readlines<T: HttpMessage> {
stream: Option<T::Stream>, stream: Payload<T::Stream>,
buff: BytesMut, buff: BytesMut,
limit: usize, limit: usize,
checked_buff: bool, checked_buff: bool,
@ -244,7 +246,7 @@ impl<T: HttpMessage> Readlines<T> {
fn err(err: ReadlinesError) -> Self { fn err(err: ReadlinesError) -> Self {
Readlines { Readlines {
stream: None, stream: Payload::None,
buff: BytesMut::new(), buff: BytesMut::new(),
limit: 262_144, limit: 262_144,
checked_buff: true, checked_buff: true,
@ -292,65 +294,61 @@ impl<T: HttpMessage + 'static> Stream for Readlines<T> {
self.checked_buff = true; self.checked_buff = true;
} }
// poll req for more bytes // poll req for more bytes
if let Some(ref mut stream) = self.stream { match self.stream.poll() {
match stream.poll() { Ok(Async::Ready(Some(mut bytes))) => {
Ok(Async::Ready(Some(mut bytes))) => { // check if there is a newline in bytes
// check if there is a newline in bytes let mut found: Option<usize> = None;
let mut found: Option<usize> = None; for (ind, b) in bytes.iter().enumerate() {
for (ind, b) in bytes.iter().enumerate() { if *b == b'\n' {
if *b == b'\n' { found = Some(ind);
found = Some(ind); break;
break;
}
} }
if let Some(ind) = found {
// check if line is longer than limit
if ind + 1 > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&bytes.split_to(ind + 1))
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&bytes.split_to(ind + 1), DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
// extend buffer with rest of the bytes;
self.buff.extend_from_slice(&bytes);
self.checked_buff = false;
return Ok(Async::Ready(Some(line)));
}
self.buff.extend_from_slice(&bytes);
Ok(Async::NotReady)
} }
Ok(Async::NotReady) => Ok(Async::NotReady), if let Some(ind) = found {
Ok(Async::Ready(None)) => { // check if line is longer than limit
if self.buff.is_empty() { if ind + 1 > self.limit {
return Ok(Async::Ready(None));
}
if self.buff.len() > self.limit {
return Err(ReadlinesError::LimitOverflow); return Err(ReadlinesError::LimitOverflow);
} }
let enc: *const Encoding = self.encoding as *const Encoding; let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 { let line = if enc == UTF_8 {
str::from_utf8(&self.buff) str::from_utf8(&bytes.split_to(ind + 1))
.map_err(|_| ReadlinesError::EncodingError)? .map_err(|_| ReadlinesError::EncodingError)?
.to_owned() .to_owned()
} else { } else {
self.encoding self.encoding
.decode(&self.buff, DecoderTrap::Strict) .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)? .map_err(|_| ReadlinesError::EncodingError)?
}; };
self.buff.clear(); // extend buffer with rest of the bytes;
Ok(Async::Ready(Some(line))) self.buff.extend_from_slice(&bytes);
self.checked_buff = false;
return Ok(Async::Ready(Some(line)));
} }
Err(e) => Err(ReadlinesError::from(e)), self.buff.extend_from_slice(&bytes);
Ok(Async::NotReady)
} }
} else { Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) Ok(Async::Ready(None)) => {
if self.buff.is_empty() {
return Ok(Async::Ready(None));
}
if self.buff.len() > self.limit {
return Err(ReadlinesError::LimitOverflow);
}
let enc: *const Encoding = self.encoding as *const Encoding;
let line = if enc == UTF_8 {
str::from_utf8(&self.buff)
.map_err(|_| ReadlinesError::EncodingError)?
.to_owned()
} else {
self.encoding
.decode(&self.buff, DecoderTrap::Strict)
.map_err(|_| ReadlinesError::EncodingError)?
};
self.buff.clear();
Ok(Async::Ready(Some(line)))
}
Err(e) => Err(ReadlinesError::from(e)),
} }
} }
} }
@ -359,7 +357,7 @@ impl<T: HttpMessage + 'static> Stream for Readlines<T> {
pub struct MessageBody<T: HttpMessage> { pub struct MessageBody<T: HttpMessage> {
limit: usize, limit: usize,
length: Option<usize>, length: Option<usize>,
stream: Option<T::Stream>, stream: Payload<T::Stream>,
err: Option<PayloadError>, err: Option<PayloadError>,
fut: Option<Box<Future<Item = Bytes, Error = PayloadError>>>, fut: Option<Box<Future<Item = Bytes, Error = PayloadError>>>,
} }
@ -397,7 +395,7 @@ impl<T: HttpMessage> MessageBody<T> {
fn err(e: PayloadError) -> Self { fn err(e: PayloadError) -> Self {
MessageBody { MessageBody {
stream: None, stream: Payload::None,
limit: 262_144, limit: 262_144,
fut: None, fut: None,
err: Some(e), err: Some(e),
@ -428,16 +426,10 @@ where
} }
} }
if self.stream.is_none() {
return Ok(Async::Ready(Bytes::new()));
}
// future // future
let limit = self.limit; let limit = self.limit;
self.fut = Some(Box::new( self.fut = Some(Box::new(
self.stream mem::replace(&mut self.stream, Payload::None)
.take()
.expect("Can not be used second time")
.from_err() .from_err()
.fold(BytesMut::with_capacity(8192), move |mut body, chunk| { .fold(BytesMut::with_capacity(8192), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit { if (body.len() + chunk.len()) > limit {
@ -455,7 +447,7 @@ where
/// Future that resolves to a parsed urlencoded values. /// Future that resolves to a parsed urlencoded values.
pub struct UrlEncoded<T: HttpMessage, U> { pub struct UrlEncoded<T: HttpMessage, U> {
stream: Option<T::Stream>, stream: Payload<T::Stream>,
limit: usize, limit: usize,
length: Option<usize>, length: Option<usize>,
encoding: EncodingRef, encoding: EncodingRef,
@ -500,7 +492,7 @@ impl<T: HttpMessage, U> UrlEncoded<T, U> {
fn err(e: UrlencodedError) -> Self { fn err(e: UrlencodedError) -> Self {
UrlEncoded { UrlEncoded {
stream: None, stream: Payload::None,
limit: 262_144, limit: 262_144,
fut: None, fut: None,
err: Some(e), err: Some(e),
@ -543,10 +535,7 @@ where
// future // future
let encoding = self.encoding; let encoding = self.encoding;
let fut = self let fut = mem::replace(&mut self.stream, Payload::None)
.stream
.take()
.expect("UrlEncoded could not be used second time")
.from_err() .from_err()
.fold(BytesMut::with_capacity(8192), move |mut body, chunk| { .fold(BytesMut::with_capacity(8192), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit { if (body.len() + chunk.len()) > limit {

View File

@ -8,6 +8,7 @@ use serde_json;
use crate::error::JsonPayloadError; use crate::error::JsonPayloadError;
use crate::httpmessage::HttpMessage; use crate::httpmessage::HttpMessage;
use crate::payload::Payload;
/// Request payload json parser that resolves to a deserialized `T` value. /// Request payload json parser that resolves to a deserialized `T` value.
/// ///
@ -43,7 +44,7 @@ use crate::httpmessage::HttpMessage;
pub struct JsonBody<T: HttpMessage, U: DeserializeOwned> { pub struct JsonBody<T: HttpMessage, U: DeserializeOwned> {
limit: usize, limit: usize,
length: Option<usize>, length: Option<usize>,
stream: Option<T::Stream>, stream: Payload<T::Stream>,
err: Option<JsonPayloadError>, err: Option<JsonPayloadError>,
fut: Option<Box<Future<Item = U, Error = JsonPayloadError>>>, fut: Option<Box<Future<Item = U, Error = JsonPayloadError>>>,
} }
@ -61,7 +62,7 @@ impl<T: HttpMessage, U: DeserializeOwned> JsonBody<T, U> {
return JsonBody { return JsonBody {
limit: 262_144, limit: 262_144,
length: None, length: None,
stream: None, stream: Payload::None,
fut: None, fut: None,
err: Some(JsonPayloadError::ContentType), err: Some(JsonPayloadError::ContentType),
}; };
@ -112,10 +113,7 @@ impl<T: HttpMessage + 'static, U: DeserializeOwned + 'static> Future for JsonBod
} }
} }
let fut = self let fut = std::mem::replace(&mut self.stream, Payload::None)
.stream
.take()
.expect("JsonBody could not be used second time")
.from_err() .from_err()
.fold(BytesMut::with_capacity(8192), move |mut body, chunk| { .fold(BytesMut::with_capacity(8192), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit { if (body.len() + chunk.len()) > limit {

View File

@ -1,32 +1,57 @@
use bytes::Bytes; use bytes::Bytes;
use derive_more::From; use futures::{Async, Poll, Stream};
use futures::{Poll, Stream};
use h2::RecvStream; use h2::RecvStream;
use crate::error::PayloadError; use crate::error::PayloadError;
#[derive(From)] /// Type represent boxed payload
pub enum Payload { pub type PayloadStream = Box<dyn Stream<Item = Bytes, Error = PayloadError>>;
/// Type represent streaming payload
pub enum Payload<S = PayloadStream> {
None,
H1(crate::h1::Payload), H1(crate::h1::Payload),
H2(crate::h2::Payload), H2(crate::h2::Payload),
Dyn(Box<Stream<Item = Bytes, Error = PayloadError>>), Stream(S),
} }
impl From<RecvStream> for Payload { impl<S> From<RecvStream> for Payload<S> {
fn from(v: RecvStream) -> Self { fn from(v: RecvStream) -> Self {
Payload::H2(crate::h2::Payload::new(v)) Payload::H2(crate::h2::Payload::new(v))
} }
} }
impl Stream for Payload { impl<S> From<crate::h1::Payload> for Payload<S> {
fn from(pl: crate::h1::Payload) -> Self {
Payload::H1(pl)
}
}
impl<S> From<crate::h2::Payload> for Payload<S> {
fn from(pl: crate::h2::Payload) -> Self {
Payload::H2(pl)
}
}
impl From<PayloadStream> for Payload {
fn from(pl: PayloadStream) -> Self {
Payload::Stream(pl)
}
}
impl<S> Stream for Payload<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
type Item = Bytes; type Item = Bytes;
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self { match self {
Payload::None => Ok(Async::Ready(None)),
Payload::H1(ref mut pl) => pl.poll(), Payload::H1(ref mut pl) => pl.poll(),
Payload::H2(ref mut pl) => pl.poll(), Payload::H2(ref mut pl) => pl.poll(),
Payload::Dyn(ref mut pl) => pl.poll(), Payload::Stream(ref mut pl) => pl.poll(),
} }
} }
} }

View File

@ -1,5 +1,5 @@
use std::cell::{Ref, RefCell, RefMut}; use std::cell::{Ref, RefCell, RefMut};
use std::fmt; use std::{fmt, mem};
use bytes::Bytes; use bytes::Bytes;
use futures::Stream; use futures::Stream;
@ -9,11 +9,11 @@ use crate::error::PayloadError;
use crate::extensions::Extensions; use crate::extensions::Extensions;
use crate::httpmessage::HttpMessage; use crate::httpmessage::HttpMessage;
use crate::message::{Message, RequestHead}; use crate::message::{Message, RequestHead};
use crate::payload::Payload; use crate::payload::{Payload, PayloadStream};
/// Request /// Request
pub struct Request<P = Payload> { pub struct Request<P = PayloadStream> {
pub(crate) payload: RefCell<Option<P>>, pub(crate) payload: RefCell<Payload<P>>,
pub(crate) inner: Message<RequestHead>, pub(crate) inner: Message<RequestHead>,
} }
@ -28,53 +28,53 @@ where
} }
#[inline] #[inline]
fn payload(&self) -> Option<P> { fn payload(&self) -> Payload<Self::Stream> {
self.payload.borrow_mut().take() mem::replace(&mut *self.payload.borrow_mut(), Payload::None)
} }
} }
impl<Payload> From<Message<RequestHead>> for Request<Payload> { impl<P> From<Message<RequestHead>> for Request<P> {
fn from(msg: Message<RequestHead>) -> Self { fn from(msg: Message<RequestHead>) -> Self {
Request { Request {
payload: RefCell::new(None), payload: RefCell::new(Payload::None),
inner: msg, inner: msg,
} }
} }
} }
impl Request<Payload> { impl Request<PayloadStream> {
/// Create new Request instance /// Create new Request instance
pub fn new() -> Request<Payload> { pub fn new() -> Request<PayloadStream> {
Request { Request {
payload: RefCell::new(None), payload: RefCell::new(Payload::None),
inner: Message::new(), inner: Message::new(),
} }
} }
} }
impl<Payload> Request<Payload> { impl<P> Request<P> {
/// Create new Request instance /// Create new Request instance
pub fn with_payload(payload: Payload) -> Request<Payload> { pub fn with_payload(payload: Payload<P>) -> Request<P> {
Request { Request {
payload: RefCell::new(Some(payload.into())), payload: RefCell::new(payload),
inner: Message::new(), inner: Message::new(),
} }
} }
/// Create new Request instance /// Create new Request instance
pub fn set_payload<I, P>(self, payload: I) -> Request<P> pub fn set_payload<I, P1>(self, payload: I) -> Request<P1>
where where
I: Into<P>, I: Into<Payload<P1>>,
{ {
Request { Request {
payload: RefCell::new(Some(payload.into())), payload: RefCell::new(payload.into()),
inner: self.inner, inner: self.inner,
} }
} }
/// Split request into request head and payload /// Split request into request head and payload
pub fn into_parts(mut self) -> (Message<RequestHead>, Option<Payload>) { pub fn into_parts(self) -> (Message<RequestHead>, Payload<P>) {
(self.inner, self.payload.get_mut().take()) (self.inner, self.payload.into_inner())
} }
#[inline] #[inline]