diff --git a/examples/basic.rs b/examples/basic.rs index 8e8da3e87..e52eac0dc 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -95,6 +95,6 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - println!("Started http server: 127.0.0.1:8080"); + println!("Starting http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/examples/multipart/Cargo.toml b/examples/multipart/Cargo.toml index 5cb2031df..5a8d582e9 100644 --- a/examples/multipart/Cargo.toml +++ b/examples/multipart/Cargo.toml @@ -9,5 +9,7 @@ path = "src/main.rs" [dependencies] env_logger = "*" +futures = "0.1" actix = "^0.3.1" -actix-web = { git = "https://github.com/actix/actix-web.git" } +#actix-web = { git = "https://github.com/actix/actix-web.git" } +actix-web = { path = "../../" } diff --git a/examples/multipart/client.py b/examples/multipart/client.py index 35f97c1a6..2e3068d4c 100644 --- a/examples/multipart/client.py +++ b/examples/multipart/client.py @@ -2,25 +2,25 @@ import asyncio import aiohttp -def req1(): +async def req1(): with aiohttp.MultipartWriter() as writer: writer.append('test') writer.append_json({'passed': True}) - resp = yield from aiohttp.request( + resp = await aiohttp.ClientSession().request( "post", 'http://localhost:8080/multipart', data=writer, headers=writer.headers) print(resp) assert 200 == resp.status -def req2(): +async def req2(): with aiohttp.MultipartWriter() as writer: writer.append('test') writer.append_json({'passed': True}) writer.append(open('src/main.rs')) - resp = yield from aiohttp.request( + resp = await aiohttp.ClientSession().request( "post", 'http://localhost:8080/multipart', data=writer, headers=writer.headers) print(resp) diff --git a/examples/multipart/src/main.rs b/examples/multipart/src/main.rs index a1f31527d..ac3714ad0 100644 --- a/examples/multipart/src/main.rs +++ b/examples/multipart/src/main.rs @@ -2,76 +2,60 @@ extern crate actix; extern crate actix_web; extern crate env_logger; +extern crate futures; -use actix::*; use actix_web::*; +use futures::{Future, Stream}; +use futures::future::{result, Either}; -struct MyRoute; -impl Actor for MyRoute { - type Context = HttpContext; -} +fn index(mut req: HttpRequest) -> Box> +{ + println!("{:?}", req); -impl Route for MyRoute { - type State = (); - - fn request(mut req: HttpRequest, ctx: &mut HttpContext) -> RouteResult { - println!("{:?}", req); - - let multipart = req.multipart()?; - - // get Multipart stream - WrapStream::::actstream(multipart) - .and_then(|item, act, ctx| { + // get multipart stream and iterate over multipart items + Box::new( + req.multipart() + .map_err(Error::from) + .and_then(|item| { // Multipart stream is a stream of Fields and nested Multiparts match item { multipart::MultipartItem::Field(field) => { println!("==== FIELD ==== {:?}", field); // Read field's stream - fut::Either::A( - field.actstream() - .map(|chunk, act, ctx| { - println!( - "-- CHUNK: \n{}", - std::str::from_utf8(&chunk.0).unwrap()); - }) - .finish()) + Either::A( + field.map_err(Error::from) + .map(|chunk| { + println!("-- CHUNK: \n{}", + std::str::from_utf8(&chunk.0).unwrap());}) + .fold((), |_, _| result::<_, Error>(Ok(())))) }, multipart::MultipartItem::Nested(mp) => { // Do nothing for nested multipart stream - fut::Either::B(fut::ok(())) + Either::B(result(Ok(()))) } } }) // wait until stream finish - .finish() - .map_err(|e, act, ctx| { - ctx.start(httpcodes::HTTPBadRequest); - ctx.write_eof(); - }) - .map(|_, act, ctx| { - ctx.start(httpcodes::HTTPOk); - ctx.write_eof(); - }) - .spawn(ctx); - - Reply::async(MyRoute) - } + .fold((), |_, _| result::<_, Error>(Ok(()))) + .map(|_| httpcodes::HTTPOk.response()) + ) } fn main() { + ::std::env::set_var("RUST_LOG", "actix_web=info"); let _ = env_logger::init(); let sys = actix::System::new("multipart-example"); HttpServer::new( - vec![ - Application::default("/") - .resource("/multipart", |r| { - r.post::(); - }).finish() - ]) - .serve::<_, ()>("127.0.0.1:8080").unwrap(); + || Application::new() + // enable logger + .middleware(middlewares::Logger::default()) + .resource("/multipart", |r| r.method(Method::POST).a(index))) + .bind("127.0.0.1:8080").unwrap() + .start(); + println!("Starting http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/guide/src/qs_7.md b/guide/src/qs_7.md index b1fa3579e..2ab8e69ac 100644 --- a/guide/src/qs_7.md +++ b/guide/src/qs_7.md @@ -138,7 +138,7 @@ future object that resolve to Bytes object. * *readuntil* method returns *Future* that resolves when specified bytes string matches in input bytes stream -Here is example that reads request payload and prints it. +In this example handle reads request payload chunk by chunk and prints every chunk. ```rust # extern crate actix_web; diff --git a/src/httprequest.rs b/src/httprequest.rs index f22fea54d..067879edc 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -17,7 +17,7 @@ use payload::Payload; use multipart::Multipart; use helpers::SharedHttpMessage; use error::{ParseError, PayloadError, UrlGenerationError, - MultipartError, CookieParseError, HttpRangeError, UrlencodedError}; + CookieParseError, HttpRangeError, UrlencodedError}; pub struct HttpMessage { @@ -406,9 +406,8 @@ impl HttpRequest { /// Return stream to process BODY as multipart. /// /// Content-type: multipart/form-data; - pub fn multipart(&mut self) -> Result { - let boundary = Multipart::boundary(self.headers())?; - Ok(Multipart::new(boundary, self.payload().clone())) + pub fn multipart(&mut self) -> Multipart { + Multipart::from_request(self) } /// Parse `application/x-www-form-urlencoded` encoded body. diff --git a/src/multipart.rs b/src/multipart.rs index 59ba232e7..634b6652f 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -14,6 +14,7 @@ use futures::task::{Task, current as current_task}; use error::{ParseError, PayloadError, MultipartError}; use payload::Payload; +use httprequest::HttpRequest; const MAX_HEADERS: usize = 32; @@ -26,7 +27,8 @@ const MAX_HEADERS: usize = 32; #[derive(Debug)] pub struct Multipart { safety: Safety, - inner: Rc>, + error: Option, + inner: Option>>, } /// @@ -66,17 +68,32 @@ struct InnerMultipart { } impl Multipart { + /// Create multipart instance for boundary. pub fn new(boundary: String, payload: Payload) -> Multipart { Multipart { + error: None, safety: Safety::new(), - inner: Rc::new(RefCell::new( + inner: Some(Rc::new(RefCell::new( InnerMultipart { payload: PayloadRef::new(payload), boundary: boundary, state: InnerState::FirstBoundary, item: InnerMultipartItem::None, - })) + }))) + } + } + + /// Create multipart instance for request. + pub fn from_request(req: &mut HttpRequest) -> Multipart { + match Multipart::boundary(req.headers()) { + Ok(boundary) => Multipart::new(boundary, req.payload().clone()), + Err(err) => + Multipart { + error: Some(err), + safety: Safety::new(), + inner: None, + } } } @@ -107,8 +124,10 @@ impl Stream for Multipart { type Error = MultipartError; fn poll(&mut self) -> Poll, Self::Error> { - if self.safety.current() { - self.inner.borrow_mut().poll(&self.safety) + if let Some(err) = self.error.take() { + Err(err) + } else if self.safety.current() { + self.inner.as_mut().unwrap().borrow_mut().poll(&self.safety) } else { Ok(Async::NotReady) } @@ -327,7 +346,9 @@ impl InnerMultipart { Ok(Async::Ready(Some( MultipartItem::Nested( - Multipart{safety: safety.clone(), inner: inner})))) + Multipart{safety: safety.clone(), + error: None, + inner: Some(inner)})))) } else { let field = Rc::new(RefCell::new(InnerField::new( self.payload.clone(), self.boundary.clone(), &headers)?));