From a41459bf698ca13c56822da1fcf7137c1a452931 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 12 Feb 2019 11:07:42 -0800 Subject: [PATCH] make payload generic --- src/body.rs | 5 +- src/client/h1proto.rs | 5 +- src/client/h2proto.rs | 3 +- src/client/response.rs | 25 ++++----- src/h1/dispatcher.rs | 2 +- src/h1/service.rs | 4 +- src/httpmessage.rs | 117 +++++++++++++++++++---------------------- src/json.rs | 10 ++-- src/payload.rs | 41 ++++++++++++--- src/request.rs | 38 ++++++------- 10 files changed, 127 insertions(+), 123 deletions(-) diff --git a/src/body.rs b/src/body.rs index 1c54d4ce..d3e63f9c 100644 --- a/src/body.rs +++ b/src/body.rs @@ -4,10 +4,7 @@ use std::{fmt, mem}; use bytes::{Bytes, BytesMut}; use futures::{Async, Poll, Stream}; -use crate::error::{Error, PayloadError}; - -/// Type represent streaming payload -pub type PayloadStream = Box>; +use crate::error::Error; #[derive(Debug, PartialEq, Copy, Clone)] /// Different type of body diff --git a/src/client/h1proto.rs b/src/client/h1proto.rs index 59a03ef4..d1649138 100644 --- a/src/client/h1proto.rs +++ b/src/client/h1proto.rs @@ -9,10 +9,11 @@ use super::connection::{ConnectionLifetime, ConnectionType, IoConnection}; use super::error::{ConnectorError, SendRequestError}; use super::pool::Acquired; use super::response::ClientResponse; -use crate::body::{BodyLength, MessageBody, PayloadStream}; +use crate::body::{BodyLength, MessageBody}; use crate::error::PayloadError; use crate::h1; use crate::message::RequestHead; +use crate::payload::PayloadStream; pub(crate) fn send_request( io: T, @@ -57,7 +58,7 @@ where release_connection(framed, force_close) } _ => { - res.set_payload(Payload::stream(framed)); + res.set_payload(Payload::stream(framed).into()); } } ok(res) diff --git a/src/client/h2proto.rs b/src/client/h2proto.rs index ecd18cf8..697d30a4 100644 --- a/src/client/h2proto.rs +++ b/src/client/h2proto.rs @@ -10,7 +10,6 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCOD use http::{request::Request, HttpTryFrom, Version}; use crate::body::{BodyLength, MessageBody}; -use crate::h2::Payload; use crate::message::{RequestHead, ResponseHead}; use super::connection::{ConnectionType, IoConnection}; @@ -111,7 +110,7 @@ where Ok(ClientResponse { head, - payload: RefCell::new(Some(Box::new(Payload::new(body)))), + payload: RefCell::new(body.into()), }) }) .from_err() diff --git a/src/client/response.rs b/src/client/response.rs index 7a83d825..f19e2d17 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -1,20 +1,19 @@ use std::cell::RefCell; -use std::fmt; +use std::{fmt, mem}; use bytes::Bytes; -use futures::{Async, Poll, Stream}; +use futures::{Poll, Stream}; use http::{HeaderMap, StatusCode, Version}; -use crate::body::PayloadStream; use crate::error::PayloadError; use crate::httpmessage::HttpMessage; use crate::message::{Head, ResponseHead}; +use crate::payload::{Payload, PayloadStream}; /// Client Response -#[derive(Default)] pub struct ClientResponse { pub(crate) head: ResponseHead, - pub(crate) payload: RefCell>, + pub(crate) payload: RefCell, } impl HttpMessage for ClientResponse { @@ -25,8 +24,8 @@ impl HttpMessage for ClientResponse { } #[inline] - fn payload(&self) -> Option { - self.payload.borrow_mut().take() + fn payload(&self) -> Payload { + mem::replace(&mut *self.payload.borrow_mut(), Payload::None) } } @@ -35,7 +34,7 @@ impl ClientResponse { pub fn new() -> ClientResponse { ClientResponse { head: ResponseHead::default(), - payload: RefCell::new(None), + payload: RefCell::new(Payload::None), } } @@ -80,8 +79,8 @@ impl ClientResponse { } /// Set response payload - pub fn set_payload(&mut self, payload: PayloadStream) { - *self.payload.get_mut() = Some(payload); + pub fn set_payload(&mut self, payload: Payload) { + *self.payload.get_mut() = payload; } } @@ -90,11 +89,7 @@ impl Stream for ClientResponse { type Error = PayloadError; fn poll(&mut self) -> Poll, Self::Error> { - if let Some(ref mut payload) = self.payload.get_mut() { - payload.poll() - } else { - Ok(Async::Ready(None)) - } + self.payload.get_mut().poll() } } diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index 6a176274..c242333b 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -316,7 +316,7 @@ where match self.framed.get_codec().message_type() { MessageType::Payload => { let (ps, pl) = Payload::create(false); - req = req.set_payload(pl); + req = req.set_payload(crate::Payload::H1(pl)); self.payload = Some(ps); } MessageType::Stream => { diff --git a/src/h1/service.rs b/src/h1/service.rs index 169a80eb..38186449 100644 --- a/src/h1/service.rs +++ b/src/h1/service.rs @@ -12,7 +12,7 @@ use log::error; use crate::body::MessageBody; use crate::config::{KeepAlive, ServiceConfig}; use crate::error::{DispatchError, ParseError}; -use crate::payload::Payload; +use crate::payload::PayloadStream; use crate::request::Request; use crate::response::Response; @@ -29,7 +29,7 @@ pub struct H1Service { impl H1Service where - S: NewService>, + S: NewService>, S::Error: Debug, S::Response: Into>, S::Service: 'static, diff --git a/src/httpmessage.rs b/src/httpmessage.rs index f071cd7b..57ad4bf9 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -1,3 +1,5 @@ +use std::{mem, str}; + use bytes::{Bytes, BytesMut}; use encoding::all::UTF_8; use encoding::label::encoding_from_whatwg_label; @@ -8,13 +10,13 @@ use http::{header, HeaderMap}; use mime::Mime; use serde::de::DeserializeOwned; use serde_urlencoded; -use std::str; use crate::error::{ ContentTypeError, ParseError, PayloadError, ReadlinesError, UrlencodedError, }; use crate::header::Header; use crate::json::JsonBody; +use crate::payload::Payload; /// Trait that implements general purpose operations on http messages pub trait HttpMessage: Sized { @@ -25,7 +27,7 @@ pub trait HttpMessage: Sized { fn headers(&self) -> &HeaderMap; /// Message payload stream - fn payload(&self) -> Option; + fn payload(&self) -> Payload; #[doc(hidden)] /// Get a header @@ -210,7 +212,7 @@ pub trait HttpMessage: Sized { /// Stream to read request line by line. pub struct Readlines { - stream: Option, + stream: Payload, buff: BytesMut, limit: usize, checked_buff: bool, @@ -244,7 +246,7 @@ impl Readlines { fn err(err: ReadlinesError) -> Self { Readlines { - stream: None, + stream: Payload::None, buff: BytesMut::new(), limit: 262_144, checked_buff: true, @@ -292,65 +294,61 @@ impl Stream for Readlines { self.checked_buff = true; } // poll req for more bytes - if let Some(ref mut stream) = self.stream { - match stream.poll() { - Ok(Async::Ready(Some(mut bytes))) => { - // check if there is a newline in bytes - let mut found: Option = None; - for (ind, b) in bytes.iter().enumerate() { - if *b == b'\n' { - found = Some(ind); - break; - } + match self.stream.poll() { + Ok(Async::Ready(Some(mut bytes))) => { + // check if there is a newline in bytes + let mut found: Option = None; + for (ind, b) in bytes.iter().enumerate() { + if *b == b'\n' { + found = Some(ind); + break; } - if let Some(ind) = found { - // check if line is longer than limit - if ind + 1 > self.limit { - return Err(ReadlinesError::LimitOverflow); - } - let enc: *const Encoding = self.encoding as *const Encoding; - let line = if enc == UTF_8 { - str::from_utf8(&bytes.split_to(ind + 1)) - .map_err(|_| ReadlinesError::EncodingError)? - .to_owned() - } else { - self.encoding - .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) - .map_err(|_| ReadlinesError::EncodingError)? - }; - // extend buffer with rest of the bytes; - self.buff.extend_from_slice(&bytes); - self.checked_buff = false; - return Ok(Async::Ready(Some(line))); - } - self.buff.extend_from_slice(&bytes); - Ok(Async::NotReady) } - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => { - if self.buff.is_empty() { - return Ok(Async::Ready(None)); - } - if self.buff.len() > self.limit { + if let Some(ind) = found { + // check if line is longer than limit + if ind + 1 > self.limit { return Err(ReadlinesError::LimitOverflow); } let enc: *const Encoding = self.encoding as *const Encoding; let line = if enc == UTF_8 { - str::from_utf8(&self.buff) + str::from_utf8(&bytes.split_to(ind + 1)) .map_err(|_| ReadlinesError::EncodingError)? .to_owned() } else { self.encoding - .decode(&self.buff, DecoderTrap::Strict) + .decode(&bytes.split_to(ind + 1), DecoderTrap::Strict) .map_err(|_| ReadlinesError::EncodingError)? }; - self.buff.clear(); - Ok(Async::Ready(Some(line))) + // extend buffer with rest of the bytes; + self.buff.extend_from_slice(&bytes); + self.checked_buff = false; + return Ok(Async::Ready(Some(line))); } - Err(e) => Err(ReadlinesError::from(e)), + self.buff.extend_from_slice(&bytes); + Ok(Async::NotReady) } - } else { - Ok(Async::Ready(None)) + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + if self.buff.is_empty() { + return Ok(Async::Ready(None)); + } + if self.buff.len() > self.limit { + return Err(ReadlinesError::LimitOverflow); + } + let enc: *const Encoding = self.encoding as *const Encoding; + let line = if enc == UTF_8 { + str::from_utf8(&self.buff) + .map_err(|_| ReadlinesError::EncodingError)? + .to_owned() + } else { + self.encoding + .decode(&self.buff, DecoderTrap::Strict) + .map_err(|_| ReadlinesError::EncodingError)? + }; + self.buff.clear(); + Ok(Async::Ready(Some(line))) + } + Err(e) => Err(ReadlinesError::from(e)), } } } @@ -359,7 +357,7 @@ impl Stream for Readlines { pub struct MessageBody { limit: usize, length: Option, - stream: Option, + stream: Payload, err: Option, fut: Option>>, } @@ -397,7 +395,7 @@ impl MessageBody { fn err(e: PayloadError) -> Self { MessageBody { - stream: None, + stream: Payload::None, limit: 262_144, fut: None, err: Some(e), @@ -428,16 +426,10 @@ where } } - if self.stream.is_none() { - return Ok(Async::Ready(Bytes::new())); - } - // future let limit = self.limit; self.fut = Some(Box::new( - self.stream - .take() - .expect("Can not be used second time") + mem::replace(&mut self.stream, Payload::None) .from_err() .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { if (body.len() + chunk.len()) > limit { @@ -455,7 +447,7 @@ where /// Future that resolves to a parsed urlencoded values. pub struct UrlEncoded { - stream: Option, + stream: Payload, limit: usize, length: Option, encoding: EncodingRef, @@ -500,7 +492,7 @@ impl UrlEncoded { fn err(e: UrlencodedError) -> Self { UrlEncoded { - stream: None, + stream: Payload::None, limit: 262_144, fut: None, err: Some(e), @@ -543,10 +535,7 @@ where // future let encoding = self.encoding; - let fut = self - .stream - .take() - .expect("UrlEncoded could not be used second time") + let fut = mem::replace(&mut self.stream, Payload::None) .from_err() .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { if (body.len() + chunk.len()) > limit { diff --git a/src/json.rs b/src/json.rs index f750f545..573fde41 100644 --- a/src/json.rs +++ b/src/json.rs @@ -8,6 +8,7 @@ use serde_json; use crate::error::JsonPayloadError; use crate::httpmessage::HttpMessage; +use crate::payload::Payload; /// Request payload json parser that resolves to a deserialized `T` value. /// @@ -43,7 +44,7 @@ use crate::httpmessage::HttpMessage; pub struct JsonBody { limit: usize, length: Option, - stream: Option, + stream: Payload, err: Option, fut: Option>>, } @@ -61,7 +62,7 @@ impl JsonBody { return JsonBody { limit: 262_144, length: None, - stream: None, + stream: Payload::None, fut: None, err: Some(JsonPayloadError::ContentType), }; @@ -112,10 +113,7 @@ impl Future for JsonBod } } - let fut = self - .stream - .take() - .expect("JsonBody could not be used second time") + let fut = std::mem::replace(&mut self.stream, Payload::None) .from_err() .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { if (body.len() + chunk.len()) > limit { diff --git a/src/payload.rs b/src/payload.rs index ede1281e..21e41531 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -1,32 +1,57 @@ use bytes::Bytes; -use derive_more::From; -use futures::{Poll, Stream}; +use futures::{Async, Poll, Stream}; use h2::RecvStream; use crate::error::PayloadError; -#[derive(From)] -pub enum Payload { +/// Type represent boxed payload +pub type PayloadStream = Box>; + +/// Type represent streaming payload +pub enum Payload { + None, H1(crate::h1::Payload), H2(crate::h2::Payload), - Dyn(Box>), + Stream(S), } -impl From for Payload { +impl From for Payload { fn from(v: RecvStream) -> Self { Payload::H2(crate::h2::Payload::new(v)) } } -impl Stream for Payload { +impl From for Payload { + fn from(pl: crate::h1::Payload) -> Self { + Payload::H1(pl) + } +} + +impl From for Payload { + fn from(pl: crate::h2::Payload) -> Self { + Payload::H2(pl) + } +} + +impl From for Payload { + fn from(pl: PayloadStream) -> Self { + Payload::Stream(pl) + } +} + +impl Stream for Payload +where + S: Stream, +{ type Item = Bytes; type Error = PayloadError; fn poll(&mut self) -> Poll, Self::Error> { match self { + Payload::None => Ok(Async::Ready(None)), Payload::H1(ref mut pl) => pl.poll(), Payload::H2(ref mut pl) => pl.poll(), - Payload::Dyn(ref mut pl) => pl.poll(), + Payload::Stream(ref mut pl) => pl.poll(), } } } diff --git a/src/request.rs b/src/request.rs index b9c35c7a..388fe754 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,5 +1,5 @@ use std::cell::{Ref, RefCell, RefMut}; -use std::fmt; +use std::{fmt, mem}; use bytes::Bytes; use futures::Stream; @@ -9,11 +9,11 @@ use crate::error::PayloadError; use crate::extensions::Extensions; use crate::httpmessage::HttpMessage; use crate::message::{Message, RequestHead}; -use crate::payload::Payload; +use crate::payload::{Payload, PayloadStream}; /// Request -pub struct Request

