1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-27 17:52:56 +01:00

Handle cancellation of uploads #834 #736

This commit is contained in:
Nikolay Kim 2019-05-12 11:43:05 -07:00
parent 36d017dcc6
commit 2350a2dc68
4 changed files with 25 additions and 5 deletions

View File

@ -1,5 +1,11 @@
# Changes # Changes
## [0.1.0-beta.4] - 2019-05-12
* Handle cancellation of uploads #834 #736
* Upgrade to actix-web 1.0.0-beta.4
## [0.1.0-beta.1] - 2019-04-21 ## [0.1.0-beta.1] - 2019-04-21
* Do not support nested multipart * Do not support nested multipart

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-multipart" name = "actix-multipart"
version = "0.1.0-beta.1" version = "0.1.0-beta.4"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Multipart support for actix web framework." description = "Multipart support for actix web framework."
readme = "README.md" readme = "README.md"
@ -18,7 +18,7 @@ name = "actix_multipart"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-web = "1.0.0-beta.1" actix-web = "1.0.0-beta.4"
actix-service = "0.4.0" actix-service = "0.4.0"
bytes = "0.4" bytes = "0.4"
derive_more = "0.14" derive_more = "0.14"

View File

@ -28,6 +28,9 @@ pub enum MultipartError {
/// Payload error /// Payload error
#[display(fmt = "{}", _0)] #[display(fmt = "{}", _0)]
Payload(PayloadError), Payload(PayloadError),
/// Not consumed
#[display(fmt = "Multipart stream is not consumed")]
NotConsumed,
} }
/// Return `BadRequest` for `MultipartError` /// Return `BadRequest` for `MultipartError`

View File

@ -1,5 +1,5 @@
//! Multipart payload support //! Multipart payload support
use std::cell::{RefCell, UnsafeCell}; use std::cell::{Cell, RefCell, UnsafeCell};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::rc::Rc; use std::rc::Rc;
use std::{cmp, fmt}; use std::{cmp, fmt};
@ -116,6 +116,8 @@ impl Stream for Multipart {
payload.poll_stream()?; payload.poll_stream()?;
} }
inner.poll(&self.safety) inner.poll(&self.safety)
} else if !self.safety.is_clean() {
Err(MultipartError::NotConsumed)
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
@ -415,6 +417,8 @@ impl Stream for Field {
} }
inner.poll(&self.safety) inner.poll(&self.safety)
} else if !self.safety.is_clean() {
return Err(MultipartError::NotConsumed);
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
@ -655,6 +659,7 @@ struct Safety {
task: Option<Task>, task: Option<Task>,
level: usize, level: usize,
payload: Rc<PhantomData<bool>>, payload: Rc<PhantomData<bool>>,
clean: Rc<Cell<bool>>,
} }
impl Safety { impl Safety {
@ -663,12 +668,17 @@ impl Safety {
Safety { Safety {
task: None, task: None,
level: Rc::strong_count(&payload), level: Rc::strong_count(&payload),
clean: Rc::new(Cell::new(true)),
payload, payload,
} }
} }
fn current(&self) -> bool { fn current(&self) -> bool {
Rc::strong_count(&self.payload) == self.level Rc::strong_count(&self.payload) == self.level && self.clean.get()
}
fn is_clean(&self) -> bool {
self.clean.get()
} }
} }
@ -678,6 +688,7 @@ impl Clone for Safety {
Safety { Safety {
task: Some(current_task()), task: Some(current_task()),
level: Rc::strong_count(&payload), level: Rc::strong_count(&payload),
clean: self.clean.clone(),
payload, payload,
} }
} }
@ -687,7 +698,7 @@ impl Drop for Safety {
fn drop(&mut self) { fn drop(&mut self) {
// parent task is dead // parent task is dead
if Rc::strong_count(&self.payload) != self.level { if Rc::strong_count(&self.payload) != self.level {
panic!("Safety get dropped but it is not from top-most task"); self.clean.set(true);
} }
if let Some(task) = self.task.take() { if let Some(task) = self.task.take() {
task.notify() task.notify()