From 32a9d9f683fa6b02c8721c45a53c1cad05fcadb5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sun, 22 Oct 2017 19:58:50 -0700 Subject: [PATCH] payload tests --- src/logger.rs | 1 - src/payload.rs | 205 +++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 200 insertions(+), 6 deletions(-) diff --git a/src/logger.rs b/src/logger.rs index c418e4364..3311821a7 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -66,7 +66,6 @@ impl Logger { }; info!("{}", self.format.display_with(&render)); - //println!("{}", self.format.display_with(&render)); } } } diff --git a/src/payload.rs b/src/payload.rs index f8f8ac1f4..4d7bfb7a7 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -1,7 +1,6 @@ -use std::fmt; +use std::{fmt, cmp}; use std::rc::{Rc, Weak}; use std::cell::RefCell; -use std::convert::From; use std::collections::VecDeque; use std::error::Error; use std::io::Error as IoError; @@ -67,8 +66,7 @@ impl Payload { pub(crate) fn new(eof: bool) -> (PayloadSender, Payload) { let shared = Rc::new(RefCell::new(Inner::new(eof))); - (PayloadSender{inner: Rc::downgrade(&shared)}, - Payload{inner: shared}) + (PayloadSender{inner: Rc::downgrade(&shared)}, Payload{inner: shared}) } /// Indicates paused state of the payload. If payload data is not consumed @@ -265,7 +263,8 @@ impl Inner { let mut buf = BytesMut::with_capacity(size); while buf.len() < size { let mut chunk = self.items.pop_front().unwrap(); - let rem = size - buf.len(); + let rem = cmp::min(size - buf.len(), chunk.len()); + self.len -= rem; buf.extend(&chunk.split_to(rem)); if !chunk.is_empty() { self.items.push_front(chunk); @@ -362,3 +361,199 @@ impl Inner { self.items.push_front(data) } } + +#[cfg(test)] +mod test { + use super::*; + use futures::future::{lazy, result}; + use tokio_core::reactor::Core; + + #[test] + fn test_basic() { + Core::new().unwrap().run(lazy(|| { + let (_, mut payload) = Payload::new(false); + + assert!(!payload.eof()); + assert!(payload.is_empty()); + assert_eq!(payload.len(), 0); + + match payload.readany() { + Async::NotReady => (), + _ => panic!("error"), + } + + let res: Result<(), ()> = Ok(()); + result(res) + })).unwrap(); + } + + #[test] + fn test_eof() { + Core::new().unwrap().run(lazy(|| { + let (mut sender, mut payload) = Payload::new(false); + + match payload.readany() { + Async::NotReady => (), + _ => panic!("error"), + } + + assert!(!payload.eof()); + + sender.feed_data(Bytes::from("data")); + sender.feed_eof(); + + assert!(!payload.eof()); + + match payload.readany() { + Async::Ready(Some(data)) => assert_eq!(&data.unwrap(), "data"), + _ => panic!("error"), + } + assert!(payload.is_empty()); + assert!(payload.eof()); + + match payload.readany() { + Async::Ready(None) => (), + _ => panic!("error"), + } + let res: Result<(), ()> = Ok(()); + result(res) + })).unwrap(); + } + + #[test] + fn test_err() { + Core::new().unwrap().run(lazy(|| { + let (mut sender, mut payload) = Payload::new(false); + + match payload.readany() { + Async::NotReady => (), + _ => panic!("error"), + } + + sender.set_error(PayloadError::Incomplete); + match payload.readany() { + Async::Ready(Some(data)) => assert!(data.is_err()), + _ => panic!("error"), + } + let res: Result<(), ()> = Ok(()); + result(res) + })).unwrap(); + } + + #[test] + fn test_readany() { + Core::new().unwrap().run(lazy(|| { + let (mut sender, mut payload) = Payload::new(false); + + sender.feed_data(Bytes::from("line1")); + + assert!(!payload.is_empty()); + assert_eq!(payload.len(), 5); + + sender.feed_data(Bytes::from("line2")); + assert!(!payload.is_empty()); + assert_eq!(payload.len(), 10); + + match payload.readany() { + Async::Ready(Some(data)) => assert_eq!(&data.unwrap(), "line1"), + _ => panic!("error"), + } + assert!(!payload.is_empty()); + assert_eq!(payload.len(), 5); + + let res: Result<(), ()> = Ok(()); + result(res) + })).unwrap(); + } + + #[test] + fn test_readexactly() { + Core::new().unwrap().run(lazy(|| { + let (mut sender, mut payload) = Payload::new(false); + + match payload.readexactly(2) { + Ok(Async::NotReady) => (), + _ => panic!("error"), + } + + sender.feed_data(Bytes::from("line1")); + sender.feed_data(Bytes::from("line2")); + + match payload.readexactly(2) { + Ok(Async::Ready(data)) => assert_eq!(&data, "li"), + _ => panic!("error"), + } + + match payload.readexactly(4) { + Ok(Async::Ready(data)) => assert_eq!(&data, "ne1l"), + _ => panic!("error"), + } + + sender.set_error(PayloadError::Incomplete); + match payload.readexactly(10) { + Err(_) => (), + _ => panic!("error"), + } + + let res: Result<(), ()> = Ok(()); + result(res) + })).unwrap(); + } + + #[test] + fn test_readuntil() { + Core::new().unwrap().run(lazy(|| { + let (mut sender, mut payload) = Payload::new(false); + + match payload.readuntil(b"ne") { + Ok(Async::NotReady) => (), + _ => panic!("error"), + } + + sender.feed_data(Bytes::from("line1")); + sender.feed_data(Bytes::from("line2")); + + match payload.readuntil(b"ne") { + Ok(Async::Ready(data)) => assert_eq!(&data, "line"), + _ => panic!("error"), + } + + match payload.readuntil(b"2") { + Ok(Async::Ready(data)) => assert_eq!(&data, "1line2"), + _ => panic!("error"), + } + + sender.set_error(PayloadError::Incomplete); + match payload.readuntil(b"b") { + Err(_) => (), + _ => panic!("error"), + } + + let res: Result<(), ()> = Ok(()); + result(res) + })).unwrap(); + } + + #[test] + fn test_pause() { + Core::new().unwrap().run(lazy(|| { + let (mut sender, mut payload) = Payload::new(false); + + assert!(!payload.paused()); + assert!(!sender.maybe_paused()); + + for _ in 0..MAX_PAYLOAD_SIZE+1 { + sender.feed_data(Bytes::from("1")); + } + assert!(sender.maybe_paused()); + assert!(payload.paused()); + + payload.readexactly(10).unwrap(); + assert!(!sender.maybe_paused()); + assert!(!payload.paused()); + + let res: Result<(), ()> = Ok(()); + result(res) + })).unwrap(); + } +}