1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

Make payload and httprequest a stream

This commit is contained in:
Nikolay Kim 2018-02-25 11:21:45 +03:00
parent 4e41e13baf
commit 141b992450
5 changed files with 37 additions and 83 deletions

View File

@ -15,7 +15,7 @@ use http::{header, Uri, Method, Version, HeaderMap, Extensions};
use info::ConnectionInfo; use info::ConnectionInfo;
use param::Params; use param::Params;
use router::Router; use router::Router;
use payload::{Payload, ReadAny}; use payload::Payload;
use json::JsonBody; use json::JsonBody;
use multipart::Multipart; use multipart::Multipart;
use helpers::SharedHttpMessage; use helpers::SharedHttpMessage;
@ -604,6 +604,15 @@ impl<S> Clone for HttpRequest<S> {
} }
} }
impl<S> Stream for HttpRequest<S> {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.payload_mut().poll()
}
}
impl<S> fmt::Debug for HttpRequest<S> { impl<S> fmt::Debug for HttpRequest<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = write!(f, "\nHttpRequest {:?} {}:{}\n", let res = write!(f, "\nHttpRequest {:?} {}:{}\n",
@ -706,7 +715,7 @@ impl Future for UrlEncoded {
/// Future that resolves to a complete request body. /// Future that resolves to a complete request body.
pub struct RequestBody { pub struct RequestBody {
pl: ReadAny, pl: Payload,
body: BytesMut, body: BytesMut,
limit: usize, limit: usize,
req: Option<HttpRequest<()>>, req: Option<HttpRequest<()>>,
@ -716,7 +725,7 @@ impl RequestBody {
/// Create `RequestBody` for request. /// Create `RequestBody` for request.
pub fn from_request<S>(req: &HttpRequest<S>) -> RequestBody { pub fn from_request<S>(req: &HttpRequest<S>) -> RequestBody {
let pl = req.payload().readany(); let pl = req.payload().clone();
RequestBody { RequestBody {
pl: pl, pl: pl,
body: BytesMut::new(), body: BytesMut::new(),

View File

@ -111,7 +111,7 @@ impl<S, T: DeserializeOwned> JsonBody<S, T> {
} }
} }
impl<S, T: DeserializeOwned + 'static> Future for JsonBody<S, T> { impl<S: 'static, T: DeserializeOwned + 'static> Future for JsonBody<S, T> {
type Item = T; type Item = T;
type Error = JsonPayloadError; type Error = JsonPayloadError;
@ -134,8 +134,7 @@ impl<S, T: DeserializeOwned + 'static> Future for JsonBody<S, T> {
} }
let limit = self.limit; let limit = self.limit;
let fut = req.payload().readany() let fut = req.from_err()
.from_err()
.fold(BytesMut::new(), move |mut body, chunk| { .fold(BytesMut::new(), move |mut body, chunk| {
if (body.len() + chunk.len()) > limit { if (body.len() + chunk.len()) > limit {
Err(JsonPayloadError::Overflow) Err(JsonPayloadError::Overflow)

View File

@ -482,7 +482,7 @@ impl InnerField {
if *size == 0 { if *size == 0 {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
match payload.readany().poll() { match payload.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::Ready(Some(mut chunk))) => { Ok(Async::Ready(Some(mut chunk))) => {

View File

@ -1,42 +1,16 @@
//! Payload stream //! Payload stream
use std::{fmt, cmp}; use std::cmp;
use std::rc::{Rc, Weak}; use std::rc::{Rc, Weak};
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Future, Async, Poll, Stream}; use futures::{Future, Async, Poll, Stream};
use futures::task::{Task, current as current_task}; use futures::task::{Task, current as current_task};
use body::BodyStream;
use error::PayloadError; use error::PayloadError;
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
/// Just Bytes object
#[derive(PartialEq, Message)]
pub struct PayloadItem(pub Bytes);
impl Deref for PayloadItem {
type Target = Bytes;
fn deref(&self) -> &Bytes {
&self.0
}
}
impl DerefMut for PayloadItem {
fn deref_mut(&mut self) -> &mut Bytes {
&mut self.0
}
}
impl fmt::Debug for PayloadItem {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
/// Buffered stream of bytes chunks /// Buffered stream of bytes chunks
/// ///
/// Payload stores chunks in a vector. First chunk can be received with `.readany()` method. /// Payload stores chunks in a vector. First chunk can be received with `.readany()` method.
@ -88,12 +62,6 @@ impl Payload {
self.inner.borrow().len() == 0 self.inner.borrow().len() == 0
} }
/// Get first available chunk of data.
#[inline]
pub fn readany(&self) -> ReadAny {
ReadAny(Rc::clone(&self.inner))
}
/// Get exact number of bytes /// Get exact number of bytes
#[inline] #[inline]
pub fn readexactly(&self, size: usize) -> ReadExactly { pub fn readexactly(&self, size: usize) -> ReadExactly {
@ -135,20 +103,14 @@ impl Payload {
pub fn set_buffer_size(&self, size: usize) { pub fn set_buffer_size(&self, size: usize) {
self.inner.borrow_mut().set_buffer_size(size) self.inner.borrow_mut().set_buffer_size(size)
} }
/// Convert payload into compatible `HttpResponse` body stream
#[inline]
pub fn stream(self) -> BodyStream {
Box::new(self.map(|i| i.0).map_err(|e| e.into()))
}
} }
impl Stream for Payload { impl Stream for Payload {
type Item = PayloadItem; type Item = Bytes;
type Error = PayloadError; type Error = PayloadError;
#[inline] #[inline]
fn poll(&mut self) -> Poll<Option<PayloadItem>, PayloadError> { fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.inner.borrow_mut().readany(false) self.inner.borrow_mut().readany(false)
} }
} }
@ -159,22 +121,6 @@ impl Clone for Payload {
} }
} }
/// Get first available chunk of data
pub struct ReadAny(Rc<RefCell<Inner>>);
impl Stream for ReadAny {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Bytes>, Self::Error> {
match self.0.borrow_mut().readany(false)? {
Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Get exact number of bytes /// Get exact number of bytes
pub struct ReadExactly(Rc<RefCell<Inner>>, usize); pub struct ReadExactly(Rc<RefCell<Inner>>, usize);
@ -325,10 +271,10 @@ impl Inner {
self.len self.len
} }
fn readany(&mut self, notify: bool) -> Poll<Option<PayloadItem>, PayloadError> { fn readany(&mut self, notify: bool) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() { if let Some(data) = self.items.pop_front() {
self.len -= data.len(); self.len -= data.len();
Ok(Async::Ready(Some(PayloadItem(data)))) Ok(Async::Ready(Some(data)))
} else if let Some(err) = self.err.take() { } else if let Some(err) = self.err.take() {
Err(err) Err(err)
} else if self.eof { } else if self.eof {
@ -486,12 +432,12 @@ mod tests {
#[test] #[test]
fn test_basic() { fn test_basic() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (_, payload) = Payload::new(false); let (_, mut payload) = Payload::new(false);
assert!(!payload.eof()); assert!(!payload.eof());
assert!(payload.is_empty()); assert!(payload.is_empty());
assert_eq!(payload.len(), 0); assert_eq!(payload.len(), 0);
assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -501,9 +447,9 @@ mod tests {
#[test] #[test]
fn test_eof() { fn test_eof() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, payload) = Payload::new(false); let (mut sender, mut payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.poll().ok().unwrap());
assert!(!payload.eof()); assert!(!payload.eof());
sender.feed_data(Bytes::from("data")); sender.feed_data(Bytes::from("data"));
@ -512,12 +458,12 @@ mod tests {
assert!(!payload.eof()); assert!(!payload.eof());
assert_eq!(Async::Ready(Some(Bytes::from("data"))), assert_eq!(Async::Ready(Some(Bytes::from("data"))),
payload.readany().poll().ok().unwrap()); payload.poll().ok().unwrap());
assert!(payload.is_empty()); assert!(payload.is_empty());
assert!(payload.eof()); assert!(payload.eof());
assert_eq!(payload.len(), 0); assert_eq!(payload.len(), 0);
assert_eq!(Async::Ready(None), payload.readany().poll().ok().unwrap()); assert_eq!(Async::Ready(None), payload.poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
})).unwrap(); })).unwrap();
@ -526,12 +472,12 @@ mod tests {
#[test] #[test]
fn test_err() { fn test_err() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, payload) = Payload::new(false); let (mut sender, mut payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); assert_eq!(Async::NotReady, payload.poll().ok().unwrap());
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.readany().poll().err().unwrap(); payload.poll().err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
})).unwrap(); })).unwrap();
@ -540,7 +486,7 @@ mod tests {
#[test] #[test]
fn test_readany() { fn test_readany() {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, payload) = Payload::new(false); let (mut sender, mut payload) = Payload::new(false);
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
@ -552,7 +498,7 @@ mod tests {
assert_eq!(payload.len(), 10); assert_eq!(payload.len(), 10);
assert_eq!(Async::Ready(Some(Bytes::from("line1"))), assert_eq!(Async::Ready(Some(Bytes::from("line1"))),
payload.readany().poll().ok().unwrap()); payload.poll().ok().unwrap());
assert!(!payload.is_empty()); assert!(!payload.is_empty());
assert_eq!(payload.len(), 5); assert_eq!(payload.len(), 5);
@ -625,7 +571,7 @@ mod tests {
assert_eq!(payload.len(), 4); assert_eq!(payload.len(), 4);
assert_eq!(Async::Ready(Some(Bytes::from("data"))), assert_eq!(Async::Ready(Some(Bytes::from("data"))),
payload.readany().poll().ok().unwrap()); payload.poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)

View File

@ -50,7 +50,7 @@ use futures::{Async, Poll, Stream};
use actix::{Actor, AsyncContext, Handler}; use actix::{Actor, AsyncContext, Handler};
use body::Binary; use body::Binary;
use payload::ReadAny; use payload::Payload;
use error::{Error, WsHandshakeError}; use error::{Error, WsHandshakeError};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder}; use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
@ -86,12 +86,12 @@ pub enum Message {
} }
/// Do websocket handshake and start actor /// Do websocket handshake and start actor
pub fn start<A, S>(mut req: HttpRequest<S>, actor: A) -> Result<HttpResponse, Error> pub fn start<A, S>(req: HttpRequest<S>, actor: A) -> Result<HttpResponse, Error>
where A: Actor<Context=WebsocketContext<A, S>> + Handler<Message>, where A: Actor<Context=WebsocketContext<A, S>> + Handler<Message>,
S: 'static S: 'static
{ {
let mut resp = handshake(&req)?; let mut resp = handshake(&req)?;
let stream = WsStream::new(req.payload_mut().readany()); let stream = WsStream::new(req.payload().clone());
let mut ctx = WebsocketContext::new(req, actor); let mut ctx = WebsocketContext::new(req, actor);
ctx.add_message_stream(stream); ctx.add_message_stream(stream);
@ -166,14 +166,14 @@ pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponseBuilder, WsHands
/// Maps `Payload` stream into stream of `ws::Message` items /// Maps `Payload` stream into stream of `ws::Message` items
pub struct WsStream { pub struct WsStream {
rx: ReadAny, rx: Payload,
buf: BytesMut, buf: BytesMut,
closed: bool, closed: bool,
error_sent: bool, error_sent: bool,
} }
impl WsStream { impl WsStream {
pub fn new(payload: ReadAny) -> WsStream { pub fn new(payload: Payload) -> WsStream {
WsStream { rx: payload, WsStream { rx: payload,
buf: BytesMut::new(), buf: BytesMut::new(),
closed: false, closed: false,