1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-28 01:32:57 +01:00

remove PayloadBuffer

This commit is contained in:
Nikolay Kim 2019-04-03 03:20:20 -07:00
parent 2a89b995aa
commit f56072954b
3 changed files with 6 additions and 398 deletions

View File

@ -1,5 +1,9 @@
# Changes # Changes
### Deleted
* Removed PayloadBuffer
## [0.1.0-alpha.3] - 2019-04-02 ## [0.1.0-alpha.3] - 2019-04-02
### Added ### Added

View File

@ -12,7 +12,7 @@ mod service;
pub use self::client::{ClientCodec, ClientPayloadCodec}; pub use self::client::{ClientCodec, ClientPayloadCodec};
pub use self::codec::Codec; pub use self::codec::Codec;
pub use self::dispatcher::Dispatcher; pub use self::dispatcher::Dispatcher;
pub use self::payload::{Payload, PayloadBuffer}; pub use self::payload::Payload;
pub use self::service::{H1Service, H1ServiceHandler, OneRequest}; pub use self::service::{H1Service, H1ServiceHandler, OneRequest};
#[derive(Debug)] #[derive(Debug)]

View File

@ -1,10 +1,9 @@
//! Payload stream //! Payload stream
use std::cell::RefCell; use std::cell::RefCell;
use std::cmp;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::rc::{Rc, Weak}; use std::rc::{Rc, Weak};
use bytes::{Bytes, BytesMut}; use bytes::Bytes;
use futures::task::current as current_task; use futures::task::current as current_task;
use futures::task::Task; use futures::task::Task;
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
@ -258,407 +257,12 @@ impl Inner {
} }
} }
/// Payload buffer
pub struct PayloadBuffer<S> {
len: usize,
items: VecDeque<Bytes>,
stream: S,
}
impl<S> PayloadBuffer<S>
where
S: Stream<Item = Bytes, Error = PayloadError>,
{
/// Create new `PayloadBuffer` instance
pub fn new(stream: S) -> Self {
PayloadBuffer {
len: 0,
items: VecDeque::new(),
stream,
}
}
/// Get mutable reference to an inner stream.
pub fn get_mut(&mut self) -> &mut S {
&mut self.stream
}
#[inline]
fn poll_stream(&mut self) -> Poll<bool, PayloadError> {
self.stream.poll().map(|res| match res {
Async::Ready(Some(data)) => {
self.len += data.len();
self.items.push_back(data);
Async::Ready(true)
}
Async::Ready(None) => Async::Ready(false),
Async::NotReady => Async::NotReady,
})
}
/// Read first available chunk of bytes
#[inline]
pub fn readany(&mut self) -> Poll<Option<Bytes>, PayloadError> {
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
Ok(Async::Ready(Some(data)))
} else {
match self.poll_stream()? {
Async::Ready(true) => self.readany(),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Check if buffer contains enough bytes
#[inline]
pub fn can_read(&mut self, size: usize) -> Poll<Option<bool>, PayloadError> {
if size <= self.len {
Ok(Async::Ready(Some(true)))
} else {
match self.poll_stream()? {
Async::Ready(true) => self.can_read(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Return reference to the first chunk of data
#[inline]
pub fn get_chunk(&mut self) -> Poll<Option<&[u8]>, PayloadError> {
if self.items.is_empty() {
match self.poll_stream()? {
Async::Ready(true) => (),
Async::Ready(false) => return Ok(Async::Ready(None)),
Async::NotReady => return Ok(Async::NotReady),
}
}
match self.items.front().map(|c| c.as_ref()) {
Some(chunk) => Ok(Async::Ready(Some(chunk))),
None => Ok(Async::NotReady),
}
}
/// Read exact number of bytes
#[inline]
pub fn read_exact(&mut self, size: usize) -> Poll<Option<Bytes>, PayloadError> {
if size <= self.len {
self.len -= size;
let mut chunk = self.items.pop_front().unwrap();
if size < chunk.len() {
let buf = chunk.split_to(size);
self.items.push_front(chunk);
Ok(Async::Ready(Some(buf)))
} else if size == chunk.len() {
Ok(Async::Ready(Some(chunk)))
} else {
let mut buf = BytesMut::with_capacity(size);
buf.extend_from_slice(&chunk);
while buf.len() < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - buf.len(), chunk.len());
buf.extend_from_slice(&chunk.split_to(rem));
if !chunk.is_empty() {
self.items.push_front(chunk);
}
}
Ok(Async::Ready(Some(buf.freeze())))
}
} else {
match self.poll_stream()? {
Async::Ready(true) => self.read_exact(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
}
/// Remove specified amount if bytes from buffer
#[inline]
pub fn drop_bytes(&mut self, size: usize) {
if size <= self.len {
self.len -= size;
let mut len = 0;
while len < size {
let mut chunk = self.items.pop_front().unwrap();
let rem = cmp::min(size - len, chunk.len());
len += rem;
if rem < chunk.len() {
chunk.split_to(rem);
self.items.push_front(chunk);
}
}
}
}
/// Copy buffered data
pub fn copy(&mut self, size: usize) -> Poll<Option<BytesMut>, PayloadError> {
if size <= self.len {
let mut buf = BytesMut::with_capacity(size);
for chunk in &self.items {
if buf.len() < size {
let rem = cmp::min(size - buf.len(), chunk.len());
buf.extend_from_slice(&chunk[..rem]);
}
if buf.len() == size {
return Ok(Async::Ready(Some(buf)));
}
}
}
match self.poll_stream()? {
Async::Ready(true) => self.copy(size),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
/// Read until specified ending
pub fn read_until(&mut self, line: &[u8]) -> Poll<Option<Bytes>, PayloadError> {
let mut idx = 0;
let mut num = 0;
let mut offset = 0;
let mut found = false;
let mut length = 0;
for no in 0..self.items.len() {
{
let chunk = &self.items[no];
for (pos, ch) in chunk.iter().enumerate() {
if *ch == line[idx] {
idx += 1;
if idx == line.len() {
num = no;
offset = pos + 1;
length += pos + 1;
found = true;
break;
}
} else {
idx = 0
}
}
if !found {
length += chunk.len()
}
}
if found {
let mut buf = BytesMut::with_capacity(length);
if num > 0 {
for _ in 0..num {
buf.extend_from_slice(&self.items.pop_front().unwrap());
}
}
if offset > 0 {
let mut chunk = self.items.pop_front().unwrap();
buf.extend_from_slice(&chunk.split_to(offset));
if !chunk.is_empty() {
self.items.push_front(chunk)
}
}
self.len -= length;
return Ok(Async::Ready(Some(buf.freeze())));
}
}
match self.poll_stream()? {
Async::Ready(true) => self.read_until(line),
Async::Ready(false) => Ok(Async::Ready(None)),
Async::NotReady => Ok(Async::NotReady),
}
}
/// Read bytes until new line delimiter
pub fn readline(&mut self) -> Poll<Option<Bytes>, PayloadError> {
self.read_until(b"\n")
}
/// Put unprocessed data back to the buffer
pub fn unprocessed(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_front(data);
}
/// Get remaining data from the buffer
pub fn remaining(&mut self) -> Bytes {
self.items
.iter_mut()
.fold(BytesMut::new(), |mut b, c| {
b.extend_from_slice(c);
b
})
.freeze()
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use actix_rt::Runtime; use actix_rt::Runtime;
use futures::future::{lazy, result}; use futures::future::{lazy, result};
#[test]
fn test_basic() {
Runtime::new()
.unwrap()
.block_on(lazy(|| {
let (_, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(payload.len, 0);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
let res: Result<(), ()> = Ok(());
result(res)
}))
.unwrap();
}
#[test]
fn test_eof() {
Runtime::new()
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
sender.feed_data(Bytes::from("data"));
sender.feed_eof();
assert_eq!(
Async::Ready(Some(Bytes::from("data"))),
payload.readany().ok().unwrap()
);
assert_eq!(payload.len, 0);
assert_eq!(Async::Ready(None), payload.readany().ok().unwrap());
let res: Result<(), ()> = Ok(());
result(res)
}))
.unwrap();
}
#[test]
fn test_err() {
Runtime::new()
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.readany().ok().unwrap());
sender.set_error(PayloadError::Incomplete(None));
payload.readany().err().unwrap();
let res: Result<(), ()> = Ok(());
result(res)
}))
.unwrap();
}
#[test]
fn test_readany() {
Runtime::new()
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
assert_eq!(
Async::Ready(Some(Bytes::from("line1"))),
payload.readany().ok().unwrap()
);
assert_eq!(payload.len, 0);
assert_eq!(
Async::Ready(Some(Bytes::from("line2"))),
payload.readany().ok().unwrap()
);
assert_eq!(payload.len, 0);
let res: Result<(), ()> = Ok(());
result(res)
}))
.unwrap();
}
#[test]
fn test_readexactly() {
Runtime::new()
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.read_exact(2).ok().unwrap());
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
assert_eq!(
Async::Ready(Some(Bytes::from_static(b"li"))),
payload.read_exact(2).ok().unwrap()
);
assert_eq!(payload.len, 3);
assert_eq!(
Async::Ready(Some(Bytes::from_static(b"ne1l"))),
payload.read_exact(4).ok().unwrap()
);
assert_eq!(payload.len, 4);
sender.set_error(PayloadError::Incomplete(None));
payload.read_exact(10).err().unwrap();
let res: Result<(), ()> = Ok(());
result(res)
}))
.unwrap();
}
#[test]
fn test_readuntil() {
Runtime::new()
.unwrap()
.block_on(lazy(|| {
let (mut sender, payload) = Payload::create(false);
let mut payload = PayloadBuffer::new(payload);
assert_eq!(Async::NotReady, payload.read_until(b"ne").ok().unwrap());
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
assert_eq!(
Async::Ready(Some(Bytes::from("line"))),
payload.read_until(b"ne").ok().unwrap()
);
assert_eq!(payload.len, 1);
assert_eq!(
Async::Ready(Some(Bytes::from("1line2"))),
payload.read_until(b"2").ok().unwrap()
);
assert_eq!(payload.len, 0);
sender.set_error(PayloadError::Incomplete(None));
payload.read_until(b"b").err().unwrap();
let res: Result<(), ()> = Ok(());
result(res)
}))
.unwrap();
}
#[test] #[test]
fn test_unread_data() { fn test_unread_data() {
Runtime::new() Runtime::new()