mirror of
https://github.com/fafhrd91/actix-web
synced 2024-11-24 00:21:08 +01:00
Mnior cleanup of multipart API. (#2461)
This commit is contained in:
parent
3e6e9779dc
commit
52bbbd1d73
@ -20,7 +20,6 @@ actix-utils = "3.0.0"
|
|||||||
bytes = "1"
|
bytes = "1"
|
||||||
derive_more = "0.99.5"
|
derive_more = "0.99.5"
|
||||||
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||||
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
|
||||||
httparse = "1.3"
|
httparse = "1.3"
|
||||||
local-waker = "0.1"
|
local-waker = "0.1"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
@ -30,5 +29,6 @@ twoway = "0.2"
|
|||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.2"
|
actix-rt = "2.2"
|
||||||
actix-http = "3.0.0-beta.13"
|
actix-http = "3.0.0-beta.13"
|
||||||
|
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||||
tokio = { version = "1", features = ["sync"] }
|
tokio = { version = "1", features = ["sync"] }
|
||||||
tokio-stream = "0.1"
|
tokio-stream = "0.1"
|
||||||
|
@ -17,7 +17,6 @@ use actix_web::{
|
|||||||
};
|
};
|
||||||
use bytes::{Bytes, BytesMut};
|
use bytes::{Bytes, BytesMut};
|
||||||
use futures_core::stream::{LocalBoxStream, Stream};
|
use futures_core::stream::{LocalBoxStream, Stream};
|
||||||
use futures_util::stream::StreamExt as _;
|
|
||||||
use local_waker::LocalWaker;
|
use local_waker::LocalWaker;
|
||||||
|
|
||||||
use crate::error::MultipartError;
|
use crate::error::MultipartError;
|
||||||
@ -67,7 +66,7 @@ impl Multipart {
|
|||||||
/// Create multipart instance for boundary.
|
/// Create multipart instance for boundary.
|
||||||
pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart
|
pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
|
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
|
||||||
{
|
{
|
||||||
match Self::boundary(headers) {
|
match Self::boundary(headers) {
|
||||||
Ok(boundary) => Multipart::from_boundary(boundary, stream),
|
Ok(boundary) => Multipart::from_boundary(boundary, stream),
|
||||||
@ -77,36 +76,29 @@ impl Multipart {
|
|||||||
|
|
||||||
/// Extract boundary info from headers.
|
/// Extract boundary info from headers.
|
||||||
pub(crate) fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> {
|
pub(crate) fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> {
|
||||||
if let Some(content_type) = headers.get(&header::CONTENT_TYPE) {
|
headers
|
||||||
if let Ok(content_type) = content_type.to_str() {
|
.get(&header::CONTENT_TYPE)
|
||||||
if let Ok(ct) = content_type.parse::<mime::Mime>() {
|
.ok_or(MultipartError::NoContentType)?
|
||||||
if let Some(boundary) = ct.get_param(mime::BOUNDARY) {
|
.to_str()
|
||||||
Ok(boundary.as_str().to_owned())
|
.ok()
|
||||||
} else {
|
.and_then(|content_type| content_type.parse::<mime::Mime>().ok())
|
||||||
Err(MultipartError::Boundary)
|
.ok_or(MultipartError::ParseContentType)?
|
||||||
}
|
.get_param(mime::BOUNDARY)
|
||||||
} else {
|
.map(|boundary| boundary.as_str().to_owned())
|
||||||
Err(MultipartError::ParseContentType)
|
.ok_or(MultipartError::Boundary)
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Err(MultipartError::ParseContentType)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Err(MultipartError::NoContentType)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create multipart instance for given boundary and stream
|
/// Create multipart instance for given boundary and stream
|
||||||
pub(crate) fn from_boundary<S>(boundary: String, stream: S) -> Multipart
|
pub(crate) fn from_boundary<S>(boundary: String, stream: S) -> Multipart
|
||||||
where
|
where
|
||||||
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
|
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
|
||||||
{
|
{
|
||||||
Multipart {
|
Multipart {
|
||||||
error: None,
|
error: None,
|
||||||
safety: Safety::new(),
|
safety: Safety::new(),
|
||||||
inner: Some(Rc::new(RefCell::new(InnerMultipart {
|
inner: Some(Rc::new(RefCell::new(InnerMultipart {
|
||||||
boundary,
|
boundary,
|
||||||
payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))),
|
payload: PayloadRef::new(PayloadBuffer::new(stream)),
|
||||||
state: InnerState::FirstBoundary,
|
state: InnerState::FirstBoundary,
|
||||||
item: InnerMultipartItem::None,
|
item: InnerMultipartItem::None,
|
||||||
}))),
|
}))),
|
||||||
@ -690,10 +682,7 @@ impl PayloadRef {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<RefMut<'a, PayloadBuffer>>
|
fn get_mut(&self, s: &Safety) -> Option<RefMut<'_, PayloadBuffer>> {
|
||||||
where
|
|
||||||
'a: 'b,
|
|
||||||
{
|
|
||||||
if s.current() {
|
if s.current() {
|
||||||
Some(self.payload.borrow_mut())
|
Some(self.payload.borrow_mut())
|
||||||
} else {
|
} else {
|
||||||
@ -779,7 +768,7 @@ impl PayloadBuffer {
|
|||||||
PayloadBuffer {
|
PayloadBuffer {
|
||||||
eof: false,
|
eof: false,
|
||||||
buf: BytesMut::new(),
|
buf: BytesMut::new(),
|
||||||
stream: stream.boxed_local(),
|
stream: Box::pin(stream),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -860,7 +849,7 @@ mod tests {
|
|||||||
use actix_web::test::TestRequest;
|
use actix_web::test::TestRequest;
|
||||||
use actix_web::FromRequest;
|
use actix_web::FromRequest;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_util::future::lazy;
|
use futures_util::{future::lazy, StreamExt};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user