From 895e409d57178666aeb0faff97458052ab6d68ca Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 21 Apr 2019 15:41:01 -0700 Subject: [PATCH] Optimize multipart handling #634, #769 --- actix-multipart/CHANGES.md | 4 +- actix-multipart/Cargo.toml | 8 +- actix-multipart/src/server.rs | 140 ++++++++++++++++++++-------------- 3 files changed, 90 insertions(+), 62 deletions(-) diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index fec3e50f8..9f8fa052a 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -1,7 +1,9 @@ # Changes -## [0.1.0-alpha.1] - 2019-04-xx +## [0.1.0-beta.1] - 2019-04-21 * Do not support nested multipart * Split multipart support to separate crate + +* Optimize multipart handling #634, #769 \ No newline at end of file diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index 58ab45230..8e1714e7d 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-multipart" -version = "0.1.0-alpha.1" +version = "0.1.0-beta.1" authors = ["Nikolay Kim "] description = "Multipart support for actix web framework." readme = "README.md" @@ -18,8 +18,8 @@ name = "actix_multipart" path = "src/lib.rs" [dependencies] -actix-web = "1.0.0-alpha.6" -actix-service = "0.3.4" +actix-web = "1.0.0-beta.1" +actix-service = "0.3.6" bytes = "0.4" derive_more = "0.14" httparse = "1.3" @@ -31,4 +31,4 @@ twoway = "0.2" [dev-dependencies] actix-rt = "0.2.2" -actix-http = "0.1.0" \ No newline at end of file +actix-http = "0.1.1" \ No newline at end of file diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index c651fae56..59ed55994 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -168,7 +168,7 @@ impl InnerMultipart { match payload.readline() { None => { if payload.eof { - Err(MultipartError::Incomplete) + Ok(Some(true)) } else { Ok(None) } @@ -201,8 +201,7 @@ impl InnerMultipart { match payload.readline() { Some(chunk) => { if chunk.is_empty() { - //ValueError("Could not find starting boundary %r" - //% (self._boundary)) + return Err(MultipartError::Boundary); } if chunk.len() < boundary.len() { continue; @@ -505,47 +504,73 @@ impl InnerField { payload: &mut PayloadBuffer, boundary: &str, ) -> Poll, MultipartError> { - match payload.read_until(b"\r") { - None => { - if payload.eof { - Err(MultipartError::Incomplete) + let mut pos = 0; + + let len = payload.buf.len(); + if len == 0 { + return Ok(Async::NotReady); + } + + // check boundary + if len > 4 && payload.buf[0] == b'\r' { + let b_len = if &payload.buf[..2] == b"\r\n" && &payload.buf[2..4] == b"--" { + Some(4) + } else if &payload.buf[1..3] == b"--" { + Some(3) + } else { + None + }; + + if let Some(b_len) = b_len { + let b_size = boundary.len() + b_len; + if len < b_size { + return Ok(Async::NotReady); } else { - Ok(Async::NotReady) + if &payload.buf[b_len..b_size] == boundary.as_bytes() { + // found boundary + payload.buf.split_to(b_size); + return Ok(Async::Ready(None)); + } else { + pos = b_size; + } } } - Some(mut chunk) => { - if chunk.len() == 1 { - payload.unprocessed(chunk); - match payload.read_exact(boundary.len() + 4) { - None => { - if payload.eof { - Err(MultipartError::Incomplete) - } else { - Ok(Async::NotReady) - } - } - Some(mut chunk) => { - if &chunk[..2] == b"\r\n" - && &chunk[2..4] == b"--" - && &chunk[4..] == boundary.as_bytes() - { - payload.unprocessed(chunk); - Ok(Async::Ready(None)) - } else { - // \r might be part of data stream - let ch = chunk.split_to(1); - payload.unprocessed(chunk); - Ok(Async::Ready(Some(ch))) - } - } + } + + loop { + return if let Some(idx) = twoway::find_bytes(&payload.buf[pos..], b"\r") { + let cur = pos + idx; + + // check if we have enough data for boundary detection + if cur + 4 > len { + if cur > 0 { + Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze()))) + } else { + Ok(Async::NotReady) } } else { - let to = chunk.len() - 1; - let ch = chunk.split_to(to); - payload.unprocessed(chunk); - Ok(Async::Ready(Some(ch))) + // check boundary + if (&payload.buf[cur..cur + 2] == b"\r\n" + && &payload.buf[cur + 2..cur + 4] == b"--") + || (&payload.buf[cur..cur + 1] == b"\r" + && &payload.buf[cur + 1..cur + 3] == b"--") + { + if cur != 0 { + // return buffer + Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze()))) + } else { + pos = cur + 1; + continue; + } + } else { + // not boundary + pos = cur + 1; + continue; + } } - } + } else { + return Ok(Async::Ready(Some(payload.buf.take().freeze()))); + }; } } @@ -555,26 +580,27 @@ impl InnerField { } let result = if let Some(payload) = self.payload.as_ref().unwrap().get_mut(s) { - let res = if let Some(ref mut len) = self.length { - InnerField::read_len(payload, len)? - } else { - InnerField::read_stream(payload, &self.boundary)? - }; + if !self.eof { + let res = if let Some(ref mut len) = self.length { + InnerField::read_len(payload, len)? + } else { + InnerField::read_stream(payload, &self.boundary)? + }; - match res { - Async::NotReady => Async::NotReady, - Async::Ready(Some(bytes)) => Async::Ready(Some(bytes)), - Async::Ready(None) => { - self.eof = true; - match payload.readline() { - None => Async::Ready(None), - Some(line) => { - if line.as_ref() != b"\r\n" { - log::warn!("multipart field did not read all the data or it is malformed"); - } - Async::Ready(None) - } + match res { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(Some(bytes)) => return Ok(Async::Ready(Some(bytes))), + Async::Ready(None) => self.eof = true, + } + } + + match payload.readline() { + None => Async::Ready(None), + Some(line) => { + if line.as_ref() != b"\r\n" { + log::warn!("multipart field did not read all the data or it is malformed"); } + Async::Ready(None) } } } else { @@ -704,7 +730,7 @@ impl PayloadBuffer { } /// Read exact number of bytes - #[inline] + #[cfg(test)] fn read_exact(&mut self, size: usize) -> Option { if size <= self.buf.len() { Some(self.buf.split_to(size).freeze())