From aa626a1e7281c22aebadbc296e2f8510eb1965dd Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 25 May 2019 03:16:46 -0700 Subject: [PATCH] handle disconnects --- actix-multipart/CHANGES.md | 6 +++- actix-multipart/Cargo.toml | 2 +- actix-multipart/src/server.rs | 64 ++++++++++++++++++++++------------- 3 files changed, 47 insertions(+), 25 deletions(-) diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index 3a959890a..cf32859e3 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -1,12 +1,16 @@ # Changes +## [0.1.1] - 2019-05-25 + +* Fix disconnect handling #834 + ## [0.1.0] - 2019-05-18 * Release ## [0.1.0-beta.4] - 2019-05-12 -* Handle cancellation of uploads #834 #736 +* Handle cancellation of uploads #736 * Upgrade to actix-web 1.0.0-beta.4 diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index ca1ff9c91..fe63d5361 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-multipart" -version = "0.1.0" +version = "0.1.1" authors = ["Nikolay Kim "] description = "Multipart support for actix web framework." readme = "README.md" diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 7d746ea2f..8245d241e 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -128,7 +128,7 @@ impl InnerMultipart { fn read_headers( payload: &mut PayloadBuffer, ) -> Result, MultipartError> { - match payload.read_until(b"\r\n\r\n") { + match payload.read_until(b"\r\n\r\n")? { None => { if payload.eof { Err(MultipartError::Incomplete) @@ -167,7 +167,7 @@ impl InnerMultipart { boundary: &str, ) -> Result, MultipartError> { // TODO: need to read epilogue - match payload.readline() { + match payload.readline()? { None => { if payload.eof { Ok(Some(true)) @@ -200,7 +200,7 @@ impl InnerMultipart { ) -> Result, MultipartError> { let mut eof = false; loop { - match payload.readline() { + match payload.readline()? { Some(chunk) => { if chunk.is_empty() { return Err(MultipartError::Boundary); @@ -481,7 +481,7 @@ impl InnerField { if *size == 0 { Ok(Async::Ready(None)) } else { - match payload.read_max(*size) { + match payload.read_max(*size)? { Some(mut chunk) => { let len = cmp::min(chunk.len() as u64, *size); *size -= len; @@ -512,7 +512,11 @@ impl InnerField { let len = payload.buf.len(); if len == 0 { - return Ok(Async::NotReady); + return if payload.eof { + Err(MultipartError::Incomplete) + } else { + Ok(Async::NotReady) + }; } // check boundary @@ -597,7 +601,7 @@ impl InnerField { } } - match payload.readline() { + match payload.readline()? { None => Async::Ready(None), Some(line) => { if line.as_ref() != b"\r\n" { @@ -749,23 +753,31 @@ impl PayloadBuffer { } } - fn read_max(&mut self, size: u64) -> Option { + fn read_max(&mut self, size: u64) -> Result, MultipartError> { if !self.buf.is_empty() { let size = std::cmp::min(self.buf.len() as u64, size) as usize; - Some(self.buf.split_to(size).freeze()) + Ok(Some(self.buf.split_to(size).freeze())) + } else if self.eof { + Err(MultipartError::Incomplete) } else { - None + Ok(None) } } /// Read until specified ending - pub fn read_until(&mut self, line: &[u8]) -> Option { - twoway::find_bytes(&self.buf, line) - .map(|idx| self.buf.split_to(idx + line.len()).freeze()) + pub fn read_until(&mut self, line: &[u8]) -> Result, MultipartError> { + let res = twoway::find_bytes(&self.buf, line) + .map(|idx| self.buf.split_to(idx + line.len()).freeze()); + + if res.is_none() && self.eof { + Err(MultipartError::Incomplete) + } else { + Ok(res) + } } /// Read bytes until new line delimiter - pub fn readline(&mut self) -> Option { + pub fn readline(&mut self) -> Result, MultipartError> { self.read_until(b"\n") } @@ -991,7 +1003,7 @@ mod tests { assert_eq!(payload.buf.len(), 0); payload.poll_stream().unwrap(); - assert_eq!(None, payload.read_max(1)); + assert_eq!(None, payload.read_max(1).unwrap()); }) } @@ -1001,14 +1013,14 @@ mod tests { let (mut sender, payload) = Payload::create(false); let mut payload = PayloadBuffer::new(payload); - assert_eq!(None, payload.read_max(4)); + assert_eq!(None, payload.read_max(4).unwrap()); sender.feed_data(Bytes::from("data")); sender.feed_eof(); payload.poll_stream().unwrap(); - assert_eq!(Some(Bytes::from("data")), payload.read_max(4)); + assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap()); assert_eq!(payload.buf.len(), 0); - assert_eq!(None, payload.read_max(1)); + assert!(payload.read_max(1).is_err()); assert!(payload.eof); }) } @@ -1018,7 +1030,7 @@ mod tests { run_on(|| { let (mut sender, payload) = Payload::create(false); let mut payload = PayloadBuffer::new(payload); - assert_eq!(None, payload.read_max(1)); + assert_eq!(None, payload.read_max(1).unwrap()); sender.set_error(PayloadError::Incomplete(None)); payload.poll_stream().err().unwrap(); }) @@ -1035,10 +1047,10 @@ mod tests { payload.poll_stream().unwrap(); assert_eq!(payload.buf.len(), 10); - assert_eq!(Some(Bytes::from("line1")), payload.read_max(5)); + assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap()); assert_eq!(payload.buf.len(), 5); - assert_eq!(Some(Bytes::from("line2")), payload.read_max(5)); + assert_eq!(Some(Bytes::from("line2")), payload.read_max(5).unwrap()); assert_eq!(payload.buf.len(), 0); }) } @@ -1069,16 +1081,22 @@ mod tests { let (mut sender, payload) = Payload::create(false); let mut payload = PayloadBuffer::new(payload); - assert_eq!(None, payload.read_until(b"ne")); + assert_eq!(None, payload.read_until(b"ne").unwrap()); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); payload.poll_stream().unwrap(); - assert_eq!(Some(Bytes::from("line")), payload.read_until(b"ne")); + assert_eq!( + Some(Bytes::from("line")), + payload.read_until(b"ne").unwrap() + ); assert_eq!(payload.buf.len(), 6); - assert_eq!(Some(Bytes::from("1line2")), payload.read_until(b"2")); + assert_eq!( + Some(Bytes::from("1line2")), + payload.read_until(b"2").unwrap() + ); assert_eq!(payload.buf.len(), 0); }) }