{ - pub(crate) payload: RefCell>, +pub struct Request

{ + pub(crate) payload: RefCell>, pub(crate) inner: Message, } @@ -28,53 +28,53 @@ where } #[inline] - fn payload(&self) -> Option

{ - self.payload.borrow_mut().take() + fn payload(&self) -> Payload { + mem::replace(&mut *self.payload.borrow_mut(), Payload::None) } } -impl From> for Request { +impl

From> for Request

{ fn from(msg: Message) -> Self { Request { - payload: RefCell::new(None), + payload: RefCell::new(Payload::None), inner: msg, } } } -impl Request { +impl Request { /// Create new Request instance - pub fn new() -> Request { + pub fn new() -> Request { Request { - payload: RefCell::new(None), + payload: RefCell::new(Payload::None), inner: Message::new(), } } } -impl Request { +impl

Request

{ /// Create new Request instance - pub fn with_payload(payload: Payload) -> Request { + pub fn with_payload(payload: Payload

) -> Request

{ Request { - payload: RefCell::new(Some(payload.into())), + payload: RefCell::new(payload), inner: Message::new(), } } /// Create new Request instance - pub fn set_payload(self, payload: I) -> Request

+ pub fn set_payload(self, payload: I) -> Request where - I: Into

, + I: Into>, { Request { - payload: RefCell::new(Some(payload.into())), + payload: RefCell::new(payload.into()), inner: self.inner, } } /// Split request into request head and payload - pub fn into_parts(mut self) -> (Message, Option) { - (self.inner, self.payload.get_mut().take()) + pub fn into_parts(self) -> (Message, Payload

) { + (self.inner, self.payload.into_inner()) } #[inline]