1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-23 07:14:35 +01:00

refactor payload

This commit is contained in:
Nikolay Kim 2017-12-19 00:18:57 -08:00
parent 0cab873066
commit f3b853f224
9 changed files with 213 additions and 136 deletions

View File

@ -5,6 +5,7 @@ extern crate actix;
extern crate actix_web; extern crate actix_web;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate futures;
use futures::Stream;
use actix_web::*; use actix_web::*;
use actix_web::middlewares::RequestSession; use actix_web::middlewares::RequestSession;
@ -13,11 +14,9 @@ use futures::future::{FutureResult, result};
/// simple handler /// simple handler
fn index(mut req: HttpRequest) -> Result<HttpResponse> { fn index(mut req: HttpRequest) -> Result<HttpResponse> {
println!("{:?}", req); println!("{:?}", req);
if let Some(payload) = req.payload_mut() { if let Ok(ch) = req.payload_mut().readany().poll() {
if let Ok(ch) = payload.readany() { if let futures::Async::Ready(Some(d)) = ch {
if let futures::Async::Ready(Some(d)) = ch { println!("{}", String::from_utf8_lossy(d.as_ref()));
println!("{}", String::from_utf8_lossy(d.0.as_ref()));
}
} }
} }

View File

@ -99,12 +99,11 @@ Enabling chunked encoding for *HTTP/2.0* responses is forbidden.
```rust ```rust
# extern crate actix_web; # extern crate actix_web;
use actix_web::*; use actix_web::*;
use actix_web::headers::ContentEncoding;
fn index(req: HttpRequest) -> HttpResponse { fn index(req: HttpRequest) -> HttpResponse {
HttpResponse::Ok() HttpResponse::Ok()
.chunked() .chunked()
.body(Body::Streaming(Payload::empty().stream())).unwrap() .body(Body::Streaming(payload::Payload::empty().stream())).unwrap()
} }
# fn main() {} # fn main() {}
``` ```
@ -123,4 +122,42 @@ fn index(req: HttpRequest) -> HttpResponse {
## Streaming request ## Streaming request
[WIP] Actix uses [*Payload*](../actix_web/struct.Payload.html) object as request payload stream.
*HttpRequest* provides several methods, which can be used for payload access.
At the same time *Payload* implements *Stream* trait, so it could be used with various
stream combinators. Also *Payload* provides serveral convinience methods that return
future object that resolve to Bytes object.
* *readany* method returns *Stream* of *Bytes* objects.
* *readexactly* method returns *Future* that resolves when specified number of bytes
get received.
* *readline* method returns *Future* that resolves when `\n` get received.
* *readuntil* method returns *Future* that resolves when specified bytes string
matches in input bytes stream
Here is example that reads request payload and prints it.
```rust
# extern crate actix_web;
# extern crate futures;
# use futures::future::result;
use actix_web::*;
use futures::{Future, Stream};
fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>> {
Box::new(
req.payload_mut()
.readany()
.fold((), |_, chunk| {
println!("Chunk: {:?}", chunk);
result::<_, error::PayloadError>(Ok(()))
})
.map_err(|e| Error::from(e))
.map(|_| HttpResponse::Ok().finish().unwrap()))
}
# fn main() {}
```

View File

@ -265,9 +265,6 @@ pub enum MultipartError {
/// Multipart boundary is not found /// Multipart boundary is not found
#[fail(display="Multipart boundary is not found")] #[fail(display="Multipart boundary is not found")]
Boundary, Boundary,
/// Request does not contain payload
#[fail(display="Request does not contain payload")]
NoPayload,
/// Error during field parsing /// Error during field parsing
#[fail(display="{}", _0)] #[fail(display="{}", _0)]
Parse(#[cause] ParseError), Parse(#[cause] ParseError),
@ -335,9 +332,6 @@ pub enum WsHandshakeError {
/// Websocket key is not set or wrong /// Websocket key is not set or wrong
#[fail(display="Unknown websocket key")] #[fail(display="Unknown websocket key")]
BadWebsocketKey, BadWebsocketKey,
/// Request does not contain payload
#[fail(display="Request does not contain payload")]
NoPayload,
} }
impl ResponseError for WsHandshakeError { impl ResponseError for WsHandshakeError {
@ -361,8 +355,6 @@ impl ResponseError for WsHandshakeError {
HTTPBadRequest.with_reason("Unsupported version"), HTTPBadRequest.with_reason("Unsupported version"),
WsHandshakeError::BadWebsocketKey => WsHandshakeError::BadWebsocketKey =>
HTTPBadRequest.with_reason("Handshake error"), HTTPBadRequest.with_reason("Handshake error"),
WsHandshakeError::NoPayload =>
HttpResponse::new(StatusCode::INTERNAL_SERVER_ERROR, Body::Empty),
} }
} }
} }
@ -382,9 +374,6 @@ pub enum UrlencodedError {
/// Content type error /// Content type error
#[fail(display="Content type error")] #[fail(display="Content type error")]
ContentType, ContentType,
/// Request does not contain payload
#[fail(display="Request does not contain payload")]
NoPayload,
} }
/// Return `BadRequest` for `UrlencodedError` /// Return `BadRequest` for `UrlencodedError`

