From 141b992450da0e2447319982dbf632d94baeefed Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 25 Feb 2018 11:21:45 +0300 Subject: [PATCH] Make payload and httprequest a stream --- src/httprequest.rs | 15 ++++++-- src/json.rs | 5 ++- src/multipart.rs | 2 +- src/payload.rs | 88 +++++++++------------------------------------- src/ws/mod.rs | 10 +++--- 5 files changed, 37 insertions(+), 83 deletions(-) diff --git a/src/httprequest.rs b/src/httprequest.rs index aa0eb9f0e..e46a7c3ee 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -15,7 +15,7 @@ use http::{header, Uri, Method, Version, HeaderMap, Extensions}; use info::ConnectionInfo; use param::Params; use router::Router; -use payload::{Payload, ReadAny}; +use payload::Payload; use json::JsonBody; use multipart::Multipart; use helpers::SharedHttpMessage; @@ -604,6 +604,15 @@ impl Clone for HttpRequest { } } +impl Stream for HttpRequest { + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll, PayloadError> { + self.payload_mut().poll() + } +} + impl fmt::Debug for HttpRequest { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let res = write!(f, "\nHttpRequest {:?} {}:{}\n", @@ -706,7 +715,7 @@ impl Future for UrlEncoded { /// Future that resolves to a complete request body. pub struct RequestBody { - pl: ReadAny, + pl: Payload, body: BytesMut, limit: usize, req: Option>, @@ -716,7 +725,7 @@ impl RequestBody { /// Create `RequestBody` for request. pub fn from_request(req: &HttpRequest) -> RequestBody { - let pl = req.payload().readany(); + let pl = req.payload().clone(); RequestBody { pl: pl, body: BytesMut::new(), diff --git a/src/json.rs b/src/json.rs index 8bcda5c90..86e612048 100644 --- a/src/json.rs +++ b/src/json.rs @@ -111,7 +111,7 @@ impl JsonBody { } } -impl Future for JsonBody { +impl Future for JsonBody { type Item = T; type Error = JsonPayloadError; @@ -134,8 +134,7 @@ impl Future for JsonBody { } let limit = self.limit; - let fut = req.payload().readany() - .from_err() + let fut = req.from_err() .fold(BytesMut::new(), move |mut body, chunk| { if (body.len() + chunk.len()) > limit { Err(JsonPayloadError::Overflow) diff --git a/src/multipart.rs b/src/multipart.rs index 9da15ba59..0fbc906d9 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -482,7 +482,7 @@ impl InnerField { if *size == 0 { Ok(Async::Ready(None)) } else { - match payload.readany().poll() { + match payload.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::Ready(Some(mut chunk))) => { diff --git a/src/payload.rs b/src/payload.rs index 97e59a488..8d5bd7206 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -1,42 +1,16 @@ //! Payload stream -use std::{fmt, cmp}; +use std::cmp; use std::rc::{Rc, Weak}; use std::cell::RefCell; use std::collections::VecDeque; -use std::ops::{Deref, DerefMut}; use bytes::{Bytes, BytesMut}; use futures::{Future, Async, Poll, Stream}; use futures::task::{Task, current as current_task}; -use body::BodyStream; use error::PayloadError; pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k -/// Just Bytes object -#[derive(PartialEq, Message)] -pub struct PayloadItem(pub Bytes); - -impl Deref for PayloadItem { - type Target = Bytes; - - fn deref(&self) -> &Bytes { - &self.0 - } -} - -impl DerefMut for PayloadItem { - fn deref_mut(&mut self) -> &mut Bytes { - &mut self.0 - } -} - -impl fmt::Debug for PayloadItem { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.0, f) - } -} - /// Buffered stream of bytes chunks /// /// Payload stores chunks in a vector. First chunk can be received with `.readany()` method. @@ -88,12 +62,6 @@ impl Payload { self.inner.borrow().len() == 0 } - /// Get first available chunk of data. - #[inline] - pub fn readany(&self) -> ReadAny { - ReadAny(Rc::clone(&self.inner)) - } - /// Get exact number of bytes #[inline] pub fn readexactly(&self, size: usize) -> ReadExactly { @@ -135,20 +103,14 @@ impl Payload { pub fn set_buffer_size(&self, size: usize) { self.inner.borrow_mut().set_buffer_size(size) } - - /// Convert payload into compatible `HttpResponse` body stream - #[inline] - pub fn stream(self) -> BodyStream { - Box::new(self.map(|i| i.0).map_err(|e| e.into())) - } } impl Stream for Payload { - type Item = PayloadItem; + type Item = Bytes; type Error = PayloadError; #[inline] - fn poll(&mut self) -> Poll, PayloadError> { + fn poll(&mut self) -> Poll, PayloadError> { self.inner.borrow_mut().readany(false) } } @@ -159,22 +121,6 @@ impl Clone for Payload { } } -/// Get first available chunk of data -pub struct ReadAny(Rc>); - -impl Stream for ReadAny { - type Item = Bytes; - type Error = PayloadError; - - fn poll(&mut self) -> Poll, Self::Error> { - match self.0.borrow_mut().readany(false)? { - Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))), - Async::Ready(None) => Ok(Async::Ready(None)), - Async::NotReady => Ok(Async::NotReady), - } - } -} - /// Get exact number of bytes pub struct ReadExactly(Rc>, usize); @@ -325,10 +271,10 @@ impl Inner { self.len } - fn readany(&mut self, notify: bool) -> Poll, PayloadError> { + fn readany(&mut self, notify: bool) -> Poll, PayloadError> { if let Some(data) = self.items.pop_front() { self.len -= data.len(); - Ok(Async::Ready(Some(PayloadItem(data)))) + Ok(Async::Ready(Some(data))) } else if let Some(err) = self.err.take() { Err(err) } else if self.eof { @@ -486,12 +432,12 @@ mod tests { #[test] fn test_basic() { Core::new().unwrap().run(lazy(|| { - let (_, payload) = Payload::new(false); + let (_, mut payload) = Payload::new(false); assert!(!payload.eof()); assert!(payload.is_empty()); assert_eq!(payload.len(), 0); - assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); + assert_eq!(Async::NotReady, payload.poll().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) @@ -501,9 +447,9 @@ mod tests { #[test] fn test_eof() { Core::new().unwrap().run(lazy(|| { - let (mut sender, payload) = Payload::new(false); + let (mut sender, mut payload) = Payload::new(false); - assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); + assert_eq!(Async::NotReady, payload.poll().ok().unwrap()); assert!(!payload.eof()); sender.feed_data(Bytes::from("data")); @@ -512,12 +458,12 @@ mod tests { assert!(!payload.eof()); assert_eq!(Async::Ready(Some(Bytes::from("data"))), - payload.readany().poll().ok().unwrap()); + payload.poll().ok().unwrap()); assert!(payload.is_empty()); assert!(payload.eof()); assert_eq!(payload.len(), 0); - assert_eq!(Async::Ready(None), payload.readany().poll().ok().unwrap()); + assert_eq!(Async::Ready(None), payload.poll().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) })).unwrap(); @@ -526,12 +472,12 @@ mod tests { #[test] fn test_err() { Core::new().unwrap().run(lazy(|| { - let (mut sender, payload) = Payload::new(false); + let (mut sender, mut payload) = Payload::new(false); - assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); + assert_eq!(Async::NotReady, payload.poll().ok().unwrap()); sender.set_error(PayloadError::Incomplete); - payload.readany().poll().err().unwrap(); + payload.poll().err().unwrap(); let res: Result<(), ()> = Ok(()); result(res) })).unwrap(); @@ -540,7 +486,7 @@ mod tests { #[test] fn test_readany() { Core::new().unwrap().run(lazy(|| { - let (mut sender, payload) = Payload::new(false); + let (mut sender, mut payload) = Payload::new(false); sender.feed_data(Bytes::from("line1")); @@ -552,7 +498,7 @@ mod tests { assert_eq!(payload.len(), 10); assert_eq!(Async::Ready(Some(Bytes::from("line1"))), - payload.readany().poll().ok().unwrap()); + payload.poll().ok().unwrap()); assert!(!payload.is_empty()); assert_eq!(payload.len(), 5); @@ -625,7 +571,7 @@ mod tests { assert_eq!(payload.len(), 4); assert_eq!(Async::Ready(Some(Bytes::from("data"))), - payload.readany().poll().ok().unwrap()); + payload.poll().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 91258cabe..93d0b61aa 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -50,7 +50,7 @@ use futures::{Async, Poll, Stream}; use actix::{Actor, AsyncContext, Handler}; use body::Binary; -use payload::ReadAny; +use payload::Payload; use error::{Error, WsHandshakeError}; use httprequest::HttpRequest; use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder}; @@ -86,12 +86,12 @@ pub enum Message { } /// Do websocket handshake and start actor -pub fn start(mut req: HttpRequest, actor: A) -> Result +pub fn start(req: HttpRequest, actor: A) -> Result where A: Actor> + Handler, S: 'static { let mut resp = handshake(&req)?; - let stream = WsStream::new(req.payload_mut().readany()); + let stream = WsStream::new(req.payload().clone()); let mut ctx = WebsocketContext::new(req, actor); ctx.add_message_stream(stream); @@ -166,14 +166,14 @@ pub fn handshake(req: &HttpRequest) -> Result WsStream { + pub fn new(payload: Payload) -> WsStream { WsStream { rx: payload, buf: BytesMut::new(), closed: false,