1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00
actix-extras/src/payload.rs

860 lines
24 KiB
Rust
Raw Normal View History

2017-11-06 10:24:49 +01:00
use std::{io, fmt, cmp};
2017-10-09 05:16:48 +02:00
use std::rc::{Rc, Weak};
use std::cell::RefCell;
use std::collections::VecDeque;
2017-10-20 01:22:21 +02:00
use std::error::Error;
2017-11-06 10:24:49 +01:00
use std::io::{Read, Write, Error as IoError};
use bytes::{Bytes, BytesMut, BufMut, Writer};
2017-11-04 17:07:44 +01:00
use http2::Error as Http2Error;
2017-10-09 05:16:48 +02:00
use futures::{Async, Poll, Stream};
use futures::task::{Task, current as current_task};
2017-11-06 18:24:19 +01:00
use flate2::read::{GzDecoder};
use flate2::write::{DeflateDecoder};
2017-11-06 10:24:49 +01:00
use brotli2::write::BrotliDecoder;
2017-10-09 05:16:48 +02:00
2017-10-27 08:14:33 +02:00
use actix::ResponseType;
2017-11-06 10:24:49 +01:00
use httpresponse::ContentEncoding;
2017-10-27 08:14:33 +02:00
2017-11-06 10:24:49 +01:00
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 65_536; // max buffer size 64k
2017-10-09 05:16:48 +02:00
2017-10-14 01:33:23 +02:00
/// Just Bytes object
2017-10-27 08:14:33 +02:00
pub struct PayloadItem(pub Bytes);
impl ResponseType for PayloadItem {
type Item = ();
type Error = ();
}
2017-10-14 01:33:23 +02:00
2017-11-06 10:24:49 +01:00
impl fmt::Debug for PayloadItem {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
2017-10-14 01:33:23 +02:00
#[derive(Debug)]
/// A set of error that can occur during payload parsing.
pub enum PayloadError {
/// A payload reached EOF, but is not complete.
Incomplete,
2017-11-06 10:24:49 +01:00
/// Content encoding stream corruption
EncodingCorrupted,
2017-10-14 01:33:23 +02:00
/// Parse error
ParseError(IoError),
2017-11-04 17:07:44 +01:00
/// Http2 error
Http2(Http2Error),
2017-10-14 01:33:23 +02:00
}
2017-10-20 01:22:21 +02:00
impl fmt::Display for PayloadError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
PayloadError::ParseError(ref e) => fmt::Display::fmt(e, f),
ref e => f.write_str(e.description()),
}
}
}
impl Error for PayloadError {
fn description(&self) -> &str {
match *self {
PayloadError::Incomplete => "A payload reached EOF, but is not complete.",
2017-11-06 10:24:49 +01:00
PayloadError::EncodingCorrupted => "Can not decode content-encoding.",
2017-10-20 01:22:21 +02:00
PayloadError::ParseError(ref e) => e.description(),
2017-11-04 17:07:44 +01:00
PayloadError::Http2(ref e) => e.description(),
2017-10-20 01:22:21 +02:00
}
}
fn cause(&self) -> Option<&Error> {
match *self {
PayloadError::ParseError(ref error) => Some(error),
_ => None,
}
}
}
2017-10-14 01:33:23 +02:00
impl From<IoError> for PayloadError {
fn from(err: IoError) -> PayloadError {
PayloadError::ParseError(err)
}
}
2017-10-09 05:16:48 +02:00
/// Stream of byte chunks
///
/// Payload stores chunks in vector. First chunk can be received with `.readany()` method.
2017-10-19 08:43:50 +02:00
#[derive(Debug)]
2017-10-09 05:16:48 +02:00
pub struct Payload {
inner: Rc<RefCell<Inner>>,
}
impl Payload {
pub(crate) fn new(eof: bool) -> (PayloadSender, Payload) {
let shared = Rc::new(RefCell::new(Inner::new(eof)));
2017-10-23 04:58:50 +02:00
(PayloadSender{inner: Rc::downgrade(&shared)}, Payload{inner: shared})
2017-10-09 05:16:48 +02:00
}
/// Indicates EOF of payload
pub fn eof(&self) -> bool {
self.inner.borrow().eof()
}
/// Length of the data in this payload
pub fn len(&self) -> usize {
self.inner.borrow().len()
}
/// Is payload empty
pub fn is_empty(&self) -> bool {
self.inner.borrow().len() == 0
}
2017-10-09 05:55:44 +02:00
/// Get first available chunk of data.
2017-10-19 08:43:50 +02:00
/// Returns Some(PayloadItem) as chunk, `None` indicates eof.
2017-10-27 08:14:33 +02:00
pub fn readany(&mut self) -> Poll<Option<PayloadItem>, PayloadError> {
2017-10-09 05:16:48 +02:00
self.inner.borrow_mut().readany()
}
2017-10-19 08:43:50 +02:00
/// Get exactly number of bytes
/// Returns Some(PayloadItem) as chunk, `None` indicates eof.
pub fn readexactly(&mut self, size: usize) -> Result<Async<Bytes>, PayloadError> {
self.inner.borrow_mut().readexactly(size)
}
/// Read until `\n`
/// Returns Some(PayloadItem) as line, `None` indicates eof.
pub fn readline(&mut self) -> Result<Async<Bytes>, PayloadError> {
self.inner.borrow_mut().readline()
}
/// Read until match line
/// Returns Some(PayloadItem) as line, `None` indicates eof.
pub fn readuntil(&mut self, line: &[u8]) -> Result<Async<Bytes>, PayloadError> {
self.inner.borrow_mut().readuntil(line)
}
2017-10-14 09:11:12 +02:00
#[doc(hidden)]
pub fn readall(&mut self) -> Option<Bytes> {
self.inner.borrow_mut().readall()
}
2017-10-09 05:16:48 +02:00
/// Put unused data back to payload
2017-10-14 01:33:23 +02:00
pub fn unread_data(&mut self, data: Bytes) {
2017-10-09 05:16:48 +02:00
self.inner.borrow_mut().unread_data(data);
}
2017-11-04 17:07:44 +01:00
/// Get size of payload buffer
pub fn buffer_size(&self) -> usize {
self.inner.borrow().buffer_size()
}
/// Set size of payload buffer
pub fn set_buffer_size(&self, size: usize) {
self.inner.borrow_mut().set_buffer_size(size)
}
2017-10-09 05:16:48 +02:00
}
impl Stream for Payload {
type Item = PayloadItem;
2017-10-27 08:14:33 +02:00
type Error = PayloadError;
2017-10-09 05:16:48 +02:00
2017-10-27 08:14:33 +02:00
fn poll(&mut self) -> Poll<Option<PayloadItem>, PayloadError> {
self.readany()
2017-10-09 05:16:48 +02:00
}
}
2017-11-06 10:24:49 +01:00
pub(crate) trait PayloadWriter {
fn set_error(&mut self, err: PayloadError);
fn feed_eof(&mut self);
fn feed_data(&mut self, data: Bytes);
fn capacity(&self) -> usize;
}
2017-10-09 05:16:48 +02:00
pub(crate) struct PayloadSender {
inner: Weak<RefCell<Inner>>,
}
2017-11-06 10:24:49 +01:00
impl PayloadWriter for PayloadSender {
fn set_error(&mut self, err: PayloadError) {
2017-10-14 01:33:23 +02:00
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().set_error(err)
}
}
2017-11-06 10:24:49 +01:00
fn feed_eof(&mut self) {
2017-10-09 05:16:48 +02:00
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_eof()
}
}
2017-11-06 10:24:49 +01:00
fn feed_data(&mut self, data: Bytes) {
2017-10-09 05:16:48 +02:00
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().feed_data(data)
}
}
2017-11-06 10:24:49 +01:00
fn capacity(&self) -> usize {
if let Some(shared) = self.inner.upgrade() {
shared.borrow().capacity()
} else {
0
}
}
}
enum Decoder {
2017-11-06 18:24:19 +01:00
Zlib(DeflateDecoder<BytesWriter>),
2017-11-06 10:24:49 +01:00
Gzip(Option<GzDecoder<Wrapper>>),
Br(Rc<RefCell<BytesMut>>, BrotliDecoder<WrapperRc>),
Identity,
}
#[derive(Debug)]
struct Wrapper {
buf: BytesMut
}
impl io::Read for Wrapper {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = cmp::min(buf.len(), self.buf.len());
buf[..len].copy_from_slice(&self.buf[..len]);
self.buf.split_to(len);
Ok(len)
}
}
2017-11-06 18:24:19 +01:00
struct BytesWriter {
buf: BytesMut,
}
impl Default for BytesWriter {
fn default() -> BytesWriter {
BytesWriter{buf: BytesMut::with_capacity(8192)}
}
}
impl io::Write for BytesWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.extend(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
2017-11-06 10:24:49 +01:00
#[derive(Debug)]
struct WrapperRc {
buf: Rc<RefCell<BytesMut>>,
}
impl io::Write for WrapperRc {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.borrow_mut().extend(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub(crate) struct EncodedPayload {
inner: PayloadSender,
decoder: Decoder,
dst: Writer<BytesMut>,
error: bool,
}
impl EncodedPayload {
pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload {
let dec = match enc {
2017-11-06 18:24:19 +01:00
ContentEncoding::Deflate => Decoder::Zlib(
DeflateDecoder::new(BytesWriter::default())),
2017-11-06 10:24:49 +01:00
ContentEncoding::Gzip => Decoder::Gzip(None),
ContentEncoding::Br => {
let buf = Rc::new(RefCell::new(BytesMut::new()));
let buf2 = Rc::clone(&buf);
Decoder::Br(buf, BrotliDecoder::new(WrapperRc{buf: buf2}))
2017-10-09 05:16:48 +02:00
}
2017-11-06 10:24:49 +01:00
_ => Decoder::Identity,
};
EncodedPayload {
inner: inner,
decoder: dec,
error: false,
dst: BytesMut::new().writer(),
2017-10-09 05:16:48 +02:00
}
}
2017-11-06 10:24:49 +01:00
}
2017-11-04 17:07:44 +01:00
2017-11-06 10:24:49 +01:00
impl PayloadWriter for EncodedPayload {
fn set_error(&mut self, err: PayloadError) {
self.inner.set_error(err)
}
fn feed_eof(&mut self) {
if self.error {
return
}
let err = match self.decoder {
Decoder::Br(ref mut buf, ref mut decoder) => {
match decoder.flush() {
Ok(_) => {
let b = buf.borrow_mut().take().freeze();
if !b.is_empty() {
self.inner.feed_data(b);
}
self.inner.feed_eof();
return
},
Err(err) => Some(err),
}
}
Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() {
self.inner.feed_eof();
return
}
loop {
let len = self.dst.get_ref().len();
let len_buf = decoder.as_mut().unwrap().get_mut().buf.len();
if len < len_buf * 2 {
self.dst.get_mut().reserve(len_buf * 2 - len);
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
}
match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) {
Ok(n) => {
if n == 0 {
self.inner.feed_eof();
return
} else {
self.inner.feed_data(self.dst.get_mut().split_to(n).freeze());
}
}
Err(err) => break Some(err)
}
}
}
Decoder::Zlib(ref mut decoder) => {
2017-11-06 18:24:19 +01:00
match decoder.flush() {
Ok(_) => {
let b = decoder.get_mut().buf.take().freeze();
if !b.is_empty() {
self.inner.feed_data(b);
}
2017-11-06 10:24:49 +01:00
self.inner.feed_eof();
return
},
2017-11-06 18:24:19 +01:00
Err(err) => Some(err),
2017-11-06 10:24:49 +01:00
}
},
Decoder::Identity => {
self.inner.feed_eof();
return
}
};
self.error = true;
self.decoder = Decoder::Identity;
if let Some(err) = err {
self.set_error(PayloadError::ParseError(err));
2017-11-04 17:07:44 +01:00
} else {
2017-11-06 10:24:49 +01:00
self.set_error(PayloadError::Incomplete);
}
}
fn feed_data(&mut self, data: Bytes) {
if self.error {
return
}
match self.decoder {
Decoder::Br(ref mut buf, ref mut decoder) => {
match decoder.write(&data) {
Ok(_) => {
let b = buf.borrow_mut().take().freeze();
if !b.is_empty() {
self.inner.feed_data(b);
}
return
},
Err(err) => {
trace!("Error decoding br encoding: {}", err);
},
}
}
Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() {
let mut buf = BytesMut::new();
buf.extend(data);
2017-11-06 18:24:19 +01:00
*decoder = Some(GzDecoder::new(Wrapper{buf: buf}).unwrap());
2017-11-06 10:24:49 +01:00
} else {
decoder.as_mut().unwrap().get_mut().buf.extend(data);
}
loop {
let len_buf = decoder.as_mut().unwrap().get_mut().buf.len();
if len_buf == 0 {
return
}
let len = self.dst.get_ref().len();
if len < len_buf * 2 {
self.dst.get_mut().reserve(len_buf * 2 - len);
unsafe{self.dst.get_mut().set_len(len_buf * 2)};
}
match decoder.as_mut().unwrap().read(&mut self.dst.get_mut()) {
Ok(n) => {
if n == 0 {
return
} else {
self.inner.feed_data(self.dst.get_mut().split_to(n).freeze());
}
}
Err(_) => break
}
}
}
Decoder::Zlib(ref mut decoder) => {
2017-11-06 18:24:19 +01:00
match decoder.write(&data) {
Ok(_) => {
let b = decoder.get_mut().buf.take().freeze();
if !b.is_empty() {
self.inner.feed_data(b);
2017-11-06 10:24:49 +01:00
}
2017-11-06 18:24:19 +01:00
return
},
Err(err) => {
trace!("Error decoding deflate encoding: {}", err);
},
2017-11-06 10:24:49 +01:00
}
}
Decoder::Identity => {
self.inner.feed_data(data);
return
}
};
self.error = true;
self.decoder = Decoder::Identity;
self.set_error(PayloadError::EncodingCorrupted);
}
fn capacity(&self) -> usize {
match self.decoder {
Decoder::Br(ref buf, _) => {
buf.borrow().len() + self.inner.capacity()
}
_ => {
self.inner.capacity()
}
2017-11-04 17:07:44 +01:00
}
}
2017-10-09 05:16:48 +02:00
}
2017-10-19 08:43:50 +02:00
#[derive(Debug)]
2017-10-09 05:16:48 +02:00
struct Inner {
len: usize,
eof: bool,
2017-10-14 01:33:23 +02:00
err: Option<PayloadError>,
2017-10-09 05:16:48 +02:00
task: Option<Task>,
items: VecDeque<Bytes>,
2017-11-04 17:07:44 +01:00
buf_size: usize,
2017-10-09 05:16:48 +02:00
}
impl Inner {
fn new(eof: bool) -> Self {
Inner {
len: 0,
eof: eof,
2017-10-14 01:33:23 +02:00
err: None,
2017-10-09 05:16:48 +02:00
task: None,
items: VecDeque::new(),
2017-11-04 17:07:44 +01:00
buf_size: DEFAULT_BUFFER_SIZE,
2017-10-09 05:16:48 +02:00
}
}
2017-10-14 01:33:23 +02:00
fn set_error(&mut self, err: PayloadError) {
self.err = Some(err);
if let Some(task) = self.task.take() {
task.notify()
}
}
2017-10-09 05:16:48 +02:00
fn feed_eof(&mut self) {
self.eof = true;
if let Some(task) = self.task.take() {
task.notify()
}
}
fn feed_data(&mut self, data: Bytes) {
self.len += data.len();
self.items.push_back(data);
if let Some(task) = self.task.take() {
task.notify()
}
}
fn eof(&self) -> bool {
2017-10-14 09:11:12 +02:00
self.items.is_empty() && self.eof
2017-10-09 05:16:48 +02:00
}
fn len(&self) -> usize {
self.len
}
2017-10-27 08:14:33 +02:00
fn readany(&mut self) -> Poll<Option<PayloadItem>, PayloadError> {
2017-10-09 05:16:48 +02:00
if let Some(data) = self.items.pop_front() {
self.len -= data.len();
2017-10-27 08:14:33 +02:00
Ok(Async::Ready(Some(PayloadItem(data))))
2017-10-09 05:16:48 +02:00
} else if self.eof {
2017-10-27 08:14:33 +02:00
Ok(Async::Ready(None))
2017-10-14 01:33:23 +02:00
} else if let Some(err) = self.err.take() {
2017-10-27 08:14:33 +02:00
Err(err)
2017-10-09 05:16:48 +02:00
} else {
self.task = Some(current_task());
2017-10-27 08:14:33 +02:00
Ok(Async::NotReady)
2017-10-09 05:16:48 +02:00
}
}
2017-10-19 08:43:50 +02:00
fn readexactly(&mut self, size: usize) -> Result<Async<Bytes>, PayloadError> {
if size <= self.len {
let mut buf = BytesMut::with_capacity(size);
while buf.len() < size {
let mut chunk = self.items.pop_front().unwrap();
2017-10-23 04:58:50 +02:00
let rem = cmp::min(size - buf.len(), chunk.len());
self.len -= rem;
2017-10-19 08:43:50 +02:00
buf.extend(&chunk.split_to(rem));
if !chunk.is_empty() {
self.items.push_front(chunk);
return Ok(Async::Ready(buf.freeze()))
}
}
}
if let Some(err) = self.err.take() {
Err(err)
} else {
self.task = Some(current_task());
Ok(Async::NotReady)
}
}
fn readuntil(&mut self, line: &[u8]) -> Result<Async<Bytes>, PayloadError> {
let mut idx = 0;
let mut num = 0;
let mut offset = 0;
let mut found = false;
let mut length = 0;
for no in 0..self.items.len() {
{
let chunk = &self.items[no];
for (pos, ch) in chunk.iter().enumerate() {
if *ch == line[idx] {
idx += 1;
if idx == line.len() {
num = no;
offset = pos+1;
2017-10-23 05:07:18 +02:00
length += pos+1;
2017-10-19 08:43:50 +02:00
found = true;
break;
}
} else {
idx = 0
}
}
if !found {
length += chunk.len()
}
}
if found {
let mut buf = BytesMut::with_capacity(length);
if num > 0 {
for _ in 0..num {
buf.extend(self.items.pop_front().unwrap());
}
}
if offset > 0 {
let mut chunk = self.items.pop_front().unwrap();
buf.extend(chunk.split_to(offset));
if !chunk.is_empty() {
self.items.push_front(chunk)
}
}
self.len -= length;
return Ok(Async::Ready(buf.freeze()))
}
}
if let Some(err) = self.err.take() {
Err(err)
} else {
self.task = Some(current_task());
Ok(Async::NotReady)
}
}
fn readline(&mut self) -> Result<Async<Bytes>, PayloadError> {
self.readuntil(b"\n")
}
2017-10-14 09:11:12 +02:00
pub fn readall(&mut self) -> Option<Bytes> {
let len = self.items.iter().fold(0, |cur, item| cur + item.len());
if len > 0 {
let mut buf = BytesMut::with_capacity(len);
for item in &self.items {
buf.extend(item);
}
self.items = VecDeque::new();
2017-10-19 08:43:50 +02:00
self.len = 0;
2017-10-14 09:11:12 +02:00
Some(buf.take().freeze())
} else {
None
}
}
2017-11-04 17:07:44 +01:00
fn unread_data(&mut self, data: Bytes) {
2017-10-09 05:16:48 +02:00
self.len += data.len();
self.items.push_front(data)
}
2017-11-04 17:07:44 +01:00
fn capacity(&self) -> usize {
if self.len > self.buf_size {
0
} else {
self.buf_size - self.len
}
}
fn buffer_size(&self) -> usize {
self.buf_size
}
fn set_buffer_size(&mut self, size: usize) {
self.buf_size = size
}
2017-10-09 05:16:48 +02:00
}
2017-10-23 04:58:50 +02:00
#[cfg(test)]
2017-10-23 06:40:41 +02:00
mod tests {
2017-10-23 04:58:50 +02:00
use super::*;
2017-10-23 23:08:11 +02:00
use std::io;
2017-10-23 04:58:50 +02:00
use futures::future::{lazy, result};
use tokio_core::reactor::Core;
2017-10-23 23:08:11 +02:00
#[test]
fn test_error() {
let err: PayloadError = IoError::new(io::ErrorKind::Other, "ParseError").into();
assert_eq!(err.description(), "ParseError");
assert_eq!(err.cause().unwrap().description(), "ParseError");
assert_eq!(format!("{}", err), "ParseError");
let err = PayloadError::Incomplete;
assert_eq!(err.description(), "A payload reached EOF, but is not complete.");
assert_eq!(format!("{}", err), "A payload reached EOF, but is not complete.");
}
2017-10-23 04:58:50 +02:00
#[test]
fn test_basic() {
Core::new().unwrap().run(lazy(|| {
let (_, mut payload) = Payload::new(false);
assert!(!payload.eof());
assert!(payload.is_empty());
assert_eq!(payload.len(), 0);
match payload.readany() {
2017-10-27 08:14:33 +02:00
Ok(Async::NotReady) => (),
2017-10-23 04:58:50 +02:00
_ => panic!("error"),
}
let res: Result<(), ()> = Ok(());
result(res)
})).unwrap();
}
#[test]
fn test_eof() {
Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false);
match payload.readany() {
2017-10-27 08:14:33 +02:00
Ok(Async::NotReady) => (),
2017-10-23 04:58:50 +02:00
_ => panic!("error"),
}
assert!(!payload.eof());
sender.feed_data(Bytes::from("data"));
sender.feed_eof();
assert!(!payload.eof());
match payload.readany() {
2017-10-27 08:14:33 +02:00
Ok(Async::Ready(Some(data))) => assert_eq!(&data.0, "data"),
2017-10-23 04:58:50 +02:00
_ => panic!("error"),
}
assert!(payload.is_empty());
assert!(payload.eof());
2017-10-23 05:07:18 +02:00
assert_eq!(payload.len(), 0);
2017-10-23 04:58:50 +02:00
match payload.readany() {
2017-10-27 08:14:33 +02:00
Ok(Async::Ready(None)) => (),
2017-10-23 04:58:50 +02:00
_ => panic!("error"),
}
let res: Result<(), ()> = Ok(());
result(res)
})).unwrap();
}
#[test]
fn test_err() {
Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false);
match payload.readany() {
2017-10-27 08:14:33 +02:00
Ok(Async::NotReady) => (),
2017-10-23 04:58:50 +02:00
_ => panic!("error"),
}
sender.set_error(PayloadError::Incomplete);
match payload.readany() {
2017-10-27 08:14:33 +02:00
Err(_) => (),
2017-10-23 04:58:50 +02:00
_ => panic!("error"),
}
let res: Result<(), ()> = Ok(());
result(res)
})).unwrap();
}
#[test]
fn test_readany() {
Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false);
sender.feed_data(Bytes::from("line1"));
assert!(!payload.is_empty());
assert_eq!(payload.len(), 5);
sender.feed_data(Bytes::from("line2"));
assert!(!payload.is_empty());
assert_eq!(payload.len(), 10);
match payload.readany() {
2017-10-27 08:14:33 +02:00
Ok(Async::Ready(Some(data))) => assert_eq!(&data.0, "line1"),
2017-10-23 04:58:50 +02:00
_ => panic!("error"),
}
assert!(!payload.is_empty());
assert_eq!(payload.len(), 5);
let res: Result<(), ()> = Ok(());
result(res)
})).unwrap();
}
#[test]
fn test_readexactly() {
Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false);
match payload.readexactly(2) {
Ok(Async::NotReady) => (),
_ => panic!("error"),
}
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
2017-10-23 05:07:18 +02:00
assert_eq!(payload.len(), 10);
2017-10-23 04:58:50 +02:00
match payload.readexactly(2) {
Ok(Async::Ready(data)) => assert_eq!(&data, "li"),
_ => panic!("error"),
}
2017-10-23 05:07:18 +02:00
assert_eq!(payload.len(), 8);
2017-10-23 04:58:50 +02:00
match payload.readexactly(4) {
Ok(Async::Ready(data)) => assert_eq!(&data, "ne1l"),
_ => panic!("error"),
}
2017-10-23 05:07:18 +02:00
assert_eq!(payload.len(), 4);
2017-10-23 04:58:50 +02:00
sender.set_error(PayloadError::Incomplete);
match payload.readexactly(10) {
Err(_) => (),
_ => panic!("error"),
}
let res: Result<(), ()> = Ok(());
result(res)
})).unwrap();
}
#[test]
fn test_readuntil() {
Core::new().unwrap().run(lazy(|| {
let (mut sender, mut payload) = Payload::new(false);
match payload.readuntil(b"ne") {
Ok(Async::NotReady) => (),
_ => panic!("error"),
}
sender.feed_data(Bytes::from("line1"));
sender.feed_data(Bytes::from("line2"));
2017-10-23 05:07:18 +02:00
assert_eq!(payload.len(), 10);
2017-10-23 04:58:50 +02:00
match payload.readuntil(b"ne") {
Ok(Async::Ready(data)) => assert_eq!(&data, "line"),
_ => panic!("error"),
}
2017-10-23 05:07:18 +02:00
assert_eq!(payload.len(), 6);
2017-10-23 04:58:50 +02:00
match payload.readuntil(b"2") {
Ok(Async::Ready(data)) => assert_eq!(&data, "1line2"),
_ => panic!("error"),
}
2017-10-23 05:07:18 +02:00
assert_eq!(payload.len(), 0);
2017-10-23 04:58:50 +02:00
sender.set_error(PayloadError::Incomplete);
match payload.readuntil(b"b") {
Err(_) => (),
_ => panic!("error"),
}
let res: Result<(), ()> = Ok(());
result(res)
})).unwrap();
}
2017-10-23 05:19:20 +02:00
#[test]
fn test_unread_data() {
Core::new().unwrap().run(lazy(|| {
let (_, mut payload) = Payload::new(false);
payload.unread_data(Bytes::from("data"));
assert!(!payload.is_empty());
assert_eq!(payload.len(), 4);
match payload.readany() {
2017-10-27 08:14:33 +02:00
Ok(Async::Ready(Some(data))) => assert_eq!(&data.0, "data"),
2017-10-23 05:19:20 +02:00
_ => panic!("error"),
}
let res: Result<(), ()> = Ok(());
result(res)
})).unwrap();
}
2017-10-23 04:58:50 +02:00
}