From fb0270e27d8ce95b2ca9f7d37aa8f2e35c6102fc Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 26 Oct 2017 23:14:33 -0700 Subject: [PATCH] refactor Payload stream --- .travis.yml | 1 + CHANGES.md | 2 ++ examples/multipart/Cargo.toml | 4 +-- examples/websocket-chat/Cargo.toml | 4 +-- src/httprequest.rs | 13 ++++----- src/multipart.rs | 16 +++++------ src/payload.rs | 43 +++++++++++++++++------------- src/server.rs | 5 ++-- src/ws.rs | 16 +++++------ 9 files changed, 56 insertions(+), 48 deletions(-) diff --git a/.travis.yml b/.travis.yml index eff26523f..329511c98 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: rust rust: + - 1.20.0 - stable - beta - nightly diff --git a/CHANGES.md b/CHANGES.md index 76866d2fc..b20895abc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,8 @@ * Refactor `HttpContext::write` +* Refactor `Payload` stream + * Re-use `BinaryBody` for `Frame::Payload` * Fix disconnection handling. diff --git a/examples/multipart/Cargo.toml b/examples/multipart/Cargo.toml index e1f6ea8c4..dfd548085 100644 --- a/examples/multipart/Cargo.toml +++ b/examples/multipart/Cargo.toml @@ -9,6 +9,6 @@ path = "src/main.rs" [dependencies] env_logger = "*" -actix = "0.3" -#actix = { git = "https://github.com/actix/actix.git" } +#actix = "0.3" +actix = { git = "https://github.com/actix/actix.git" } actix-web = { path = "../../" } diff --git a/examples/websocket-chat/Cargo.toml b/examples/websocket-chat/Cargo.toml index 3495d63c9..f5d27cca9 100644 --- a/examples/websocket-chat/Cargo.toml +++ b/examples/websocket-chat/Cargo.toml @@ -24,6 +24,6 @@ serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -actix = "0.3" -#actix = { git = "https://github.com/actix/actix.git" } +#actix = "0.3" +actix = { git = "https://github.com/actix/actix.git" } actix-web = { path = "../../" } diff --git a/src/httprequest.rs b/src/httprequest.rs index 84d02214e..cb5bbe764 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -268,7 +268,6 @@ impl Future for UrlEncoded { fn poll(&mut self) -> Poll { loop { return match self.pl.poll() { - Err(_) => unreachable!(), Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(None)) => { let mut m = HashMap::new(); @@ -277,13 +276,11 @@ impl Future for UrlEncoded { } Ok(Async::Ready(m)) }, - Ok(Async::Ready(Some(item))) => match item { - Ok(bytes) => { - self.body.extend(bytes); - continue - }, - Err(err) => Err(err), - } + Ok(Async::Ready(Some(item))) => { + self.body.extend(item.0); + continue + }, + Err(err) => Err(err), } } } diff --git a/src/multipart.rs b/src/multipart.rs index b3317b379..d0a6c1282 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -512,18 +512,18 @@ impl InnerField { Ok(Async::Ready(None)) } else { match payload.readany() { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(None) => Ok(Async::Ready(None)), - Async::Ready(Some(Ok(mut chunk))) => { - let len = cmp::min(chunk.len() as u64, *size); + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::Ready(Some(mut chunk))) => { + let len = cmp::min(chunk.0.len() as u64, *size); *size -= len; - let ch = chunk.split_to(len as usize); - if !chunk.is_empty() { - payload.unread_data(chunk); + let ch = chunk.0.split_to(len as usize); + if !chunk.0.is_empty() { + payload.unread_data(chunk.0); } Ok(Async::Ready(Some(ch))) }, - Async::Ready(Some(Err(err))) => Err(err.into()) + Err(err) => Err(err.into()) } } } diff --git a/src/payload.rs b/src/payload.rs index 1e7427d3c..ae4c881a6 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -8,10 +8,17 @@ use bytes::{Bytes, BytesMut}; use futures::{Async, Poll, Stream}; use futures::task::{Task, current as current_task}; +use actix::ResponseType; + const MAX_PAYLOAD_SIZE: usize = 65_536; // max buffer size 64k /// Just Bytes object -pub type PayloadItem = Result; +pub struct PayloadItem(pub Bytes); + +impl ResponseType for PayloadItem { + type Item = (); + type Error = (); +} #[derive(Debug)] /// A set of error that can occur during payload parsing. @@ -92,7 +99,7 @@ impl Payload { /// Get first available chunk of data. /// Returns Some(PayloadItem) as chunk, `None` indicates eof. - pub fn readany(&mut self) -> Async> { + pub fn readany(&mut self) -> Poll, PayloadError> { self.inner.borrow_mut().readany() } @@ -128,10 +135,10 @@ impl Payload { impl Stream for Payload { type Item = PayloadItem; - type Error = (); + type Error = PayloadError; - fn poll(&mut self) -> Poll, ()> { - Ok(self.readany()) + fn poll(&mut self) -> Poll, PayloadError> { + self.readany() } } @@ -244,17 +251,17 @@ impl Inner { self.len } - fn readany(&mut self) -> Async> { + fn readany(&mut self) -> Poll, PayloadError> { if let Some(data) = self.items.pop_front() { self.len -= data.len(); - Async::Ready(Some(Ok(data))) + Ok(Async::Ready(Some(PayloadItem(data)))) } else if self.eof { - Async::Ready(None) + Ok(Async::Ready(None)) } else if let Some(err) = self.err.take() { - Async::Ready(Some(Err(err))) + Err(err) } else { self.task = Some(current_task()); - Async::NotReady + Ok(Async::NotReady) } } @@ -391,7 +398,7 @@ mod tests { assert_eq!(payload.len(), 0); match payload.readany() { - Async::NotReady => (), + Ok(Async::NotReady) => (), _ => panic!("error"), } @@ -406,7 +413,7 @@ mod tests { let (mut sender, mut payload) = Payload::new(false); match payload.readany() { - Async::NotReady => (), + Ok(Async::NotReady) => (), _ => panic!("error"), } @@ -418,7 +425,7 @@ mod tests { assert!(!payload.eof()); match payload.readany() { - Async::Ready(Some(data)) => assert_eq!(&data.unwrap(), "data"), + Ok(Async::Ready(Some(data))) => assert_eq!(&data.0, "data"), _ => panic!("error"), } assert!(payload.is_empty()); @@ -426,7 +433,7 @@ mod tests { assert_eq!(payload.len(), 0); match payload.readany() { - Async::Ready(None) => (), + Ok(Async::Ready(None)) => (), _ => panic!("error"), } let res: Result<(), ()> = Ok(()); @@ -440,13 +447,13 @@ mod tests { let (mut sender, mut payload) = Payload::new(false); match payload.readany() { - Async::NotReady => (), + Ok(Async::NotReady) => (), _ => panic!("error"), } sender.set_error(PayloadError::Incomplete); match payload.readany() { - Async::Ready(Some(data)) => assert!(data.is_err()), + Err(_) => (), _ => panic!("error"), } let res: Result<(), ()> = Ok(()); @@ -469,7 +476,7 @@ mod tests { assert_eq!(payload.len(), 10); match payload.readany() { - Async::Ready(Some(data)) => assert_eq!(&data.unwrap(), "line1"), + Ok(Async::Ready(Some(data))) => assert_eq!(&data.0, "line1"), _ => panic!("error"), } assert!(!payload.is_empty()); @@ -587,7 +594,7 @@ mod tests { assert_eq!(payload.len(), 4); match payload.readany() { - Async::Ready(Some(data)) => assert_eq!(&data.unwrap(), "data"), + Ok(Async::Ready(Some(data))) => assert_eq!(&data.0, "data"), _ => panic!("error"), } diff --git a/src/server.rs b/src/server.rs index e1e6a2281..7aede0879 100644 --- a/src/server.rs +++ b/src/server.rs @@ -86,7 +86,7 @@ impl HttpServer { if let Ok(iter) = addr.to_socket_addrs() { for addr in iter { match TcpListener::bind(&addr, Arbiter::handle()) { - Ok(tcp) => addrs.push(tcp), + Ok(tcp) => addrs.push((addr, tcp)), Err(e) => err = Some(e), } } @@ -99,7 +99,8 @@ impl HttpServer { } } else { Ok(HttpServer::create(move |ctx| { - for tcp in addrs { + for (addr, tcp) in addrs { + info!("Starting http server on {}", addr); ctx.add_stream(tcp.incoming().map(|(t, a)| IoStream(t, a))); } self diff --git a/src/ws.rs b/src/ws.rs index ba3fe59b2..06bdb29f5 100644 --- a/src/ws.rs +++ b/src/ws.rs @@ -196,19 +196,19 @@ impl Stream for WsStream { if !self.closed { loop { match self.rx.readany() { - Async::Ready(Some(Ok(chunk))) => { - self.buf.extend(chunk) + Ok(Async::Ready(Some(chunk))) => { + self.buf.extend(chunk.0) } - Async::Ready(Some(Err(_))) => { - self.closed = true; - break; - } - Async::Ready(None) => { + Ok(Async::Ready(None)) => { done = true; self.closed = true; break; } - Async::NotReady => break, + Ok(Async::NotReady) => break, + Err(_) => { + self.closed = true; + break; + } } } }