From f3b853f2249a89f2e69385059fea4bc58ab8d342 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 19 Dec 2017 00:18:57 -0800 Subject: [PATCH] refactor payload --- examples/basic.rs | 9 ++- guide/src/qs_7.md | 43 +++++++++++++- src/error.rs | 11 ---- src/h1.rs | 49 +++++++-------- src/httprequest.rs | 38 +++++------- src/lib.rs | 4 +- src/multipart.rs | 26 ++++---- src/payload.rs | 145 +++++++++++++++++++++++++++++++++------------ src/ws.rs | 24 ++++---- 9 files changed, 213 insertions(+), 136 deletions(-) diff --git a/examples/basic.rs b/examples/basic.rs index d6b8b3a9e..4ceef9e13 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -5,6 +5,7 @@ extern crate actix; extern crate actix_web; extern crate env_logger; extern crate futures; +use futures::Stream; use actix_web::*; use actix_web::middlewares::RequestSession; @@ -13,11 +14,9 @@ use futures::future::{FutureResult, result}; /// simple handler fn index(mut req: HttpRequest) -> Result { println!("{:?}", req); - if let Some(payload) = req.payload_mut() { - if let Ok(ch) = payload.readany() { - if let futures::Async::Ready(Some(d)) = ch { - println!("{}", String::from_utf8_lossy(d.0.as_ref())); - } + if let Ok(ch) = req.payload_mut().readany().poll() { + if let futures::Async::Ready(Some(d)) = ch { + println!("{}", String::from_utf8_lossy(d.as_ref())); } } diff --git a/guide/src/qs_7.md b/guide/src/qs_7.md index 7e8bd08a9..e3248068d 100644 --- a/guide/src/qs_7.md +++ b/guide/src/qs_7.md @@ -99,12 +99,11 @@ Enabling chunked encoding for *HTTP/2.0* responses is forbidden. ```rust # extern crate actix_web; use actix_web::*; -use actix_web::headers::ContentEncoding; fn index(req: HttpRequest) -> HttpResponse { HttpResponse::Ok() .chunked() - .body(Body::Streaming(Payload::empty().stream())).unwrap() + .body(Body::Streaming(payload::Payload::empty().stream())).unwrap() } # fn main() {} ``` @@ -123,4 +122,42 @@ fn index(req: HttpRequest) -> HttpResponse { ## Streaming request -[WIP] +Actix uses [*Payload*](../actix_web/struct.Payload.html) object as request payload stream. +*HttpRequest* provides several methods, which can be used for payload access. +At the same time *Payload* implements *Stream* trait, so it could be used with various +stream combinators. Also *Payload* provides serveral convinience methods that return +future object that resolve to Bytes object. + +* *readany* method returns *Stream* of *Bytes* objects. + +* *readexactly* method returns *Future* that resolves when specified number of bytes + get received. + +* *readline* method returns *Future* that resolves when `\n` get received. + +* *readuntil* method returns *Future* that resolves when specified bytes string + matches in input bytes stream + +Here is example that reads request payload and prints it. + +```rust +# extern crate actix_web; +# extern crate futures; +# use futures::future::result; +use actix_web::*; +use futures::{Future, Stream}; + + +fn index(mut req: HttpRequest) -> Box> { + Box::new( + req.payload_mut() + .readany() + .fold((), |_, chunk| { + println!("Chunk: {:?}", chunk); + result::<_, error::PayloadError>(Ok(())) + }) + .map_err(|e| Error::from(e)) + .map(|_| HttpResponse::Ok().finish().unwrap())) +} +# fn main() {} +``` diff --git a/src/error.rs b/src/error.rs index a44863ff5..98f26e4cb 100644 --- a/src/error.rs +++ b/src/error.rs @@ -265,9 +265,6 @@ pub enum MultipartError { /// Multipart boundary is not found #[fail(display="Multipart boundary is not found")] Boundary, - /// Request does not contain payload - #[fail(display="Request does not contain payload")] - NoPayload, /// Error during field parsing #[fail(display="{}", _0)] Parse(#[cause] ParseError), @@ -335,9 +332,6 @@ pub enum WsHandshakeError { /// Websocket key is not set or wrong #[fail(display="Unknown websocket key")] BadWebsocketKey, - /// Request does not contain payload - #[fail(display="Request does not contain payload")] - NoPayload, } impl ResponseError for WsHandshakeError { @@ -361,8 +355,6 @@ impl ResponseError for WsHandshakeError { HTTPBadRequest.with_reason("Unsupported version"), WsHandshakeError::BadWebsocketKey => HTTPBadRequest.with_reason("Handshake error"), - WsHandshakeError::NoPayload => - HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR, Body::Empty), } } } @@ -382,9 +374,6 @@ pub enum UrlencodedError { /// Content type error #[fail(display="Content type error")] ContentType, - /// Request does not contain payload - #[fail(display="Request does not contain payload")] - NoPayload, } /// Return `BadRequest` for `UrlencodedError` diff --git a/src/h1.rs b/src/h1.rs index 78e060e1e..bfd187e19 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -1002,7 +1002,6 @@ mod tests { assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); - assert!(req.payload().is_none()); } Ok(_) | Err(_) => panic!("Error during parsing http request"), } @@ -1026,7 +1025,6 @@ mod tests { assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::PUT); assert_eq!(req.path(), "/test"); - assert!(req.payload().is_none()); } Ok(_) | Err(_) => panic!("Error during parsing http request"), } @@ -1044,7 +1042,6 @@ mod tests { assert_eq!(req.version(), Version::HTTP_10); assert_eq!(*req.method(), Method::POST); assert_eq!(req.path(), "/test2"); - assert!(req.payload().is_none()); } Ok(_) | Err(_) => panic!("Error during parsing http request"), } @@ -1062,7 +1059,7 @@ mod tests { assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); - assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"body"); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"body"); } Ok(_) | Err(_) => panic!("Error during parsing http request"), } @@ -1081,7 +1078,7 @@ mod tests { assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); - assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"body"); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"body"); } Ok(_) | Err(_) => panic!("Error during parsing http request"), } @@ -1102,7 +1099,6 @@ mod tests { assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); - assert!(req.payload().is_none()); } Ok(_) | Err(_) => panic!("Error during parsing http request"), } @@ -1130,7 +1126,6 @@ mod tests { assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); assert_eq!(req.headers().get("test").unwrap().as_bytes(), b"value"); - assert!(req.payload().is_none()); } Ok(_) | Err(_) => panic!("Error during parsing http request"), } @@ -1240,7 +1235,7 @@ mod tests { connection: upgrade\r\n\r\n"); let req = parse_ready!(&mut buf); - assert!(!req.payload().unwrap().eof()); + assert!(!req.payload().eof()); assert!(req.upgrade()); } @@ -1252,7 +1247,7 @@ mod tests { let req = parse_ready!(&mut buf); assert!(req.upgrade()); - assert!(!req.payload().unwrap().eof()); + assert!(!req.payload().eof()); } #[test] @@ -1262,7 +1257,6 @@ mod tests { transfer-encoding: chunked\r\n\r\n"); let req = parse_ready!(&mut buf); - assert!(req.payload().is_some()); if let Ok(val) = req.chunked() { assert!(val); } else { @@ -1274,7 +1268,6 @@ mod tests { transfer-encoding: chnked\r\n\r\n"); let req = parse_ready!(&mut buf); - assert!(req.payload().is_none()); if let Ok(val) = req.chunked() { assert!(!val); } else { @@ -1334,7 +1327,7 @@ mod tests { let mut req = parse_ready!(&mut buf); assert!(!req.keep_alive()); assert!(req.upgrade()); - assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"some raw data"); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"some raw data"); } #[test] @@ -1383,13 +1376,13 @@ mod tests { let mut reader = Reader::new(); let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert!(req.chunked().unwrap()); - assert!(!req.payload().unwrap().eof()); + assert!(!req.payload().eof()); buf.feed_data("4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - assert!(!req.payload().unwrap().eof()); - assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"dataline"); - assert!(req.payload().unwrap().eof()); + assert!(!req.payload().eof()); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); + assert!(req.payload().eof()); } #[test] @@ -1404,7 +1397,7 @@ mod tests { let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert!(req.chunked().unwrap()); - assert!(!req.payload().unwrap().eof()); + assert!(!req.payload().eof()); buf.feed_data( "4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n\ @@ -1414,10 +1407,10 @@ mod tests { let req2 = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert_eq!(*req2.method(), Method::POST); assert!(req2.chunked().unwrap()); - assert!(!req2.payload().unwrap().eof()); + assert!(!req2.payload().eof()); - assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"dataline"); - assert!(req.payload().unwrap().eof()); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); + assert!(req.payload().eof()); } #[test] @@ -1431,7 +1424,7 @@ mod tests { let mut reader = Reader::new(); let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert!(req.chunked().unwrap()); - assert!(!req.payload().unwrap().eof()); + assert!(!req.payload().eof()); buf.feed_data("4\r\ndata\r"); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); @@ -1453,12 +1446,12 @@ mod tests { //buf.feed_data("test: test\r\n"); //not_ready!(reader.parse(&mut buf, &mut readbuf)); - assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"dataline"); - assert!(!req.payload().unwrap().eof()); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); + assert!(!req.payload().eof()); buf.feed_data("\r\n"); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - assert!(req.payload().unwrap().eof()); + assert!(req.payload().eof()); } #[test] @@ -1472,13 +1465,13 @@ mod tests { let mut reader = Reader::new(); let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); assert!(req.chunked().unwrap()); - assert!(!req.payload().unwrap().eof()); + assert!(!req.payload().eof()); buf.feed_data("4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n") not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); - assert!(!req.payload().unwrap().eof()); - assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"dataline"); - assert!(req.payload().unwrap().eof()); + assert!(!req.payload().eof()); + assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline"); + assert!(req.payload().eof()); } /*#[test] diff --git a/src/httprequest.rs b/src/httprequest.rs index 0fab3c342..f22fea54d 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -79,8 +79,8 @@ impl HttpMessage { self.params.clear(); self.cookies = None; self.addr = None; - self.payload = None; self.info = None; + self.payload = None; } } @@ -385,32 +385,30 @@ impl HttpRequest { /// Returns reference to the associated http payload. #[inline] - pub fn payload(&self) -> Option<&Payload> { - self.as_ref().payload.as_ref() + pub fn payload(&self) -> &Payload { + let msg = self.as_mut(); + if msg.payload.is_none() { + msg.payload = Some(Payload::empty()); + } + msg.payload.as_ref().unwrap() } /// Returns mutable reference to the associated http payload. #[inline] - pub fn payload_mut(&mut self) -> Option<&mut Payload> { - self.as_mut().payload.as_mut() + pub fn payload_mut(&mut self) -> &mut Payload { + let msg = self.as_mut(); + if msg.payload.is_none() { + msg.payload = Some(Payload::empty()); + } + msg.payload.as_mut().unwrap() } - /// Return payload - #[inline] - pub fn take_payload(&mut self) -> Option { - self.as_mut().payload.take() - } - /// Return stream to process BODY as multipart. /// /// Content-type: multipart/form-data; pub fn multipart(&mut self) -> Result { let boundary = Multipart::boundary(self.headers())?; - if let Some(payload) = self.take_payload() { - Ok(Multipart::new(boundary, payload)) - } else { - Err(MultipartError::NoPayload) - } + Ok(Multipart::new(boundary, self.payload().clone())) } /// Parse `application/x-www-form-urlencoded` encoded body. @@ -453,11 +451,7 @@ impl HttpRequest { }; if t { - if let Some(payload) = self.take_payload() { - Ok(UrlEncoded{pl: payload, body: BytesMut::new()}) - } else { - Err(UrlencodedError::NoPayload) - } + Ok(UrlEncoded{pl: self.payload().clone(), body: BytesMut::new()}) } else { Err(UrlencodedError::ContentType) } @@ -523,7 +517,7 @@ impl Future for UrlEncoded { Ok(Async::Ready(m)) }, Ok(Async::Ready(Some(item))) => { - self.body.extend_from_slice(&item.0); + self.body.extend_from_slice(&item); continue }, Err(err) => Err(err), diff --git a/src/lib.rs b/src/lib.rs index 202da08a6..9a83907a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -93,7 +93,6 @@ mod helpers; mod encoding; mod httprequest; mod httpresponse; -mod payload; mod info; mod route; mod router; @@ -117,12 +116,12 @@ pub mod httpcodes; pub mod multipart; pub mod middlewares; pub mod pred; +pub mod payload; pub use error::{Error, Result, ResponseError}; pub use body::{Body, Binary}; pub use application::Application; pub use httprequest::HttpRequest; pub use httpresponse::HttpResponse; -pub use payload::{Payload, PayloadItem}; pub use handler::{Reply, Responder, Json, NormalizePath}; pub use route::Route; pub use resource::Resource; @@ -167,7 +166,6 @@ pub mod dev { pub use handler::Handler; pub use router::{Router, Pattern}; pub use pipeline::Pipeline; - pub use payload::{PayloadSender, PayloadWriter}; pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; pub use param::{FromParam, Params}; pub use server::ServerSettings; diff --git a/src/multipart.rs b/src/multipart.rs index f09c135fd..59ba232e7 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -9,7 +9,7 @@ use httparse; use bytes::Bytes; use http::HttpTryFrom; use http::header::{self, HeaderMap, HeaderName, HeaderValue}; -use futures::{Async, Stream, Poll}; +use futures::{Async, Future, Stream, Poll}; use futures::task::{Task, current as current_task}; use error::{ParseError, PayloadError, MultipartError}; @@ -119,7 +119,7 @@ impl InnerMultipart { fn read_headers(payload: &mut Payload) -> Poll { - match payload.readuntil(b"\r\n\r\n")? { + match payload.readuntil(b"\r\n\r\n").poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(bytes) => { let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS]; @@ -150,7 +150,7 @@ impl InnerMultipart { fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll { // TODO: need to read epilogue - match payload.readline()? { + match payload.readline().poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(chunk) => { if chunk.len() == boundary.len() + 4 && @@ -175,7 +175,7 @@ impl InnerMultipart { { let mut eof = false; loop { - if let Async::Ready(chunk) = payload.readline()? { + if let Async::Ready(chunk) = payload.readline().poll()? { if chunk.is_empty() { //ValueError("Could not find starting boundary %r" //% (self._boundary)) @@ -452,15 +452,15 @@ impl InnerField { if *size == 0 { Ok(Async::Ready(None)) } else { - match payload.readany() { + match payload.readany().poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::Ready(Some(mut chunk))) => { - let len = cmp::min(chunk.0.len() as u64, *size); + let len = cmp::min(chunk.len() as u64, *size); *size -= len; - let ch = chunk.0.split_to(len as usize); - if !chunk.0.is_empty() { - payload.unread_data(chunk.0); + let ch = chunk.split_to(len as usize); + if !chunk.is_empty() { + payload.unread_data(chunk); } Ok(Async::Ready(Some(ch))) }, @@ -473,12 +473,12 @@ impl InnerField { /// The `Content-Length` header for body part is not necessary. fn read_stream(payload: &mut Payload, boundary: &str) -> Poll, MultipartError> { - match payload.readuntil(b"\r")? { + match payload.readuntil(b"\r").poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(mut chunk) => { if chunk.len() == 1 { payload.unread_data(chunk); - match payload.readexactly(boundary.len() + 4)? { + match payload.readexactly(boundary.len() + 4).poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(chunk) => { if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" && @@ -507,7 +507,7 @@ impl InnerField { } if self.eof { if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) { - match payload.readline()? { + match payload.readline().poll()? { Async::NotReady => return Ok(Async::NotReady), Async::Ready(chunk) => { @@ -536,7 +536,7 @@ impl InnerField { Async::Ready(Some(bytes)) => Async::Ready(Some(FieldChunk(bytes))), Async::Ready(None) => { self.eof = true; - match payload.readline()? { + match payload.readline().poll()? { Async::NotReady => Async::NotReady, Async::Ready(chunk) => { assert_eq!( diff --git a/src/payload.rs b/src/payload.rs index 3d0690da8..57dd7cf03 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -1,10 +1,11 @@ +//! Payload stream use std::{fmt, cmp}; use std::rc::{Rc, Weak}; use std::cell::RefCell; use std::collections::VecDeque; use std::ops::{Deref, DerefMut}; use bytes::{Bytes, BytesMut}; -use futures::{Async, Poll, Stream}; +use futures::{Future, Async, Poll, Stream}; use futures::task::{Task, current as current_task}; use body::BodyStream; @@ -88,27 +89,23 @@ impl Payload { } /// Get first available chunk of data. - /// Returns Some(PayloadItem) as chunk, `None` indicates eof. - pub fn readany(&mut self) -> Poll, PayloadError> { - self.inner.borrow_mut().readany() + pub fn readany(&mut self) -> ReadAny { + ReadAny(Rc::clone(&self.inner)) } - /// Get exactly number of bytes - /// Returns Some(PayloadItem) as chunk, `None` indicates eof. - pub fn readexactly(&mut self, size: usize) -> Result, PayloadError> { - self.inner.borrow_mut().readexactly(size) + /// Get exact number of bytes + pub fn readexactly(&mut self, size: usize) -> ReadExactly { + ReadExactly(Rc::clone(&self.inner), size) } /// Read until `\n` - /// Returns Some(PayloadItem) as line, `None` indicates eof. - pub fn readline(&mut self) -> Result, PayloadError> { - self.inner.borrow_mut().readline() + pub fn readline(&mut self) -> ReadLine { + ReadLine(Rc::clone(&self.inner)) } /// Read until match line - /// Returns Some(PayloadItem) as line, `None` indicates eof. - pub fn readuntil(&mut self, line: &[u8]) -> Result, PayloadError> { - self.inner.borrow_mut().readuntil(line) + pub fn readuntil(&mut self, line: &[u8]) -> ReadUntil { + ReadUntil(Rc::clone(&self.inner), line.to_vec()) } #[doc(hidden)] @@ -133,19 +130,91 @@ impl Payload { /// Convert payload into BodyStream pub fn stream(self) -> BodyStream { - Box::new(self.map(|item| item.0).map_err(|e| e.into())) + Box::new(self.map_err(|e| e.into())) } } impl Stream for Payload { - type Item = PayloadItem; + type Item = Bytes; type Error = PayloadError; - fn poll(&mut self) -> Poll, PayloadError> { - self.readany() + fn poll(&mut self) -> Poll, PayloadError> { + match self.inner.borrow_mut().readany()? { + Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))), + Async::Ready(None) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } } } +impl Clone for Payload { + fn clone(&self) -> Payload { + Payload{inner: Rc::clone(&self.inner)} + } +} + +/// Get first available chunk of data +pub struct ReadAny(Rc>); + +impl Stream for ReadAny { + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.0.borrow_mut().readany()? { + Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))), + Async::Ready(None) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +/// Get exact number of bytes +pub struct ReadExactly(Rc>, usize); + +impl Future for ReadExactly { + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll { + match self.0.borrow_mut().readexactly(self.1)? { + Async::Ready(chunk) => Ok(Async::Ready(chunk)), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +/// Read until `\n` +pub struct ReadLine(Rc>); + +impl Future for ReadLine { + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll { + match self.0.borrow_mut().readline()? { + Async::Ready(chunk) => Ok(Async::Ready(chunk)), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +/// Read until match line +pub struct ReadUntil(Rc>, Vec); + +impl Future for ReadUntil { + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll { + match self.0.borrow_mut().readuntil(&self.1)? { + Async::Ready(chunk) => Ok(Async::Ready(chunk)), + Async::NotReady => Ok(Async::NotReady), + } + } +} + +/// Payload writer interface. pub trait PayloadWriter { /// Set stream error. @@ -408,7 +477,7 @@ mod tests { assert!(!payload.eof()); assert!(payload.is_empty()); assert_eq!(payload.len(), 0); - assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); + assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) @@ -420,7 +489,7 @@ mod tests { Core::new().unwrap().run(lazy(|| { let (mut sender, mut payload) = Payload::new(false); - assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); + assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); assert!(!payload.eof()); sender.feed_data(Bytes::from("data")); @@ -428,13 +497,13 @@ mod tests { assert!(!payload.eof()); - assert_eq!(Async::Ready(Some(PayloadItem(Bytes::from("data")))), - payload.readany().ok().unwrap()); + assert_eq!(Async::Ready(Some(Bytes::from("data"))), + payload.readany().poll().ok().unwrap()); assert!(payload.is_empty()); assert!(payload.eof()); assert_eq!(payload.len(), 0); - assert_eq!(Async::Ready(None), payload.readany().ok().unwrap()); + assert_eq!(Async::Ready(None), payload.readany().poll().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) })).unwrap(); @@ -445,10 +514,10 @@ mod tests { Core::new().unwrap().run(lazy(|| { let (mut sender, mut payload) = Payload::new(false); - assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); + assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap()); sender.set_error(PayloadError::Incomplete); - payload.readany().err().unwrap(); + payload.readany().poll().err().unwrap(); let res: Result<(), ()> = Ok(()); result(res) })).unwrap(); @@ -468,8 +537,8 @@ mod tests { assert!(!payload.is_empty()); assert_eq!(payload.len(), 10); - assert_eq!(Async::Ready(Some(PayloadItem(Bytes::from("line1")))), - payload.readany().ok().unwrap()); + assert_eq!(Async::Ready(Some(Bytes::from("line1"))), + payload.readany().poll().ok().unwrap()); assert!(!payload.is_empty()); assert_eq!(payload.len(), 5); @@ -483,20 +552,22 @@ mod tests { Core::new().unwrap().run(lazy(|| { let (mut sender, mut payload) = Payload::new(false); - assert_eq!(Async::NotReady, payload.readexactly(2).ok().unwrap()); + assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap()); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); assert_eq!(payload.len(), 10); - assert_eq!(Async::Ready(Bytes::from("li")), payload.readexactly(2).ok().unwrap()); + assert_eq!(Async::Ready(Bytes::from("li")), + payload.readexactly(2).poll().ok().unwrap()); assert_eq!(payload.len(), 8); - assert_eq!(Async::Ready(Bytes::from("ne1l")), payload.readexactly(4).ok().unwrap()); + assert_eq!(Async::Ready(Bytes::from("ne1l")), + payload.readexactly(4).poll().ok().unwrap()); assert_eq!(payload.len(), 4); sender.set_error(PayloadError::Incomplete); - payload.readexactly(10).err().unwrap(); + payload.readexactly(10).poll().err().unwrap(); let res: Result<(), ()> = Ok(()); result(res) @@ -508,22 +579,22 @@ mod tests { Core::new().unwrap().run(lazy(|| { let (mut sender, mut payload) = Payload::new(false); - assert_eq!(Async::NotReady, payload.readuntil(b"ne").ok().unwrap()); + assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap()); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); assert_eq!(payload.len(), 10); assert_eq!(Async::Ready(Bytes::from("line")), - payload.readuntil(b"ne").ok().unwrap()); + payload.readuntil(b"ne").poll().ok().unwrap()); assert_eq!(payload.len(), 6); assert_eq!(Async::Ready(Bytes::from("1line2")), - payload.readuntil(b"2").ok().unwrap()); + payload.readuntil(b"2").poll().ok().unwrap()); assert_eq!(payload.len(), 0); sender.set_error(PayloadError::Incomplete); - payload.readuntil(b"b").err().unwrap(); + payload.readuntil(b"b").poll().err().unwrap(); let res: Result<(), ()> = Ok(()); result(res) @@ -539,8 +610,8 @@ mod tests { assert!(!payload.is_empty()); assert_eq!(payload.len(), 4); - assert_eq!(Async::Ready(Some(PayloadItem(Bytes::from("data")))), - payload.readany().ok().unwrap()); + assert_eq!(Async::Ready(Some(Bytes::from("data"))), + payload.readany().poll().ok().unwrap()); let res: Result<(), ()> = Ok(()); result(res) diff --git a/src/ws.rs b/src/ws.rs index 324a304af..097ec7997 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -57,7 +57,7 @@ use actix::{Actor, AsyncContext, ResponseType, StreamHandler}; use body::Body; use context::HttpContext; use handler::Reply; -use payload::Payload; +use payload::ReadAny; use error::{Error, WsHandshakeError}; use httprequest::HttpRequest; use httpresponse::{ConnectionType, HttpResponse}; @@ -96,15 +96,11 @@ pub fn start(mut req: HttpRequest, actor: A) -> Result { let resp = handshake(&req)?; - if let Some(payload) = req.take_payload() { - let stream = WsStream::new(payload); - let mut ctx = HttpContext::new(req, actor); - ctx.start(resp); - ctx.add_stream(stream); - Ok(ctx.into()) - } else { - Err(WsHandshakeError::NoPayload.into()) - } + let stream = WsStream::new(req.payload_mut().readany()); + let mut ctx = HttpContext::new(req, actor); + ctx.start(resp); + ctx.add_stream(stream); + Ok(ctx.into()) } /// Prepare `WebSocket` handshake response. @@ -175,14 +171,14 @@ pub fn handshake(req: &HttpRequest) -> Result WsStream { + pub fn new(payload: ReadAny) -> WsStream { WsStream { rx: payload, buf: BytesMut::new(), closed: false, @@ -199,9 +195,9 @@ impl Stream for WsStream { if !self.closed { loop { - match self.rx.readany() { + match self.rx.poll() { Ok(Async::Ready(Some(chunk))) => { - self.buf.extend_from_slice(&chunk.0) + self.buf.extend_from_slice(&chunk) } Ok(Async::Ready(None)) => { done = true;