1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-22 23:05:56 +01:00

rename PayloadHelper

This commit is contained in:
Nikolay Kim 2018-07-18 10:01:28 +06:00
parent 85672d1379
commit 6b10e1eff6
7 changed files with 71 additions and 54 deletions

View File

@ -244,7 +244,7 @@ pub mod dev {
pub use info::ConnectionInfo;
pub use json::{JsonBody, JsonConfig};
pub use param::{FromParam, Params};
pub use payload::{Payload, PayloadHelper};
pub use payload::{Payload, PayloadBuffer};
pub use resource::Resource;
pub use route::Route;
pub use router::{ResourceDef, ResourceInfo, ResourceType, Router};

View File

@ -13,7 +13,7 @@ use httparse;
use mime;
use error::{MultipartError, ParseError, PayloadError};
use payload::PayloadHelper;
use payload::PayloadBuffer;
const MAX_HEADERS: usize = 32;
@ -97,7 +97,7 @@ where
safety: Safety::new(),
inner: Some(Rc::new(RefCell::new(InnerMultipart {
boundary,
payload: PayloadRef::new(PayloadHelper::new(stream)),
payload: PayloadRef::new(PayloadBuffer::new(stream)),
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
}))),
@ -133,7 +133,7 @@ impl<S> InnerMultipart<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
fn read_headers(payload: &mut PayloadHelper<S>) -> Poll<HeaderMap, MultipartError> {
fn read_headers(payload: &mut PayloadBuffer<S>) -> Poll<HeaderMap, MultipartError> {
match payload.read_until(b"\r\n\r\n")? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete),
@ -164,7 +164,7 @@ where
}
fn read_boundary(
payload: &mut PayloadHelper<S>, boundary: &str,
payload: &mut PayloadBuffer<S>, boundary: &str,
) -> Poll<bool, MultipartError> {
// TODO: need to read epilogue
match payload.readline()? {
@ -190,7 +190,7 @@ where
}
fn skip_until_boundary(
payload: &mut PayloadHelper<S>, boundary: &str,
payload: &mut PayloadBuffer<S>, boundary: &str,
) -> Poll<bool, MultipartError> {
let mut eof = false;
loop {
@ -490,7 +490,7 @@ where
/// Reads body part content chunk of the specified size.
/// The body part must has `Content-Length` header with proper value.
fn read_len(
payload: &mut PayloadHelper<S>, size: &mut u64,
payload: &mut PayloadBuffer<S>, size: &mut u64,
) -> Poll<Option<Bytes>, MultipartError> {
if *size == 0 {
Ok(Async::Ready(None))
@ -503,7 +503,7 @@ where
*size -= len;
let ch = chunk.split_to(len as usize);
if !chunk.is_empty() {
payload.unread_data(chunk);
payload.unprocessed(chunk);
}
Ok(Async::Ready(Some(ch)))
}
@ -515,14 +515,14 @@ where
/// Reads content chunk of body part with unknown length.
/// The `Content-Length` header for body part is not necessary.
fn read_stream(
payload: &mut PayloadHelper<S>, boundary: &str,
payload: &mut PayloadBuffer<S>, boundary: &str,
) -> Poll<Option<Bytes>, MultipartError> {
match payload.read_until(b"\r")? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete),
Async::Ready(Some(mut chunk)) => {
if chunk.len() == 1 {
payload.unread_data(chunk);
payload.unprocessed(chunk);
match payload.read_exact(boundary.len() + 4)? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(None) => Err(MultipartError::Incomplete),
@ -531,12 +531,12 @@ where
&& &chunk[2..4] == b"--"
&& &chunk[4..] == boundary.as_bytes()
{
payload.unread_data(chunk);
payload.unprocessed(chunk);
Ok(Async::Ready(None))
} else {
// \r might be part of data stream
let ch = chunk.split_to(1);
payload.unread_data(chunk);
payload.unprocessed(chunk);
Ok(Async::Ready(Some(ch)))
}
}
@ -544,7 +544,7 @@ where
} else {
let to = chunk.len() - 1;
let ch = chunk.split_to(to);
payload.unread_data(chunk);
payload.unprocessed(chunk);
Ok(Async::Ready(Some(ch)))
}
}
@ -592,27 +592,27 @@ where
}
struct PayloadRef<S> {
payload: Rc<UnsafeCell<PayloadHelper<S>>>,
payload: Rc<UnsafeCell<PayloadBuffer<S>>>,
}
impl<S> PayloadRef<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
fn new(payload: PayloadHelper<S>) -> PayloadRef<S> {
fn new(payload: PayloadBuffer<S>) -> PayloadRef<S> {
PayloadRef {
payload: Rc::new(payload.into()),
}
}
fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut PayloadHelper<S>>
fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<&'a mut PayloadBuffer<S>>
where
'a: 'b,
{
// Unsafe: Invariant is inforced by Safety Safety is used as ref counter,
// only top most ref can have mutable access to payload.
if s.current() {
let payload: &mut PayloadHelper<S> = unsafe { &mut *self.payload.get() };
let payload: &mut PayloadBuffer<S> = unsafe { &mut *self.payload.get() };
Some(payload)
} else {
None

View File

@ -280,18 +280,20 @@ impl Inner {
}
}
pub struct PayloadHelper<S> {
/// Payload buffer
pub struct PayloadBuffer<S> {
len: usize,
items: VecDeque<Bytes>,
stream: S,
}
impl<S> PayloadHelper<S>
impl<S> PayloadBuffer<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
/// Create new `PayloadBuffer` instance
pub fn new(stream: S) -> Self {
PayloadHelper {
PayloadBuffer {
len: 0,
items: VecDeque::new(),
stream,
@ -316,6 +318,7 @@ where
})
}
/// Read first available chunk of bytes
#[inline]
pub fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() {
@ -330,6 +333,7 @@ where
}
}
/// Check if buffer contains enough bytes
#[inline]
pub fn can_read(&mut self, size: usize) -> Poll<Option<bool>, PayloadError> {
if size <= self.len {
@ -343,6 +347,7 @@ where
}
}
/// Return reference to the first chunk of data
#[inline]
pub fn get_chunk(&mut self) -> Poll<Option<&[u8]>, PayloadError> {
if self.items.is_empty() {
@ -358,6 +363,7 @@ where
}
}
/// Read exact number of bytes
#[inline]
pub fn read_exact(&mut self, size: usize) -> Poll<Option<Bytes>, PayloadError> {
if size <= self.len {
@ -392,8 +398,9 @@ where
}
}
/// Remove specified amount if bytes from buffer
#[inline]
pub fn drop_payload(&mut self, size: usize) {
pub fn drop_bytes(&mut self, size: usize) {
if size <= self.len {
self.len -= size;
@ -410,6 +417,7 @@ where
}
}
/// Copy buffered data
pub fn copy(&mut self, size: usize) -> Poll<Option<BytesMut>, PayloadError> {
if size <= self.len {
let mut buf = BytesMut::with_capacity(size);
@ -431,6 +439,7 @@ where
}
}
/// Read until specified ending
pub fn read_until(&mut self, line: &[u8]) -> Poll<Option<Bytes>, PayloadError> {
let mut idx = 0;
let mut num = 0;
@ -486,16 +495,18 @@ where
}
}
/// Read bytes until new line delimiter
pub fn readline(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.read_until(b"\n")
}
pub fn unread_data(&mut self, data: Bytes) {
/// Put unprocessed data back to the buffer
pub fn unprocessed(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_front(data);
}
#[allow(dead_code)]
/// Get remaining data from the buffer
pub fn remaining(&mut self) -> Bytes {
self.items
.iter_mut()
@ -535,7 +546,7 @@ mod tests {
.unwrap()
.block_on(lazy(|| {
let (_, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(payload.len, 0);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
@ -552,7 +563,7 @@ mod tests {
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
sender.feed_data(Bytes::from("data"));
@ -577,7 +588,7 @@ mod tests {
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
@ -595,7 +606,7 @@ mod tests {
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
let mut payload = PayloadBuffer::new(payload);
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
@ -624,7 +635,7 @@ mod tests {
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.read_exact(2).ok().unwrap());
@ -658,7 +669,7 @@ mod tests {
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::new(false);
let mut payload = PayloadHelper::new(payload);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.read_until(b"ne").ok().unwrap());

View File

@ -20,7 +20,7 @@ use body::{Binary, Body};
use error::{Error, UrlParseError};
use header::IntoHeaderValue;
use httpmessage::HttpMessage;
use payload::PayloadHelper;
use payload::PayloadBuffer;
use client::{
ClientConnector, ClientRequest, ClientRequestBuilder, HttpResponseParserError,
@ -275,7 +275,7 @@ impl Client {
struct Inner {
tx: UnboundedSender<Bytes>,
rx: PayloadHelper<Box<Pipeline>>,
rx: PayloadBuffer<Box<Pipeline>>,
closed: bool,
}
@ -431,7 +431,7 @@ impl Future for ClientHandshake {
let inner = Inner {
tx: self.tx.take().unwrap(),
rx: PayloadHelper::new(resp.payload()),
rx: PayloadBuffer::new(resp.payload()),
closed: false,
};

View File

@ -6,7 +6,7 @@ use std::fmt;
use body::Binary;
use error::PayloadError;
use payload::PayloadHelper;
use payload::PayloadBuffer;
use ws::mask::apply_mask;
use ws::proto::{CloseCode, CloseReason, OpCode};
@ -48,7 +48,7 @@ impl Frame {
#[cfg_attr(feature = "cargo-clippy", allow(type_complexity))]
fn read_copy_md<S>(
pl: &mut PayloadHelper<S>, server: bool, max_size: usize,
pl: &mut PayloadBuffer<S>, server: bool, max_size: usize,
) -> Poll<Option<(usize, bool, OpCode, usize, Option<u32>)>, ProtocolError>
where
S: Stream<Item = Bytes, Error = PayloadError>,
@ -201,7 +201,7 @@ impl Frame {
/// Parse the input stream into a frame.
pub fn parse<S>(
pl: &mut PayloadHelper<S>, server: bool, max_size: usize,
pl: &mut PayloadBuffer<S>, server: bool, max_size: usize,
) -> Poll<Option<Frame>, ProtocolError>
where
S: Stream<Item = Bytes, Error = PayloadError>,
@ -230,7 +230,7 @@ impl Frame {
}
// remove prefix
pl.drop_payload(idx);
pl.drop_bytes(idx);
// no need for body
if length == 0 {
@ -393,14 +393,14 @@ mod tests {
#[test]
fn test_parse() {
let mut buf = PayloadHelper::new(once(Ok(BytesMut::from(
let mut buf = PayloadBuffer::new(once(Ok(BytesMut::from(
&[0b0000_0001u8, 0b0000_0001u8][..],
).freeze())));
assert!(is_none(&Frame::parse(&mut buf, false, 1024)));
let mut buf = BytesMut::from(&[0b0000_0001u8, 0b0000_0001u8][..]);
buf.extend(b"1");
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let mut buf = PayloadBuffer::new(once(Ok(buf.freeze())));
let frame = extract(Frame::parse(&mut buf, false, 1024));
assert!(!frame.finished);
@ -411,7 +411,7 @@ mod tests {
#[test]
fn test_parse_length0() {
let buf = BytesMut::from(&[0b0000_0001u8, 0b0000_0000u8][..]);
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let mut buf = PayloadBuffer::new(once(Ok(buf.freeze())));
let frame = extract(Frame::parse(&mut buf, false, 1024));
assert!(!frame.finished);
@ -422,13 +422,13 @@ mod tests {
#[test]
fn test_parse_length2() {
let buf = BytesMut::from(&[0b0000_0001u8, 126u8][..]);
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let mut buf = PayloadBuffer::new(once(Ok(buf.freeze())));
assert!(is_none(&Frame::parse(&mut buf, false, 1024)));
let mut buf = BytesMut::from(&[0b0000_0001u8, 126u8][..]);
buf.extend(&[0u8, 4u8][..]);
buf.extend(b"1234");
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let mut buf = PayloadBuffer::new(once(Ok(buf.freeze())));
let frame = extract(Frame::parse(&mut buf, false, 1024));
assert!(!frame.finished);
@ -439,13 +439,13 @@ mod tests {
#[test]
fn test_parse_length4() {
let buf = BytesMut::from(&[0b0000_0001u8, 127u8][..]);
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let mut buf = PayloadBuffer::new(once(Ok(buf.freeze())));
assert!(is_none(&Frame::parse(&mut buf, false, 1024)));
let mut buf = BytesMut::from(&[0b0000_0001u8, 127u8][..]);
buf.extend(&[0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 4u8][..]);
buf.extend(b"1234");
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let mut buf = PayloadBuffer::new(once(Ok(buf.freeze())));
let frame = extract(Frame::parse(&mut buf, false, 1024));
assert!(!frame.finished);
@ -458,7 +458,7 @@ mod tests {
let mut buf = BytesMut::from(&[0b0000_0001u8, 0b1000_0001u8][..]);
buf.extend(b"0001");
buf.extend(b"1");
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let mut buf = PayloadBuffer::new(once(Ok(buf.freeze())));
assert!(Frame::parse(&mut buf, false, 1024).is_err());
@ -472,7 +472,7 @@ mod tests {
fn test_parse_frame_no_mask() {
let mut buf = BytesMut::from(&[0b0000_0001u8, 0b0000_0001u8][..]);
buf.extend(&[1u8]);
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let mut buf = PayloadBuffer::new(once(Ok(buf.freeze())));
assert!(Frame::parse(&mut buf, true, 1024).is_err());
@ -486,7 +486,7 @@ mod tests {
fn test_parse_frame_max_size() {
let mut buf = BytesMut::from(&[0b0000_0001u8, 0b0000_0010u8][..]);
buf.extend(&[1u8, 1u8]);
let mut buf = PayloadHelper::new(once(Ok(buf.freeze())));
let mut buf = PayloadBuffer::new(once(Ok(buf.freeze())));
assert!(Frame::parse(&mut buf, true, 1).is_err());

View File

@ -52,7 +52,7 @@ use error::{Error, PayloadError, ResponseError};
use httpmessage::HttpMessage;
use httprequest::HttpRequest;
use httpresponse::{ConnectionType, HttpResponse, HttpResponseBuilder};
use payload::PayloadHelper;
use payload::PayloadBuffer;
mod client;
mod context;
@ -252,7 +252,7 @@ pub fn handshake<S>(
/// Maps `Payload` stream into stream of `ws::Message` items
pub struct WsStream<S> {
rx: PayloadHelper<S>,
rx: PayloadBuffer<S>,
closed: bool,
max_size: usize,
}
@ -264,7 +264,7 @@ where
/// Create new websocket frames stream
pub fn new(stream: S) -> WsStream<S> {
WsStream {
rx: PayloadHelper::new(stream),
rx: PayloadBuffer::new(stream),
closed: false,
max_size: 65_536,
}

View File

@ -997,7 +997,9 @@ fn test_resource_middleware_async_chain_with_error() {
#[cfg(feature = "session")]
#[test]
fn test_session_storage_middleware() {
use actix_web::middleware::session::{RequestSession, SessionStorage, CookieSessionBackend};
use actix_web::middleware::session::{
CookieSessionBackend, RequestSession, SessionStorage,
};
const SIMPLE_NAME: &'static str = "simple";
const SIMPLE_PAYLOAD: &'static str = "kantan";
@ -1008,7 +1010,9 @@ fn test_session_storage_middleware() {
let mut srv = test::TestServer::with_factory(move || {
App::new()
.middleware(SessionStorage::new(CookieSessionBackend::signed(&[0; 32]).secure(false)))
.middleware(SessionStorage::new(
CookieSessionBackend::signed(&[0; 32]).secure(false),
))
.resource("/index", move |r| {
r.f(|req| {
let res = req.session().set(COMPLEX_NAME, COMPLEX_PAYLOAD);
@ -1029,9 +1033,10 @@ fn test_session_storage_middleware() {
HttpResponse::Ok()
})
}).resource("/expect_cookie", move |r| {
})
.resource("/expect_cookie", move |r| {
r.f(|req| {
let cookies = req.cookies().expect("To get cookies");
let _cookies = req.cookies().expect("To get cookies");
let value = req.session().get::<String>(SIMPLE_NAME);
assert!(value.is_ok());
@ -1058,7 +1063,8 @@ fn test_session_storage_middleware() {
assert!(set_cookie.is_some());
let set_cookie = set_cookie.unwrap().to_str().expect("Convert to str");
let request = srv.get()
let request = srv
.get()
.uri(srv.url("/expect_cookie"))
.header("cookie", set_cookie.split(';').next().unwrap())
.finish()