From 6b10e1eff6fb3455a4a56ea07367d18623198283 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 18 Jul 2018 10:01:28 +0600 Subject: [PATCH] rename PayloadHelper --- src/lib.rs | 2 +- src/multipart.rs | 32 ++++++++++++++++---------------- src/payload.rs | 35 +++++++++++++++++++++++------------ src/ws/client.rs | 6 +++--- src/ws/frame.rs | 28 ++++++++++++++-------------- src/ws/mod.rs | 6 +++--- tests/test_middleware.rs | 16 +++++++++++----- 7 files changed, 71 insertions(+), 54 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a740e03e..b33f1186 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -244,7 +244,7 @@ pub mod dev { pub use info::ConnectionInfo; pub use json::{JsonBody, JsonConfig}; pub use param::{FromParam, Params}; - pub use payload::{Payload, PayloadHelper}; + pub use payload::{Payload, PayloadBuffer}; pub use resource::Resource; pub use route::Route; pub use router::{ResourceDef, ResourceInfo, ResourceType, Router}; diff --git a/src/multipart.rs b/src/multipart.rs index 1735085d..d4b6059f 100644 --- a/src/multipart.rs +++ b/src/multipart.rs @@ -13,7 +13,7 @@ use httparse; use mime; use error::{MultipartError, ParseError, PayloadError}; -use payload::PayloadHelper; +use payload::PayloadBuffer; const MAX_HEADERS: usize = 32; @@ -97,7 +97,7 @@ where safety: Safety::new(), inner: Some(Rc::new(RefCell::new(InnerMultipart { boundary, - payload: PayloadRef::new(PayloadHelper::new(stream)), + payload: PayloadRef::new(PayloadBuffer::new(stream)), state: InnerState::FirstBoundary, item: InnerMultipartItem::None, }))), @@ -133,7 +133,7 @@ impl InnerMultipart where S: Stream, { - fn read_headers(payload: &mut PayloadHelper) -> Poll { + fn read_headers(payload: &mut PayloadBuffer) -> Poll { match payload.read_until(b"\r\n\r\n")? { Async::NotReady => Ok(Async::NotReady), Async::Ready(None) => Err(MultipartError::Incomplete), @@ -164,7 +164,7 @@ where } fn read_boundary( - payload: &mut PayloadHelper, boundary: &str, + payload: &mut PayloadBuffer, boundary: &str, ) -> Poll { // TODO: need to read epilogue match payload.readline()? { @@ -190,7 +190,7 @@ where } fn skip_until_boundary( - payload: &mut PayloadHelper, boundary: &str, + payload: &mut PayloadBuffer, boundary: &str, ) -> Poll { let mut eof = false; loop { @@ -490,7 +490,7 @@ where /// Reads body part content chunk of the specified size. /// The body part must has `Content-Length` header with proper value. fn read_len( - payload: &mut PayloadHelper, size: &mut u64, + payload: &mut PayloadBuffer, size: &mut u64, ) -> Poll, MultipartError> { if *size == 0 { Ok(Async::Ready(None)) @@ -503,7 +503,7 @@ where *size -= len; let ch = chunk.split_to(len as usize); if !chunk.is_empty() { - payload.unread_data(chunk); + payload.unprocessed(chunk); } Ok(Async::Ready(Some(ch))) } @@ -515,14 +515,14 @@ where /// Reads content chunk of body part with unknown length. /// The `Content-Length` header for body part is not necessary. fn read_stream( - payload: &mut PayloadHelper, boundary: &str, + payload: &mut PayloadBuffer, boundary: &str, ) -> Poll, MultipartError> { match payload.read_until(b"\r")? { Async::NotReady => Ok(Async::NotReady), Async::Ready(None) => Err(MultipartError::Incomplete), Async::Ready(Some(mut chunk)) => { if chunk.len() == 1 { - payload.unread_data(chunk); + payload.unprocessed(chunk); match payload.read_exact(boundary.len() + 4)? { Async::NotReady => Ok(Async::NotReady), Async::Ready(None) => Err(MultipartError::Incomplete), @@ -531,12 +531,12 @@ where && &chunk[2..4] == b"--" && &chunk[4..] == boundary.as_bytes() { - payload.unread_data(chunk); + payload.unprocessed(chunk); Ok(Async::Ready(None)) } else { // \r might be part of data stream let ch = chunk.split_to(1); - payload.unread_data(chunk); + payload.unprocessed(chunk); Ok(Async::Ready(Some(ch))) } } @@ -544,7 +544,7 @@ where } else { let to = chunk.len() - 1; let ch = chunk.split_to(to); - payload.unread_data(chunk); + payload.unprocessed(chunk); Ok(Async::Ready(Some(ch))) } } @@ -592,27 +592,27 @@ where } struct PayloadRef { - payload: Rc>>, + payload: Rc>>, } impl PayloadRef where S: Stream, { - fn new(payload: PayloadHelper) -> PayloadRef { + fn new(payload: PayloadBuffer) -> PayloadRef { PayloadRef { payload: Rc::new(payload.into()), } } - fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut PayloadHelper> + fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut PayloadBuffer> where 'a: 'b, { // Unsafe: Invariant is inforced by Safety Safety is used as ref counter, // only top most ref can have mutable access to payload. if s.current() { - let payload: &mut PayloadHelper = unsafe { &mut *self.payload.get() }; + let payload: &mut PayloadBuffer = unsafe { &mut *self.payload.get() }; Some(payload) } else { None diff --git a/src/payload.rs b/src/payload.rs index fd4e57af..b20bec65 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -280,18 +280,20 @@ impl Inner { } } -pub struct PayloadHelper { +/// Payload buffer +pub struct PayloadBuffer { len: usize, items: VecDeque, stream: S, } -impl PayloadHelper +impl PayloadBuffer where S: Stream, { + /// Create new `PayloadBuffer` instance pub fn new(stream: S) -> Self { - PayloadHelper { + PayloadBuffer { len: 0, items: VecDeque::new(), stream, @@ -316,6 +318,7 @@ where }) } + /// Read first available chunk of bytes #[inline] pub fn readany(&mut self) -> Poll, PayloadError> { if let Some(data) = self.items.pop_front() { @@ -330,6 +333,7 @@ where } } + /// Check if buffer contains enough bytes #[inline] pub fn can_read(&mut self, size: usize) -> Poll, PayloadError> { if size <= self.len { @@ -343,6 +347,7 @@ where } } + /// Return reference to the first chunk of data #[inline] pub fn get_chunk(&mut self) -> Poll, PayloadError> { if self.items.is_empty() { @@ -358,6 +363,7 @@ where } } + /// Read exact number of bytes #[inline] pub fn read_exact(&mut self, size: usize) -> Poll, PayloadError> { if size <= self.len { @@ -392,8 +398,9 @@ where } } + /// Remove specified amount if bytes from buffer #[inline] - pub fn drop_payload(&mut self, size: usize) { + pub fn drop_bytes(&mut self, size: usize) { if size <= self.len { self.len -= size; @@ -410,6 +417,7 @@ where } } + /// Copy buffered data pub fn copy(&mut self, size: usize) -> Poll, PayloadError> { if size <= self.len { let mut buf = BytesMut::with_capacity(size); @@ -431,6 +439,7 @@ where } } + /// Read until specified ending pub fn read_until(&mut self, line: &[u8]) -> Poll, PayloadError> { let mut idx = 0; let mut num = 0; @@ -486,16 +495,18 @@ where } } + /// Read bytes until new line delimiter pub fn readline(&mut self) -> Poll, PayloadError> { self.read_until(b"\n") } - pub fn unread_data(&mut self, data: Bytes) { + /// Put unprocessed data back to the buffer + pub fn unprocessed(&mut self, data: Bytes) { self.len += data.len(); self.items.push_front(data); } - #[allow(dead_code)] + /// Get remaining data from the buffer pub fn remaining(&mut self) -> Bytes { self.items .iter_mut() @@ -535,7 +546,7 @@ mod tests { .unwrap() .block_on(lazy(|| { let (_, payload) = Payload::new(false); - let mut payload = PayloadHelper::new(payload); + let mut payload = PayloadBuffer::new(payload); assert_eq!(payload.len, 0); assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); @@ -552,7 +563,7 @@ mod tests { .unwrap() .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); - let mut payload = PayloadHelper::new(payload); + let mut payload = PayloadBuffer::new(payload); assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); sender.feed_data(Bytes::from("data")); @@ -577,7 +588,7 @@ mod tests { .unwrap() .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); - let mut payload = PayloadHelper::new(payload); + let mut payload = PayloadBuffer::new(payload); assert_eq!(Async::NotReady, payload.readany().ok().unwrap()); @@ -595,7 +606,7 @@ mod tests { .unwrap() .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); - let mut payload = PayloadHelper::new(payload); + let mut payload = PayloadBuffer::new(payload); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); @@ -624,7 +635,7 @@ mod tests { .unwrap() .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); - let mut payload = PayloadHelper::new(payload); + let mut payload = PayloadBuffer::new(payload); assert_eq!(Async::NotReady, payload.read_exact(2).ok().unwrap()); @@ -658,7 +669,7 @@ mod tests { .unwrap() .block_on(lazy(|| { let (mut sender, payload) = Payload::new(false); - let mut payload = PayloadHelper::new(payload); + let mut payload = PayloadBuffer::new(payload); assert_eq!(Async::NotReady, payload.read_until(b"ne").ok().unwrap()); diff --git a/src/ws/client.rs b/src/ws/client.rs index 4295905a..98922047 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -20,7 +20,7 @@ use body::{Binary, Body}; use error::{Error, UrlParseError}; use header::IntoHeaderValue; use httpmessage::HttpMessage; -use payload::PayloadHelper; +use payload::PayloadBuffer; use client::{ ClientConnector, ClientRequest, ClientRequestBuilder, HttpResponseParserError, @@ -275,7 +275,7 @@ impl Client { struct Inner { tx: UnboundedSender, - rx: PayloadHelper>, + rx: PayloadBuffer>, closed: bool, } @@ -431,7 +431,7 @@ impl Future for ClientHandshake { let inner = Inner { tx: self.tx.take().unwrap(), - rx: PayloadHelper::new(resp.payload()), + rx: PayloadBuffer::new(resp.payload()), closed: false, }; diff --git a/src/ws/frame.rs b/src/ws/frame.rs index 70065774..006d322f 100644 --- a/src/ws/frame.rs +++ b/src/ws/frame.rs @@ -6,7 +6,7 @@ use std::fmt; use body::Binary; use error::PayloadError; -use payload::PayloadHelper; +use payload::PayloadBuffer; use ws::mask::apply_mask; use ws::proto::{CloseCode, CloseReason, OpCode}; @@ -48,7 +48,7 @@ impl Frame { #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] fn read_copy_md( - pl: &mut PayloadHelper, server: bool, max_size: usize, + pl: &mut PayloadBuffer, server: bool, max_size: usize, ) -> Poll)>, ProtocolError> where S: Stream, @@ -201,7 +201,7 @@ impl Frame { /// Parse the input stream into a frame. pub fn parse( - pl: &mut PayloadHelper, server: bool, max_size: usize, + pl: &mut PayloadBuffer, server: bool, max_size: usize, ) -> Poll, ProtocolError> where S: Stream, @@ -230,7 +230,7 @@ impl Frame { } // remove prefix - pl.drop_payload(idx); + pl.drop_bytes(idx); // no need for body if length == 0 { @@ -393,14 +393,14 @@ mod tests { #[test] fn test_parse() { - let mut buf = PayloadHelper::new(once(Ok(BytesMut::from( + let mut buf = PayloadBuffer::new(once(Ok(BytesMut::from( &[0b0000_0001u8, 0b0000_0001u8][..], ).freeze()))); assert!(is_none(&Frame::parse(&mut buf, false, 1024))); let mut buf = BytesMut::from(&[0b0000_0001u8, 0b0000_0001u8][..]); buf.extend(b"1"); - let mut buf = PayloadHelper::new(once(Ok(buf.freeze()))); + let mut buf = PayloadBuffer::new(once(Ok(buf.freeze()))); let frame = extract(Frame::parse(&mut buf, false, 1024)); assert!(!frame.finished); @@ -411,7 +411,7 @@ mod tests { #[test] fn test_parse_length0() { let buf = BytesMut::from(&[0b0000_0001u8, 0b0000_0000u8][..]); - let mut buf = PayloadHelper::new(once(Ok(buf.freeze()))); + let mut buf = PayloadBuffer::new(once(Ok(buf.freeze()))); let frame = extract(Frame::parse(&mut buf, false, 1024)); assert!(!frame.finished); @@ -422,13 +422,13 @@ mod tests { #[test] fn test_parse_length2() { let buf = BytesMut::from(&[0b0000_0001u8, 126u8][..]); - let mut buf = PayloadHelper::new(once(Ok(buf.freeze()))); + let mut buf = PayloadBuffer::new(once(Ok(buf.freeze()))); assert!(is_none(&Frame::parse(&mut buf, false, 1024))); let mut buf = BytesMut::from(&[0b0000_0001u8, 126u8][..]); buf.extend(&[0u8, 4u8][..]); buf.extend(b"1234"); - let mut buf = PayloadHelper::new(once(Ok(buf.freeze()))); + let mut buf = PayloadBuffer::new(once(Ok(buf.freeze()))); let frame = extract(Frame::parse(&mut buf, false, 1024)); assert!(!frame.finished); @@ -439,13 +439,13 @@ mod tests { #[test] fn test_parse_length4() { let buf = BytesMut::from(&[0b0000_0001u8, 127u8][..]); - let mut buf = PayloadHelper::new(once(Ok(buf.freeze()))); + let mut buf = PayloadBuffer::new(once(Ok(buf.freeze()))); assert!(is_none(&Frame::parse(&mut buf, false, 1024))); let mut buf = BytesMut::from(&[0b0000_0001u8, 127u8][..]); buf.extend(&[0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 4u8][..]); buf.extend(b"1234"); - let mut buf = PayloadHelper::new(once(Ok(buf.freeze()))); + let mut buf = PayloadBuffer::new(once(Ok(buf.freeze()))); let frame = extract(Frame::parse(&mut buf, false, 1024)); assert!(!frame.finished); @@ -458,7 +458,7 @@ mod tests { let mut buf = BytesMut::from(&[0b0000_0001u8, 0b1000_0001u8][..]); buf.extend(b"0001"); buf.extend(b"1"); - let mut buf = PayloadHelper::new(once(Ok(buf.freeze()))); + let mut buf = PayloadBuffer::new(once(Ok(buf.freeze()))); assert!(Frame::parse(&mut buf, false, 1024).is_err()); @@ -472,7 +472,7 @@ mod tests { fn test_parse_frame_no_mask() { let mut buf = BytesMut::from(&[0b0000_0001u8, 0b0000_0001u8][..]); buf.extend(&[1u8]); - let mut buf = PayloadHelper::new(once(Ok(buf.freeze()))); + let mut buf = PayloadBuffer::new(once(Ok(buf.freeze()))); assert!(Frame::parse(&mut buf, true, 1024).is_err()); @@ -486,7 +486,7 @@ mod tests { fn test_parse_frame_max_size() { let mut buf = BytesMut::from(&[0b0000_0001u8, 0b0000_0010u8][..]); buf.extend(&[1u8, 1u8]); - let mut buf = PayloadHelper::new(once(Ok(buf.freeze()))); + let mut buf = PayloadBuffer::new(once(Ok(buf.freeze()))); assert!(Frame::parse(&mut buf, true, 1).is_err()); diff --git a/src/ws/mod.rs b/src/ws/mod.rs index 05099971..ed44e270 100644 --- a/src/ws/mod.rs +++ b/src/ws/mod.rs @@ -52,7 +52,7 @@ use error::{Error, PayloadError, ResponseError}; use httpmessage::HttpMessage; use httprequest::HttpRequest; use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder}; -use payload::PayloadHelper; +use payload::PayloadBuffer; mod client; mod context; @@ -252,7 +252,7 @@ pub fn handshake( /// Maps `Payload` stream into stream of `ws::Message` items pub struct WsStream { - rx: PayloadHelper, + rx: PayloadBuffer, closed: bool, max_size: usize, } @@ -264,7 +264,7 @@ where /// Create new websocket frames stream pub fn new(stream: S) -> WsStream { WsStream { - rx: PayloadHelper::new(stream), + rx: PayloadBuffer::new(stream), closed: false, max_size: 65_536, } diff --git a/tests/test_middleware.rs b/tests/test_middleware.rs index 9c8ea85d..4fa1c81d 100644 --- a/tests/test_middleware.rs +++ b/tests/test_middleware.rs @@ -997,7 +997,9 @@ fn test_resource_middleware_async_chain_with_error() { #[cfg(feature = "session")] #[test] fn test_session_storage_middleware() { - use actix_web::middleware::session::{RequestSession, SessionStorage, CookieSessionBackend}; + use actix_web::middleware::session::{ + CookieSessionBackend, RequestSession, SessionStorage, + }; const SIMPLE_NAME: &'static str = "simple"; const SIMPLE_PAYLOAD: &'static str = "kantan"; @@ -1008,7 +1010,9 @@ fn test_session_storage_middleware() { let mut srv = test::TestServer::with_factory(move || { App::new() - .middleware(SessionStorage::new(CookieSessionBackend::signed(&[0; 32]).secure(false))) + .middleware(SessionStorage::new( + CookieSessionBackend::signed(&[0; 32]).secure(false), + )) .resource("/index", move |r| { r.f(|req| { let res = req.session().set(COMPLEX_NAME, COMPLEX_PAYLOAD); @@ -1029,9 +1033,10 @@ fn test_session_storage_middleware() { HttpResponse::Ok() }) - }).resource("/expect_cookie", move |r| { + }) + .resource("/expect_cookie", move |r| { r.f(|req| { - let cookies = req.cookies().expect("To get cookies"); + let _cookies = req.cookies().expect("To get cookies"); let value = req.session().get::(SIMPLE_NAME); assert!(value.is_ok()); @@ -1058,7 +1063,8 @@ fn test_session_storage_middleware() { assert!(set_cookie.is_some()); let set_cookie = set_cookie.unwrap().to_str().expect("Convert to str"); - let request = srv.get() + let request = srv + .get() .uri(srv.url("/expect_cookie")) .header("cookie", set_cookie.split(';').next().unwrap()) .finish()