1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 09:42:40 +01:00

refactor multipart stream creation

This commit is contained in:
Nikolay Kim 2017-12-19 09:51:28 -08:00
parent 13cbfc877d
commit 790793f8a1
7 changed files with 68 additions and 62 deletions

View File

@ -95,6 +95,6 @@ fn main() {
.bind("127.0.0.1:8080").unwrap() .bind("127.0.0.1:8080").unwrap()
.start(); .start();
println!("Started http server: 127.0.0.1:8080"); println!("Starting http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -9,5 +9,7 @@ path = "src/main.rs"
[dependencies] [dependencies]
env_logger = "*" env_logger = "*"
futures = "0.1"
actix = "^0.3.1" actix = "^0.3.1"
actix-web = { git = "https://github.com/actix/actix-web.git" } #actix-web = { git = "https://github.com/actix/actix-web.git" }
actix-web = { path = "../../" }

View File

@ -2,25 +2,25 @@ import asyncio
import aiohttp import aiohttp
def req1(): async def req1():
with aiohttp.MultipartWriter() as writer: with aiohttp.MultipartWriter() as writer:
writer.append('test') writer.append('test')
writer.append_json({'passed': True}) writer.append_json({'passed': True})
resp = yield from aiohttp.request( resp = await aiohttp.ClientSession().request(
"post", 'http://localhost:8080/multipart', "post", 'http://localhost:8080/multipart',
data=writer, headers=writer.headers) data=writer, headers=writer.headers)
print(resp) print(resp)
assert 200 == resp.status assert 200 == resp.status
def req2(): async def req2():
with aiohttp.MultipartWriter() as writer: with aiohttp.MultipartWriter() as writer:
writer.append('test') writer.append('test')
writer.append_json({'passed': True}) writer.append_json({'passed': True})
writer.append(open('src/main.rs')) writer.append(open('src/main.rs'))
resp = yield from aiohttp.request( resp = await aiohttp.ClientSession().request(
"post", 'http://localhost:8080/multipart', "post", 'http://localhost:8080/multipart',
data=writer, headers=writer.headers) data=writer, headers=writer.headers)
print(resp) print(resp)

View File

@ -2,76 +2,60 @@
extern crate actix; extern crate actix;
extern crate actix_web; extern crate actix_web;
extern crate env_logger; extern crate env_logger;
extern crate futures;
use actix::*;
use actix_web::*; use actix_web::*;
use futures::{Future, Stream};
use futures::future::{result, Either};
struct MyRoute;
impl Actor for MyRoute { fn index(mut req: HttpRequest) -> Box<Future<Item=HttpResponse, Error=Error>>
type Context = HttpContext<Self>; {
}
impl Route for MyRoute {
type State = ();
fn request(mut req: HttpRequest, ctx: &mut HttpContext<Self>) -> RouteResult<Self> {
println!("{:?}", req); println!("{:?}", req);
let multipart = req.multipart()?; // get multipart stream and iterate over multipart items
Box::new(
// get Multipart stream req.multipart()
WrapStream::<MyRoute>::actstream(multipart) .map_err(Error::from)
.and_then(|item, act, ctx| { .and_then(|item| {
// Multipart stream is a stream of Fields and nested Multiparts // Multipart stream is a stream of Fields and nested Multiparts
match item { match item {
multipart::MultipartItem::Field(field) => { multipart::MultipartItem::Field(field) => {
println!("==== FIELD ==== {:?}", field); println!("==== FIELD ==== {:?}", field);
// Read field's stream // Read field's stream
fut::Either::A( Either::A(
field.actstream() field.map_err(Error::from)
.map(|chunk, act, ctx| { .map(|chunk| {
println!( println!("-- CHUNK: \n{}",
"-- CHUNK: \n{}", std::str::from_utf8(&chunk.0).unwrap());})
std::str::from_utf8(&chunk.0).unwrap()); .fold((), |_, _| result::<_, Error>(Ok(()))))
})
.finish())
}, },
multipart::MultipartItem::Nested(mp) => { multipart::MultipartItem::Nested(mp) => {
// Do nothing for nested multipart stream // Do nothing for nested multipart stream
fut::Either::B(fut::ok(())) Either::B(result(Ok(())))
} }
} }
}) })
// wait until stream finish // wait until stream finish
.finish() .fold((), |_, _| result::<_, Error>(Ok(())))
.map_err(|e, act, ctx| { .map(|_| httpcodes::HTTPOk.response())
ctx.start(httpcodes::HTTPBadRequest); )
ctx.write_eof();
})
.map(|_, act, ctx| {
ctx.start(httpcodes::HTTPOk);
ctx.write_eof();
})
.spawn(ctx);
Reply::async(MyRoute)
}
} }
fn main() { fn main() {
::std::env::set_var("RUST_LOG", "actix_web=info");
let _ = env_logger::init(); let _ = env_logger::init();
let sys = actix::System::new("multipart-example"); let sys = actix::System::new("multipart-example");
HttpServer::new( HttpServer::new(
vec![ || Application::new()
Application::default("/") // enable logger
.resource("/multipart", |r| { .middleware(middlewares::Logger::default())
r.post::<MyRoute>(); .resource("/multipart", |r| r.method(Method::POST).a(index)))
}).finish() .bind("127.0.0.1:8080").unwrap()
]) .start();
.serve::<_, ()>("127.0.0.1:8080").unwrap();
println!("Starting http server: 127.0.0.1:8080");
let _ = sys.run(); let _ = sys.run();
} }

View File

@ -138,7 +138,7 @@ future object that resolve to Bytes object.
* *readuntil* method returns *Future* that resolves when specified bytes string * *readuntil* method returns *Future* that resolves when specified bytes string
matches in input bytes stream matches in input bytes stream
Here is example that reads request payload and prints it. In this example handle reads request payload chunk by chunk and prints every chunk.
```rust ```rust
# extern crate actix_web; # extern crate actix_web;

View File

@ -17,7 +17,7 @@ use payload::Payload;
use multipart::Multipart; use multipart::Multipart;
use helpers::SharedHttpMessage; use helpers::SharedHttpMessage;
use error::{ParseError, PayloadError, UrlGenerationError, use error::{ParseError, PayloadError, UrlGenerationError,
MultipartError, CookieParseError, HttpRangeError, UrlencodedError}; CookieParseError, HttpRangeError, UrlencodedError};
pub struct HttpMessage { pub struct HttpMessage {
@ -406,9 +406,8 @@ impl<S> HttpRequest<S> {
/// Return stream to process BODY as multipart. /// Return stream to process BODY as multipart.
/// ///
/// Content-type: multipart/form-data; /// Content-type: multipart/form-data;
pub fn multipart(&mut self) -> Result<Multipart, MultipartError> { pub fn multipart(&mut self) -> Multipart {
let boundary = Multipart::boundary(self.headers())?; Multipart::from_request(self)
Ok(Multipart::new(boundary, self.payload().clone()))
} }
/// Parse `application/x-www-form-urlencoded` encoded body. /// Parse `application/x-www-form-urlencoded` encoded body.

View File

@ -14,6 +14,7 @@ use futures::task::{Task, current as current_task};
use error::{ParseError, PayloadError, MultipartError}; use error::{ParseError, PayloadError, MultipartError};
use payload::Payload; use payload::Payload;
use httprequest::HttpRequest;
const MAX_HEADERS: usize = 32; const MAX_HEADERS: usize = 32;
@ -26,7 +27,8 @@ const MAX_HEADERS: usize = 32;
#[derive(Debug)] #[derive(Debug)]
pub struct Multipart { pub struct Multipart {
safety: Safety, safety: Safety,
inner: Rc<RefCell<InnerMultipart>>, error: Option<MultipartError>,
inner: Option<Rc<RefCell<InnerMultipart>>>,
} }
/// ///
@ -66,17 +68,32 @@ struct InnerMultipart {
} }
impl Multipart { impl Multipart {
/// Create multipart instance for boundary. /// Create multipart instance for boundary.
pub fn new(boundary: String, payload: Payload) -> Multipart { pub fn new(boundary: String, payload: Payload) -> Multipart {
Multipart { Multipart {
error: None,
safety: Safety::new(), safety: Safety::new(),
inner: Rc::new(RefCell::new( inner: Some(Rc::new(RefCell::new(
InnerMultipart { InnerMultipart {
payload: PayloadRef::new(payload), payload: PayloadRef::new(payload),
boundary: boundary, boundary: boundary,
state: InnerState::FirstBoundary, state: InnerState::FirstBoundary,
item: InnerMultipartItem::None, item: InnerMultipartItem::None,
})) })))
}
}
/// Create multipart instance for request.
pub fn from_request<S>(req: &mut HttpRequest<S>) -> Multipart {
match Multipart::boundary(req.headers()) {
Ok(boundary) => Multipart::new(boundary, req.payload().clone()),
Err(err) =>
Multipart {
error: Some(err),
safety: Safety::new(),
inner: None,
}
} }
} }
@ -107,8 +124,10 @@ impl Stream for Multipart {
type Error = MultipartError; type Error = MultipartError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.safety.current() { if let Some(err) = self.error.take() {
self.inner.borrow_mut().poll(&self.safety) Err(err)
} else if self.safety.current() {
self.inner.as_mut().unwrap().borrow_mut().poll(&self.safety)
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
@ -327,7 +346,9 @@ impl InnerMultipart {
Ok(Async::Ready(Some( Ok(Async::Ready(Some(
MultipartItem::Nested( MultipartItem::Nested(
Multipart{safety: safety.clone(), inner: inner})))) Multipart{safety: safety.clone(),
error: None,
inner: Some(inner)}))))
} else { } else {
let field = Rc::new(RefCell::new(InnerField::new( let field = Rc::new(RefCell::new(InnerField::new(
self.payload.clone(), self.boundary.clone(), &headers)?)); self.payload.clone(), self.boundary.clone(), &headers)?));