View File

@ -1002,7 +1002,6 @@ mod tests {
assert_eq!(req.version(), Version::HTTP_11); assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET); assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test"); assert_eq!(req.path(), "/test");
assert!(req.payload().is_none());
} }
Ok(_) | Err(_) => panic!("Error during parsing http request"), Ok(_) | Err(_) => panic!("Error during parsing http request"),
} }
@ -1026,7 +1025,6 @@ mod tests {
assert_eq!(req.version(), Version::HTTP_11); assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::PUT); assert_eq!(*req.method(), Method::PUT);
assert_eq!(req.path(), "/test"); assert_eq!(req.path(), "/test");
assert!(req.payload().is_none());
} }
Ok(_) | Err(_) => panic!("Error during parsing http request"), Ok(_) | Err(_) => panic!("Error during parsing http request"),
} }
@ -1044,7 +1042,6 @@ mod tests {
assert_eq!(req.version(), Version::HTTP_10); assert_eq!(req.version(), Version::HTTP_10);
assert_eq!(*req.method(), Method::POST); assert_eq!(*req.method(), Method::POST);
assert_eq!(req.path(), "/test2"); assert_eq!(req.path(), "/test2");
assert!(req.payload().is_none());
} }
Ok(_) | Err(_) => panic!("Error during parsing http request"), Ok(_) | Err(_) => panic!("Error during parsing http request"),
} }
@ -1062,7 +1059,7 @@ mod tests {
assert_eq!(req.version(), Version::HTTP_11); assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET); assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test"); assert_eq!(req.path(), "/test");
assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"body"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"body");
} }
Ok(_) | Err(_) => panic!("Error during parsing http request"), Ok(_) | Err(_) => panic!("Error during parsing http request"),
} }
@ -1081,7 +1078,7 @@ mod tests {
assert_eq!(req.version(), Version::HTTP_11); assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET); assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test"); assert_eq!(req.path(), "/test");
assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"body"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"body");
} }
Ok(_) | Err(_) => panic!("Error during parsing http request"), Ok(_) | Err(_) => panic!("Error during parsing http request"),
} }
@ -1102,7 +1099,6 @@ mod tests {
assert_eq!(req.version(), Version::HTTP_11); assert_eq!(req.version(), Version::HTTP_11);
assert_eq!(*req.method(), Method::GET); assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test"); assert_eq!(req.path(), "/test");
assert!(req.payload().is_none());
} }
Ok(_) | Err(_) => panic!("Error during parsing http request"), Ok(_) | Err(_) => panic!("Error during parsing http request"),
} }
@ -1130,7 +1126,6 @@ mod tests {
assert_eq!(*req.method(), Method::GET); assert_eq!(*req.method(), Method::GET);
assert_eq!(req.path(), "/test"); assert_eq!(req.path(), "/test");
assert_eq!(req.headers().get("test").unwrap().as_bytes(), b"value"); assert_eq!(req.headers().get("test").unwrap().as_bytes(), b"value");
assert!(req.payload().is_none());
} }
Ok(_) | Err(_) => panic!("Error during parsing http request"), Ok(_) | Err(_) => panic!("Error during parsing http request"),
} }
@ -1240,7 +1235,7 @@ mod tests {
connection: upgrade\r\n\r\n"); connection: upgrade\r\n\r\n");
let req = parse_ready!(&mut buf); let req = parse_ready!(&mut buf);
assert!(!req.payload().unwrap().eof()); assert!(!req.payload().eof());
assert!(req.upgrade()); assert!(req.upgrade());
} }
@ -1252,7 +1247,7 @@ mod tests {
let req = parse_ready!(&mut buf); let req = parse_ready!(&mut buf);
assert!(req.upgrade()); assert!(req.upgrade());
assert!(!req.payload().unwrap().eof()); assert!(!req.payload().eof());
} }
#[test] #[test]
@ -1262,7 +1257,6 @@ mod tests {
transfer-encoding: chunked\r\n\r\n"); transfer-encoding: chunked\r\n\r\n");
let req = parse_ready!(&mut buf); let req = parse_ready!(&mut buf);
assert!(req.payload().is_some());
if let Ok(val) = req.chunked() { if let Ok(val) = req.chunked() {
assert!(val); assert!(val);
} else { } else {
@ -1274,7 +1268,6 @@ mod tests {
transfer-encoding: chnked\r\n\r\n"); transfer-encoding: chnked\r\n\r\n");
let req = parse_ready!(&mut buf); let req = parse_ready!(&mut buf);
assert!(req.payload().is_none());
if let Ok(val) = req.chunked() { if let Ok(val) = req.chunked() {
assert!(!val); assert!(!val);
} else { } else {
@ -1334,7 +1327,7 @@ mod tests {
let mut req = parse_ready!(&mut buf); let mut req = parse_ready!(&mut buf);
assert!(!req.keep_alive()); assert!(!req.keep_alive());
assert!(req.upgrade()); assert!(req.upgrade());
assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"some raw data"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"some raw data");
} }
#[test] #[test]
@ -1383,13 +1376,13 @@ mod tests {
let mut reader = Reader::new(); let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(req.chunked().unwrap()); assert!(req.chunked().unwrap());
assert!(!req.payload().unwrap().eof()); assert!(!req.payload().eof());
buf.feed_data("4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); buf.feed_data("4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n");
not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(!req.payload().unwrap().eof()); assert!(!req.payload().eof());
assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"dataline"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline");
assert!(req.payload().unwrap().eof()); assert!(req.payload().eof());
} }
#[test] #[test]
@ -1404,7 +1397,7 @@ mod tests {
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(req.chunked().unwrap()); assert!(req.chunked().unwrap());
assert!(!req.payload().unwrap().eof()); assert!(!req.payload().eof());
buf.feed_data( buf.feed_data(
"4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n\ "4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n\
@ -1414,10 +1407,10 @@ mod tests {
let req2 = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); let req2 = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert_eq!(*req2.method(), Method::POST); assert_eq!(*req2.method(), Method::POST);
assert!(req2.chunked().unwrap()); assert!(req2.chunked().unwrap());
assert!(!req2.payload().unwrap().eof()); assert!(!req2.payload().eof());
assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"dataline"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline");
assert!(req.payload().unwrap().eof()); assert!(req.payload().eof());
} }
#[test] #[test]
@ -1431,7 +1424,7 @@ mod tests {
let mut reader = Reader::new(); let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(req.chunked().unwrap()); assert!(req.chunked().unwrap());
assert!(!req.payload().unwrap().eof()); assert!(!req.payload().eof());
buf.feed_data("4\r\ndata\r"); buf.feed_data("4\r\ndata\r");
not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
@ -1453,12 +1446,12 @@ mod tests {
//buf.feed_data("test: test\r\n"); //buf.feed_data("test: test\r\n");
//not_ready!(reader.parse(&mut buf, &mut readbuf)); //not_ready!(reader.parse(&mut buf, &mut readbuf));
assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"dataline"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline");
assert!(!req.payload().unwrap().eof()); assert!(!req.payload().eof());
buf.feed_data("\r\n"); buf.feed_data("\r\n");
not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(req.payload().unwrap().eof()); assert!(req.payload().eof());
} }
#[test] #[test]
@ -1472,13 +1465,13 @@ mod tests {
let mut reader = Reader::new(); let mut reader = Reader::new();
let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); let mut req = reader_parse_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(req.chunked().unwrap()); assert!(req.chunked().unwrap());
assert!(!req.payload().unwrap().eof()); assert!(!req.payload().eof());
buf.feed_data("4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n") buf.feed_data("4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n")
not_ready!(reader.parse(&mut buf, &mut readbuf, &settings)); not_ready!(reader.parse(&mut buf, &mut readbuf, &settings));
assert!(!req.payload().unwrap().eof()); assert!(!req.payload().eof());
assert_eq!(req.payload_mut().unwrap().readall().unwrap().as_ref(), b"dataline"); assert_eq!(req.payload_mut().readall().unwrap().as_ref(), b"dataline");
assert!(req.payload().unwrap().eof()); assert!(req.payload().eof());
} }
/*#[test] /*#[test]

View File

@ -79,8 +79,8 @@ impl HttpMessage {
self.params.clear(); self.params.clear();
self.cookies = None; self.cookies = None;
self.addr = None; self.addr = None;
self.payload = None;
self.info = None; self.info = None;
self.payload = None;
} }
} }
@ -385,32 +385,30 @@ impl<S> HttpRequest<S> {
/// Returns reference to the associated http payload. /// Returns reference to the associated http payload.
#[inline] #[inline]
pub fn payload(&self) -> Option<&Payload> { pub fn payload(&self) -> &Payload {
self.as_ref().payload.as_ref() let msg = self.as_mut();
if msg.payload.is_none() {
msg.payload = Some(Payload::empty());
}
msg.payload.as_ref().unwrap()
} }
/// Returns mutable reference to the associated http payload. /// Returns mutable reference to the associated http payload.
#[inline] #[inline]
pub fn payload_mut(&mut self) -> Option<&mut Payload> { pub fn payload_mut(&mut self) -> &mut Payload {
self.as_mut().payload.as_mut() let msg = self.as_mut();
if msg.payload.is_none() {
msg.payload = Some(Payload::empty());
}
msg.payload.as_mut().unwrap()
} }
/// Return payload
#[inline]
pub fn take_payload(&mut self) -> Option<Payload> {
self.as_mut().payload.take()
}
/// Return stream to process BODY as multipart. /// Return stream to process BODY as multipart.
/// ///
/// Content-type: multipart/form-data; /// Content-type: multipart/form-data;
pub fn multipart(&mut self) -> Result<Multipart, MultipartError> { pub fn multipart(&mut self) -> Result<Multipart, MultipartError> {
let boundary = Multipart::boundary(self.headers())?; let boundary = Multipart::boundary(self.headers())?;
if let Some(payload) = self.take_payload() { Ok(Multipart::new(boundary, self.payload().clone()))
Ok(Multipart::new(boundary, payload))
} else {
Err(MultipartError::NoPayload)
}
} }
/// Parse `application/x-www-form-urlencoded` encoded body. /// Parse `application/x-www-form-urlencoded` encoded body.
@ -453,11 +451,7 @@ impl<S> HttpRequest<S> {
}; };
if t { if t {
if let Some(payload) = self.take_payload() { Ok(UrlEncoded{pl: self.payload().clone(), body: BytesMut::new()})
Ok(UrlEncoded{pl: payload, body: BytesMut::new()})
} else {
Err(UrlencodedError::NoPayload)
}
} else { } else {
Err(UrlencodedError::ContentType) Err(UrlencodedError::ContentType)
} }
@ -523,7 +517,7 @@ impl Future for UrlEncoded {
Ok(Async::Ready(m)) Ok(Async::Ready(m))
}, },
Ok(Async::Ready(Some(item))) => { Ok(Async::Ready(Some(item))) => {
self.body.extend_from_slice(&item.0); self.body.extend_from_slice(&item);
continue continue
}, },
Err(err) => Err(err), Err(err) => Err(err),

View File

@ -93,7 +93,6 @@ mod helpers;
mod encoding; mod encoding;
mod httprequest; mod httprequest;
mod httpresponse; mod httpresponse;
mod payload;
mod info; mod info;
mod route; mod route;
mod router; mod router;
@ -117,12 +116,12 @@ pub mod httpcodes;
pub mod multipart; pub mod multipart;
pub mod middlewares; pub mod middlewares;
pub mod pred; pub mod pred;
pub mod payload;
pub use error::{Error, Result, ResponseError}; pub use error::{Error, Result, ResponseError};
pub use body::{Body, Binary}; pub use body::{Body, Binary};
pub use application::Application; pub use application::Application;
pub use httprequest::HttpRequest; pub use httprequest::HttpRequest;
pub use httpresponse::HttpResponse; pub use httpresponse::HttpResponse;
pub use payload::{Payload, PayloadItem};
pub use handler::{Reply, Responder, Json, NormalizePath}; pub use handler::{Reply, Responder, Json, NormalizePath};
pub use route::Route; pub use route::Route;
pub use resource::Resource; pub use resource::Resource;
@ -167,7 +166,6 @@ pub mod dev {
pub use handler::Handler; pub use handler::Handler;
pub use router::{Router, Pattern}; pub use router::{Router, Pattern};
pub use pipeline::Pipeline; pub use pipeline::Pipeline;
pub use payload::{PayloadSender, PayloadWriter};
pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; pub use channel::{HttpChannel, HttpHandler, IntoHttpHandler};
pub use param::{FromParam, Params}; pub use param::{FromParam, Params};
pub use server::ServerSettings; pub use server::ServerSettings;

View File

@ -9,7 +9,7 @@ use httparse;
use bytes::Bytes; use bytes::Bytes;
use http::HttpTryFrom; use http::HttpTryFrom;
use http::header::{self, HeaderMap, HeaderName, HeaderValue}; use http::header::{self, HeaderMap, HeaderName, HeaderValue};
use futures::{Async, Stream, Poll}; use futures::{Async, Future, Stream, Poll};
use futures::task::{Task, current as current_task}; use futures::task::{Task, current as current_task};
use error::{ParseError, PayloadError, MultipartError}; use error::{ParseError, PayloadError, MultipartError};
@ -119,7 +119,7 @@ impl InnerMultipart {
fn read_headers(payload: &mut Payload) -> Poll<HeaderMap, MultipartError> fn read_headers(payload: &mut Payload) -> Poll<HeaderMap, MultipartError>
{ {
match payload.readuntil(b"\r\n\r\n")? { match payload.readuntil(b"\r\n\r\n").poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(bytes) => { Async::Ready(bytes) => {
let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS]; let mut hdrs = [httparse::EMPTY_HEADER; MAX_HEADERS];
@ -150,7 +150,7 @@ impl InnerMultipart {
fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll<bool, MultipartError> fn read_boundary(payload: &mut Payload, boundary: &str) -> Poll<bool, MultipartError>
{ {
// TODO: need to read epilogue // TODO: need to read epilogue
match payload.readline()? { match payload.readline().poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(chunk) => { Async::Ready(chunk) => {
if chunk.len() == boundary.len() + 4 && if chunk.len() == boundary.len() + 4 &&
@ -175,7 +175,7 @@ impl InnerMultipart {
{ {
let mut eof = false; let mut eof = false;
loop { loop {
if let Async::Ready(chunk) = payload.readline()? { if let Async::Ready(chunk) = payload.readline().poll()? {
if chunk.is_empty() { if chunk.is_empty() {
//ValueError("Could not find starting boundary %r" //ValueError("Could not find starting boundary %r"
//% (self._boundary)) //% (self._boundary))
@ -452,15 +452,15 @@ impl InnerField {
if *size == 0 { if *size == 0 {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
match payload.readany() { match payload.readany().poll() {
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::Ready(Some(mut chunk))) => { Ok(Async::Ready(Some(mut chunk))) => {
let len = cmp::min(chunk.0.len() as u64, *size); let len = cmp::min(chunk.len() as u64, *size);
*size -= len; *size -= len;
let ch = chunk.0.split_to(len as usize); let ch = chunk.split_to(len as usize);
if !chunk.0.is_empty() { if !chunk.is_empty() {
payload.unread_data(chunk.0); payload.unread_data(chunk);
} }
Ok(Async::Ready(Some(ch))) Ok(Async::Ready(Some(ch)))
}, },
@ -473,12 +473,12 @@ impl InnerField {
/// The `Content-Length` header for body part is not necessary. /// The `Content-Length` header for body part is not necessary.
fn read_stream(payload: &mut Payload, boundary: &str) -> Poll<Option<Bytes>, MultipartError> fn read_stream(payload: &mut Payload, boundary: &str) -> Poll<Option<Bytes>, MultipartError>
{ {
match payload.readuntil(b"\r")? { match payload.readuntil(b"\r").poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(mut chunk) => { Async::Ready(mut chunk) => {
if chunk.len() == 1 { if chunk.len() == 1 {
payload.unread_data(chunk); payload.unread_data(chunk);
match payload.readexactly(boundary.len() + 4)? { match payload.readexactly(boundary.len() + 4).poll()? {
Async::NotReady => Ok(Async::NotReady), Async::NotReady => Ok(Async::NotReady),
Async::Ready(chunk) => { Async::Ready(chunk) => {
if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" && if &chunk[..2] == b"\r\n" && &chunk[2..4] == b"--" &&
@ -507,7 +507,7 @@ impl InnerField {
} }
if self.eof { if self.eof {
if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) { if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) {
match payload.readline()? { match payload.readline().poll()? {
Async::NotReady => Async::NotReady =>
return Ok(Async::NotReady), return Ok(Async::NotReady),
Async::Ready(chunk) => { Async::Ready(chunk) => {
@ -536,7 +536,7 @@ impl InnerField {
Async::Ready(Some(bytes)) => Async::Ready(Some(FieldChunk(bytes))), Async::Ready(Some(bytes)) => Async::Ready(Some(FieldChunk(bytes))),
Async::Ready(None) => { Async::Ready(None) => {
self.eof = true; self.eof = true;
match payload.readline()? { match payload.readline().poll()? {
Async::NotReady => Async::NotReady, Async::NotReady => Async::NotReady,
Async::Ready(chunk) => { Async::Ready(chunk) => {
assert_eq!( assert_eq!(

View File

@ -1,10 +1,11 @@
//! Payload stream
use std::{fmt, cmp}; use std::{fmt, cmp};
use std::rc::{Rc, Weak}; use std::rc::{Rc, Weak};
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream}; use futures::{Future, Async, Poll, Stream};
use futures::task::{Task, current as current_task}; use futures::task::{Task, current as current_task};
use body::BodyStream; use body::BodyStream;
@ -88,27 +89,23 @@ impl Payload {
} }
/// Get first available chunk of data. /// Get first available chunk of data.
/// Returns Some(PayloadItem) as chunk, `None` indicates eof. pub fn readany(&mut self) -> ReadAny {
pub fn readany(&mut self) -> Poll<Option<PayloadItem>, PayloadError> { ReadAny(Rc::clone(&self.inner))
self.inner.borrow_mut().readany()
} }
/// Get exactly number of bytes /// Get exact number of bytes
/// Returns Some(PayloadItem) as chunk, `None` indicates eof. pub fn readexactly(&mut self, size: usize) -> ReadExactly {
pub fn readexactly(&mut self, size: usize) -> Result<Async<Bytes>, PayloadError> { ReadExactly(Rc::clone(&self.inner), size)
self.inner.borrow_mut().readexactly(size)
} }
/// Read until `\n` /// Read until `\n`
/// Returns Some(PayloadItem) as line, `None` indicates eof. pub fn readline(&mut self) -> ReadLine {
pub fn readline(&mut self) -> Result<Async<Bytes>, PayloadError> { ReadLine(Rc::clone(&self.inner))
self.inner.borrow_mut().readline()
} }
/// Read until match line /// Read until match line
/// Returns Some(PayloadItem) as line, `None` indicates eof. pub fn readuntil(&mut self, line: &[u8]) -> ReadUntil {
pub fn readuntil(&mut self, line: &[u8]) -> Result<Async<Bytes>, PayloadError> { ReadUntil(Rc::clone(&self.inner), line.to_vec())
self.inner.borrow_mut().readuntil(line)
} }
#[doc(hidden)] #[doc(hidden)]
@ -133,19 +130,91 @@ impl Payload {
/// Convert payload into BodyStream /// Convert payload into BodyStream
pub fn stream(self) -> BodyStream { pub fn stream(self) -> BodyStream {
Box::new(self.map(|item| item.0).map_err(|e| e.into())) Box::new(self.map_err(|e| e.into()))
} }
} }
impl Stream for Payload { impl Stream for Payload {
type Item = PayloadItem; type Item = Bytes;
type Error = PayloadError; type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<PayloadItem>, PayloadError> { fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.readany() match self.inner.borrow_mut().readany()? {
Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
} }
} }
impl Clone for Payload {
fn clone(&self) -> Payload {
Payload{inner: Rc::clone(&self.inner)}
}
}
/// Get first available chunk of data
pub struct ReadAny(Rc<RefCell<Inner>>);
impl Stream for ReadAny {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Option<Bytes>, Self::Error> {
match self.0.borrow_mut().readany()? {
Async::Ready(Some(item)) => Ok(Async::Ready(Some(item.0))),
Async::Ready(None) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Get exact number of bytes
pub struct ReadExactly(Rc<RefCell<Inner>>, usize);
impl Future for ReadExactly {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.borrow_mut().readexactly(self.1)? {
Async::Ready(chunk) => Ok(Async::Ready(chunk)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Read until `\n`
pub struct ReadLine(Rc<RefCell<Inner>>);
impl Future for ReadLine {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.borrow_mut().readline()? {
Async::Ready(chunk) => Ok(Async::Ready(chunk)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Read until match line
pub struct ReadUntil(Rc<RefCell<Inner>>, Vec<u8>);
impl Future for ReadUntil {
type Item = Bytes;
type Error = PayloadError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.borrow_mut().readuntil(&self.1)? {
Async::Ready(chunk) => Ok(Async::Ready(chunk)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Payload writer interface.
pub trait PayloadWriter { pub trait PayloadWriter {
/// Set stream error. /// Set stream error.
@ -408,7 +477,7 @@ mod tests {
assert!(!payload.eof()); assert!(!payload.eof());
assert!(payload.is_empty()); assert!(payload.is_empty());
assert_eq!(payload.len(), 0); assert_eq!(payload.len(), 0);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -420,7 +489,7 @@ mod tests {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, mut payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
assert!(!payload.eof()); assert!(!payload.eof());
sender.feed_data(Bytes::from("data")); sender.feed_data(Bytes::from("data"));
@ -428,13 +497,13 @@ mod tests {
assert!(!payload.eof()); assert!(!payload.eof());
assert_eq!(Async::Ready(Some(PayloadItem(Bytes::from("data")))), assert_eq!(Async::Ready(Some(Bytes::from("data"))),
payload.readany().ok().unwrap()); payload.readany().poll().ok().unwrap());
assert!(payload.is_empty()); assert!(payload.is_empty());
assert!(payload.eof()); assert!(payload.eof());
assert_eq!(payload.len(), 0); assert_eq!(payload.len(), 0);
assert_eq!(Async::Ready(None), payload.readany().ok().unwrap()); assert_eq!(Async::Ready(None), payload.readany().poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
})).unwrap(); })).unwrap();
@ -445,10 +514,10 @@ mod tests {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, mut payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); assert_eq!(Async::NotReady, payload.readany().poll().ok().unwrap());
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.readany().err().unwrap(); payload.readany().poll().err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
})).unwrap(); })).unwrap();
@ -468,8 +537,8 @@ mod tests {
assert!(!payload.is_empty()); assert!(!payload.is_empty());
assert_eq!(payload.len(), 10); assert_eq!(payload.len(), 10);
assert_eq!(Async::Ready(Some(PayloadItem(Bytes::from("line1")))), assert_eq!(Async::Ready(Some(Bytes::from("line1"))),
payload.readany().ok().unwrap()); payload.readany().poll().ok().unwrap());
assert!(!payload.is_empty()); assert!(!payload.is_empty());
assert_eq!(payload.len(), 5); assert_eq!(payload.len(), 5);
@ -483,20 +552,22 @@ mod tests {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, mut payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readexactly(2).ok().unwrap()); assert_eq!(Async::NotReady, payload.readexactly(2).poll().ok().unwrap());
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
assert_eq!(payload.len(), 10); assert_eq!(payload.len(), 10);
assert_eq!(Async::Ready(Bytes::from("li")), payload.readexactly(2).ok().unwrap()); assert_eq!(Async::Ready(Bytes::from("li")),
payload.readexactly(2).poll().ok().unwrap());
assert_eq!(payload.len(), 8); assert_eq!(payload.len(), 8);
assert_eq!(Async::Ready(Bytes::from("ne1l")), payload.readexactly(4).ok().unwrap()); assert_eq!(Async::Ready(Bytes::from("ne1l")),
payload.readexactly(4).poll().ok().unwrap());
assert_eq!(payload.len(), 4); assert_eq!(payload.len(), 4);
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.readexactly(10).err().unwrap(); payload.readexactly(10).poll().err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -508,22 +579,22 @@ mod tests {
Core::new().unwrap().run(lazy(|| { Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false); let (mut sender, mut payload) = Payload::new(false);
assert_eq!(Async::NotReady, payload.readuntil(b"ne").ok().unwrap()); assert_eq!(Async::NotReady, payload.readuntil(b"ne").poll().ok().unwrap());
sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2")); sender.feed_data(Bytes::from("line2"));
assert_eq!(payload.len(), 10); assert_eq!(payload.len(), 10);
assert_eq!(Async::Ready(Bytes::from("line")), assert_eq!(Async::Ready(Bytes::from("line")),
payload.readuntil(b"ne").ok().unwrap()); payload.readuntil(b"ne").poll().ok().unwrap());
assert_eq!(payload.len(), 6); assert_eq!(payload.len(), 6);
assert_eq!(Async::Ready(Bytes::from("1line2")), assert_eq!(Async::Ready(Bytes::from("1line2")),
payload.readuntil(b"2").ok().unwrap()); payload.readuntil(b"2").poll().ok().unwrap());
assert_eq!(payload.len(), 0); assert_eq!(payload.len(), 0);
sender.set_error(PayloadError::Incomplete); sender.set_error(PayloadError::Incomplete);
payload.readuntil(b"b").err().unwrap(); payload.readuntil(b"b").poll().err().unwrap();
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)
@ -539,8 +610,8 @@ mod tests {
assert!(!payload.is_empty()); assert!(!payload.is_empty());
assert_eq!(payload.len(), 4); assert_eq!(payload.len(), 4);
assert_eq!(Async::Ready(Some(PayloadItem(Bytes::from("data")))), assert_eq!(Async::Ready(Some(Bytes::from("data"))),
payload.readany().ok().unwrap()); payload.readany().poll().ok().unwrap());
let res: Result<(), ()> = Ok(()); let res: Result<(), ()> = Ok(());
result(res) result(res)

View File

@ -57,7 +57,7 @@ use actix::{Actor, AsyncContext, ResponseType, StreamHandler};
use body::Body; use body::Body;
use context::HttpContext; use context::HttpContext;
use handler::Reply; use handler::Reply;
use payload::Payload; use payload::ReadAny;
use error::{Error, WsHandshakeError}; use error::{Error, WsHandshakeError};
use httprequest::HttpRequest; use httprequest::HttpRequest;
use httpresponse::{ConnectionType, HttpResponse}; use httpresponse::{ConnectionType, HttpResponse};
@ -96,15 +96,11 @@ pub fn start<A, S>(mut req: HttpRequest<S>, actor: A) -> Result<Reply, Error>
{ {
let resp = handshake(&req)?; let resp = handshake(&req)?;
if let Some(payload) = req.take_payload() { let stream = WsStream::new(req.payload_mut().readany());
let stream = WsStream::new(payload); let mut ctx = HttpContext::new(req, actor);
let mut ctx = HttpContext::new(req, actor); ctx.start(resp);
ctx.start(resp); ctx.add_stream(stream);
ctx.add_stream(stream); Ok(ctx.into())
Ok(ctx.into())
} else {
Err(WsHandshakeError::NoPayload.into())
}
} }
/// Prepare `WebSocket` handshake response. /// Prepare `WebSocket` handshake response.
@ -175,14 +171,14 @@ pub fn handshake<S>(req: &HttpRequest<S>) -> Result<HttpResponse, WsHandshakeErr
/// Maps `Payload` stream into stream of `ws::Message` items /// Maps `Payload` stream into stream of `ws::Message` items
pub struct WsStream { pub struct WsStream {
rx: Payload, rx: ReadAny,
buf: BytesMut, buf: BytesMut,
closed: bool, closed: bool,
error_sent: bool, error_sent: bool,
} }
impl WsStream { impl WsStream {
pub fn new(payload: Payload) -> WsStream { pub fn new(payload: ReadAny) -> WsStream {
WsStream { rx: payload, WsStream { rx: payload,
buf: BytesMut::new(), buf: BytesMut::new(),
closed: false, closed: false,
@ -199,9 +195,9 @@ impl Stream for WsStream {
if !self.closed { if !self.closed {
loop { loop {
match self.rx.readany() { match self.rx.poll() {
Ok(Async::Ready(Some(chunk))) => { Ok(Async::Ready(Some(chunk))) => {
self.buf.extend_from_slice(&chunk.0) self.buf.extend_from_slice(&chunk)
} }
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
done = true; done = true;