From ee62814216fb67697f2451803f31f006085c05fc Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 6 Oct 2018 20:31:22 -0700 Subject: [PATCH] split request decoder and payload decoder --- src/h1/codec.rs | 57 ++++--- src/h1/decoder.rs | 389 +++++++++++++++++++--------------------------- 2 files changed, 199 insertions(+), 247 deletions(-) diff --git a/src/h1/codec.rs b/src/h1/codec.rs index a27e6472c..8ab8f2528 100644 --- a/src/h1/codec.rs +++ b/src/h1/codec.rs @@ -4,15 +4,14 @@ use std::io::{self, Write}; use bytes::{BufMut, Bytes, BytesMut}; use tokio_codec::{Decoder, Encoder}; -use super::decoder::H1Decoder; -pub use super::decoder::InMessage; +use super::decoder::{PayloadDecoder, PayloadItem, RequestDecoder}; use super::encoder::{ResponseEncoder, ResponseLength}; use body::Body; use error::ParseError; use helpers; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use http::{Method, Version}; -use request::RequestPool; +use request::{Request, RequestPool}; use response::Response; bitflags! { @@ -34,9 +33,23 @@ pub enum OutMessage { Payload(Bytes), } +/// Incoming http/1 request +#[derive(Debug)] +pub enum InMessage { + /// Request + Message(Request), + /// Request with payload + MessageWithPayload(Request), + /// Payload chunk + Chunk(Bytes), + /// End of payload + Eof, +} + /// HTTP/1 Codec pub struct Codec { - decoder: H1Decoder, + decoder: RequestDecoder, + payload: Option, version: Version, // encoder part @@ -64,7 +77,8 @@ impl Codec { Flags::empty() }; Codec { - decoder: H1Decoder::with_pool(pool), + decoder: RequestDecoder::with_pool(pool), + payload: None, version: Version::HTTP_11, flags, @@ -234,21 +248,28 @@ impl Decoder for Codec { type Error = ParseError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - let res = self.decoder.decode(src); - - match res { - Ok(Some(InMessage::Message(ref req))) - | Ok(Some(InMessage::MessageWithPayload(ref req))) => { - self.flags - .set(Flags::HEAD, req.inner.method == Method::HEAD); - self.version = req.inner.version; - if self.flags.contains(Flags::KEEPALIVE_ENABLED) { - self.flags.set(Flags::KEEPALIVE, req.keep_alive()); - } + if self.payload.is_some() { + Ok(match self.payload.as_mut().unwrap().decode(src)? { + Some(PayloadItem::Chunk(chunk)) => Some(InMessage::Chunk(chunk)), + Some(PayloadItem::Eof) => Some(InMessage::Eof), + None => None, + }) + } else if let Some((req, payload)) = self.decoder.decode(src)? { + self.flags + .set(Flags::HEAD, req.inner.method == Method::HEAD); + self.version = req.inner.version; + if self.flags.contains(Flags::KEEPALIVE_ENABLED) { + self.flags.set(Flags::KEEPALIVE, req.keep_alive()); } - _ => (), + self.payload = payload; + if self.payload.is_some() { + Ok(Some(InMessage::MessageWithPayload(req))) + } else { + Ok(Some(InMessage::Message(req))) + } + } else { + Ok(None) } - res } } diff --git a/src/h1/decoder.rs b/src/h1/decoder.rs index 48776226b..fb29f033c 100644 --- a/src/h1/decoder.rs +++ b/src/h1/decoder.rs @@ -3,6 +3,7 @@ use std::{io, mem}; use bytes::{Bytes, BytesMut}; use futures::{Async, Poll}; use httparse; +use tokio_codec::Decoder; use error::ParseError; use http::header::{HeaderName, HeaderValue}; @@ -13,75 +14,25 @@ use uri::Url; const MAX_BUFFER_SIZE: usize = 131_072; const MAX_HEADERS: usize = 96; -pub(crate) struct H1Decoder { - decoder: Option, - pool: &'static RequestPool, +pub struct RequestDecoder(&'static RequestPool); + +impl RequestDecoder { + pub(crate) fn with_pool(pool: &'static RequestPool) -> RequestDecoder { + RequestDecoder(pool) + } } -/// Incoming http/1 request -#[derive(Debug)] -pub enum InMessage { - /// Request - Message(Request), - /// Request with payload - MessageWithPayload(Request), - /// Payload chunk - Chunk(Bytes), - /// End of payload - Eof, +impl Default for RequestDecoder { + fn default() -> RequestDecoder { + RequestDecoder::with_pool(RequestPool::pool()) + } } -impl H1Decoder { - #[cfg(test)] - pub fn new() -> H1Decoder { - H1Decoder::with_pool(RequestPool::pool()) - } +impl Decoder for RequestDecoder { + type Item = (Request, Option); + type Error = ParseError; - pub fn with_pool(pool: &'static RequestPool) -> H1Decoder { - H1Decoder { - pool, - decoder: None, - } - } - - pub fn decode( - &mut self, src: &mut BytesMut, - ) -> Result, ParseError> { - // read payload - if self.decoder.is_some() { - match self.decoder.as_mut().unwrap().decode(src)? { - Async::Ready(Some(bytes)) => return Ok(Some(InMessage::Chunk(bytes))), - Async::Ready(None) => { - self.decoder.take(); - return Ok(Some(InMessage::Eof)); - } - Async::NotReady => return Ok(None), - } - } - - match self.parse_message(src)? { - Async::Ready((msg, decoder)) => { - self.decoder = decoder; - if self.decoder.is_some() { - Ok(Some(InMessage::MessageWithPayload(msg))) - } else { - Ok(Some(InMessage::Message(msg))) - } - } - Async::NotReady => { - if src.len() >= MAX_BUFFER_SIZE { - error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); - Err(ParseError::TooLarge) - } else { - Ok(None) - } - } - } - } - - fn parse_message( - &self, buf: &mut BytesMut, - ) -> Poll<(Request, Option), ParseError> { + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { // Parse http message let mut has_upgrade = false; let mut chunked = false; @@ -98,7 +49,7 @@ impl H1Decoder { unsafe { mem::uninitialized() }; let mut req = httparse::Request::new(&mut parsed); - match req.parse(buf)? { + match req.parse(src)? { httparse::Status::Complete(len) => { let method = Method::from_bytes(req.method.unwrap().as_bytes()) .map_err(|_| ParseError::Method)?; @@ -108,18 +59,18 @@ impl H1Decoder { } else { Version::HTTP_10 }; - HeaderIndex::record(buf, req.headers, &mut headers); + HeaderIndex::record(src, req.headers, &mut headers); (len, method, path, version, req.headers.len()) } - httparse::Status::Partial => return Ok(Async::NotReady), + httparse::Status::Partial => return Ok(None), } }; - let slice = buf.split_to(len).freeze(); + let slice = src.split_to(len).freeze(); // convert headers - let mut msg = RequestPool::get(self.pool); + let mut msg = RequestPool::get(self.0); { let inner = msg.inner_mut(); inner @@ -198,18 +149,21 @@ impl H1Decoder { // https://tools.ietf.org/html/rfc7230#section-3.3.3 let decoder = if chunked { // Chunked encoding - Some(EncodingDecoder::chunked()) + Some(PayloadDecoder::chunked()) } else if let Some(len) = content_length { // Content-Length - Some(EncodingDecoder::length(len)) + Some(PayloadDecoder::length(len)) } else if has_upgrade || msg.inner.method == Method::CONNECT { // upgrade(websocket) or connect - Some(EncodingDecoder::eof()) + Some(PayloadDecoder::eof()) + } else if src.len() >= MAX_BUFFER_SIZE { + error!("MAX_BUFFER_SIZE unprocessed data reached, closing"); + return Err(ParseError::TooLarge); } else { None }; - Ok(Async::Ready((msg, decoder))) + Ok(Some((msg, decoder))) } } @@ -235,30 +189,37 @@ impl HeaderIndex { } } +#[derive(Debug, Clone)] +/// Http payload item +pub enum PayloadItem { + Chunk(Bytes), + Eof, +} + /// Decoders to handle different Transfer-Encodings. /// /// If a message body does not include a Transfer-Encoding, it *should* /// include a Content-Length header. #[derive(Debug, Clone, PartialEq)] -pub struct EncodingDecoder { +pub struct PayloadDecoder { kind: Kind, } -impl EncodingDecoder { - pub fn length(x: u64) -> EncodingDecoder { - EncodingDecoder { +impl PayloadDecoder { + pub fn length(x: u64) -> PayloadDecoder { + PayloadDecoder { kind: Kind::Length(x), } } - pub fn chunked() -> EncodingDecoder { - EncodingDecoder { + pub fn chunked() -> PayloadDecoder { + PayloadDecoder { kind: Kind::Chunked(ChunkedState::Size, 0), } } - pub fn eof() -> EncodingDecoder { - EncodingDecoder { + pub fn eof() -> PayloadDecoder { + PayloadDecoder { kind: Kind::Eof(false), } } @@ -302,53 +263,59 @@ enum ChunkedState { End, } -impl EncodingDecoder { - pub fn decode(&mut self, body: &mut BytesMut) -> Poll, io::Error> { +impl Decoder for PayloadDecoder { + type Item = PayloadItem; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { match self.kind { Kind::Length(ref mut remaining) => { if *remaining == 0 { - Ok(Async::Ready(None)) + Ok(Some(PayloadItem::Eof)) } else { - if body.is_empty() { - return Ok(Async::NotReady); + if src.is_empty() { + return Ok(None); } - let len = body.len() as u64; + let len = src.len() as u64; let buf; if *remaining > len { - buf = body.take().freeze(); + buf = src.take().freeze(); *remaining -= len; } else { - buf = body.split_to(*remaining as usize).freeze(); + buf = src.split_to(*remaining as usize).freeze(); *remaining = 0; - } + }; trace!("Length read: {}", buf.len()); - Ok(Async::Ready(Some(buf))) + Ok(Some(PayloadItem::Chunk(buf))) } } Kind::Chunked(ref mut state, ref mut size) => { loop { let mut buf = None; // advances the chunked state - *state = try_ready!(state.step(body, size, &mut buf)); + *state = match state.step(src, size, &mut buf)? { + Async::NotReady => return Ok(None), + Async::Ready(state) => state, + }; if *state == ChunkedState::End { trace!("End of chunked stream"); - return Ok(Async::Ready(None)); + return Ok(Some(PayloadItem::Eof)); } if let Some(buf) = buf { - return Ok(Async::Ready(Some(buf))); + return Ok(Some(PayloadItem::Chunk(buf))); } - if body.is_empty() { - return Ok(Async::NotReady); + if src.is_empty() { + return Ok(None); } } } Kind::Eof(ref mut is_eof) => { if *is_eof { - Ok(Async::Ready(None)) - } else if !body.is_empty() { - Ok(Async::Ready(Some(body.take().freeze()))) + Ok(Some(PayloadItem::Eof)) + } else if !src.is_empty() { + Ok(Some(PayloadItem::Chunk(src.take().freeze()))) } else { - Ok(Async::NotReady) + Ok(None) } } } @@ -536,15 +503,18 @@ mod tests { _ => panic!("error"), } } + } + + impl PayloadItem { fn chunk(self) -> Bytes { match self { - InMessage::Chunk(chunk) => chunk, + PayloadItem::Chunk(chunk) => chunk, _ => panic!("error"), } } fn eof(&self) -> bool { match *self { - InMessage::Eof => true, + PayloadItem::Eof => true, _ => false, } } @@ -552,8 +522,8 @@ mod tests { macro_rules! parse_ready { ($e:expr) => {{ - match H1Decoder::new().decode($e) { - Ok(Some(msg)) => msg.message(), + match RequestDecoder::default().decode($e) { + Ok(Some((msg, _))) => msg, Ok(_) => unreachable!("Eof during parsing http request"), Err(err) => unreachable!("Error during parsing http request: {:?}", err), } @@ -562,7 +532,7 @@ mod tests { macro_rules! expect_parse_err { ($e:expr) => {{ - match H1Decoder::new().decode($e) { + match RequestDecoder::default().decode($e) { Err(err) => match err { ParseError::Io(_) => unreachable!("Parse error expected"), _ => (), @@ -641,10 +611,9 @@ mod tests { fn test_parse() { let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n\r\n"); - let mut reader = H1Decoder::new(); + let mut reader = RequestDecoder::default(); match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let req = msg.message(); + Ok(Some((req, _))) => { assert_eq!(req.version(), Version::HTTP_11); assert_eq!(*req.method(), Method::GET); assert_eq!(req.path(), "/test"); @@ -657,38 +626,25 @@ mod tests { fn test_parse_partial() { let mut buf = BytesMut::from("PUT /test HTTP/1"); - let mut reader = H1Decoder::new(); - match reader.decode(&mut buf) { - Ok(None) => (), - _ => unreachable!("Error"), - } + let mut reader = RequestDecoder::default(); + assert!(reader.decode(&mut buf).unwrap().is_none()); buf.extend(b".1\r\n\r\n"); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let mut req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::PUT); - assert_eq!(req.path(), "/test"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } + let (req, _) = reader.decode(&mut buf).unwrap().unwrap(); + assert_eq!(req.version(), Version::HTTP_11); + assert_eq!(*req.method(), Method::PUT); + assert_eq!(req.path(), "/test"); } #[test] fn test_parse_post() { let mut buf = BytesMut::from("POST /test2 HTTP/1.0\r\n\r\n"); - let mut reader = H1Decoder::new(); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let mut req = msg.message(); - assert_eq!(req.version(), Version::HTTP_10); - assert_eq!(*req.method(), Method::POST); - assert_eq!(req.path(), "/test2"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } + let mut reader = RequestDecoder::default(); + let (req, _) = reader.decode(&mut buf).unwrap().unwrap(); + assert_eq!(req.version(), Version::HTTP_10); + assert_eq!(*req.method(), Method::POST); + assert_eq!(req.path(), "/test2"); } #[test] @@ -696,20 +652,16 @@ mod tests { let mut buf = BytesMut::from("GET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); - let mut reader = H1Decoder::new(); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let mut req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::GET); - assert_eq!(req.path(), "/test"); - assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), - b"body" - ); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } + let mut reader = RequestDecoder::default(); + let (req, pl) = reader.decode(&mut buf).unwrap().unwrap(); + let mut pl = pl.unwrap(); + assert_eq!(req.version(), Version::HTTP_11); + assert_eq!(*req.method(), Method::GET); + assert_eq!(req.path(), "/test"); + assert_eq!( + pl.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), + b"body" + ); } #[test] @@ -717,45 +669,36 @@ mod tests { let mut buf = BytesMut::from("\r\nGET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); - let mut reader = H1Decoder::new(); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let mut req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::GET); - assert_eq!(req.path(), "/test"); - assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), - b"body" - ); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } + let mut reader = RequestDecoder::default(); + let (req, pl) = reader.decode(&mut buf).unwrap().unwrap(); + let mut pl = pl.unwrap(); + assert_eq!(req.version(), Version::HTTP_11); + assert_eq!(*req.method(), Method::GET); + assert_eq!(req.path(), "/test"); + assert_eq!( + pl.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), + b"body" + ); } #[test] fn test_parse_partial_eof() { let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n"); - let mut reader = H1Decoder::new(); + let mut reader = RequestDecoder::default(); assert!(reader.decode(&mut buf).unwrap().is_none()); buf.extend(b"\r\n"); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::GET); - assert_eq!(req.path(), "/test"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } + let (req, _) = reader.decode(&mut buf).unwrap().unwrap(); + assert_eq!(req.version(), Version::HTTP_11); + assert_eq!(*req.method(), Method::GET); + assert_eq!(req.path(), "/test"); } #[test] fn test_headers_split_field() { let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n"); - let mut reader = H1Decoder::new(); + let mut reader = RequestDecoder::default(); assert!{ reader.decode(&mut buf).unwrap().is_none() } buf.extend(b"t"); @@ -765,16 +708,11 @@ mod tests { assert!{ reader.decode(&mut buf).unwrap().is_none() } buf.extend(b"t: value\r\n\r\n"); - match reader.decode(&mut buf) { - Ok(Some(msg)) => { - let req = msg.message(); - assert_eq!(req.version(), Version::HTTP_11); - assert_eq!(*req.method(), Method::GET); - assert_eq!(req.path(), "/test"); - assert_eq!(req.headers().get("test").unwrap().as_bytes(), b"value"); - } - Ok(_) | Err(_) => unreachable!("Error during parsing http request"), - } + let (req, _) = reader.decode(&mut buf).unwrap().unwrap(); + assert_eq!(req.version(), Version::HTTP_11); + assert_eq!(*req.method(), Method::GET); + assert_eq!(req.path(), "/test"); + assert_eq!(req.headers().get("test").unwrap().as_bytes(), b"value"); } #[test] @@ -784,9 +722,8 @@ mod tests { Set-Cookie: c1=cookie1\r\n\ Set-Cookie: c2=cookie2\r\n\r\n", ); - let mut reader = H1Decoder::new(); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - let req = msg.message(); + let mut reader = RequestDecoder::default(); + let (req, _) = reader.decode(&mut buf).unwrap().unwrap(); let val: Vec<_> = req .headers() @@ -985,14 +922,13 @@ mod tests { upgrade: websocket\r\n\r\n\ some raw data", ); - let mut reader = H1Decoder::new(); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req = msg.message(); + let mut reader = RequestDecoder::default(); + let (req, pl) = reader.decode(&mut buf).unwrap().unwrap(); + let mut pl = pl.unwrap(); assert!(!req.keep_alive()); assert!(req.upgrade()); assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), + pl.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), b"some raw data" ); } @@ -1039,22 +975,21 @@ mod tests { "GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n", ); - let mut reader = H1Decoder::new(); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req = msg.message(); + let mut reader = RequestDecoder::default(); + let (req, pl) = reader.decode(&mut buf).unwrap().unwrap(); + let mut pl = pl.unwrap(); assert!(req.chunked().unwrap()); buf.extend(b"4\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), + pl.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), b"data" ); assert_eq!( - reader.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), + pl.decode(&mut buf).unwrap().unwrap().chunk().as_ref(), b"line" ); - assert!(reader.decode(&mut buf).unwrap().unwrap().eof()); + assert!(pl.decode(&mut buf).unwrap().unwrap().eof()); } #[test] @@ -1063,10 +998,9 @@ mod tests { "GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n", ); - let mut reader = H1Decoder::new(); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req = msg.message(); + let mut reader = RequestDecoder::default(); + let (req, pl) = reader.decode(&mut buf).unwrap().unwrap(); + let mut pl = pl.unwrap(); assert!(req.chunked().unwrap()); buf.extend( @@ -1075,19 +1009,17 @@ mod tests { transfer-encoding: chunked\r\n\r\n" .iter(), ); - let msg = reader.decode(&mut buf).unwrap().unwrap(); + let msg = pl.decode(&mut buf).unwrap().unwrap(); assert_eq!(msg.chunk().as_ref(), b"data"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); + let msg = pl.decode(&mut buf).unwrap().unwrap(); assert_eq!(msg.chunk().as_ref(), b"line"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); + let msg = pl.decode(&mut buf).unwrap().unwrap(); assert!(msg.eof()); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req2 = msg.message(); - assert!(req2.chunked().unwrap()); - assert_eq!(*req2.method(), Method::POST); - assert!(req2.chunked().unwrap()); + let (req, _) = reader.decode(&mut buf).unwrap().unwrap(); + assert!(req.chunked().unwrap()); + assert_eq!(*req.method(), Method::POST); + assert!(req.chunked().unwrap()); } #[test] @@ -1097,30 +1029,29 @@ mod tests { transfer-encoding: chunked\r\n\r\n", ); - let mut reader = H1Decoder::new(); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - let req = msg.message(); + let mut reader = RequestDecoder::default(); + let (req, pl) = reader.decode(&mut buf).unwrap().unwrap(); + let mut pl = pl.unwrap(); assert!(req.chunked().unwrap()); buf.extend(b"4\r\n1111\r\n"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); + let msg = pl.decode(&mut buf).unwrap().unwrap(); assert_eq!(msg.chunk().as_ref(), b"1111"); buf.extend(b"4\r\ndata\r"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); + let msg = pl.decode(&mut buf).unwrap().unwrap(); assert_eq!(msg.chunk().as_ref(), b"data"); buf.extend(b"\n4"); - assert!(reader.decode(&mut buf).unwrap().is_none()); + assert!(pl.decode(&mut buf).unwrap().is_none()); buf.extend(b"\r"); - assert!(reader.decode(&mut buf).unwrap().is_none()); + assert!(pl.decode(&mut buf).unwrap().is_none()); buf.extend(b"\n"); - assert!(reader.decode(&mut buf).unwrap().is_none()); + assert!(pl.decode(&mut buf).unwrap().is_none()); buf.extend(b"li"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); + let msg = pl.decode(&mut buf).unwrap().unwrap(); assert_eq!(msg.chunk().as_ref(), b"li"); //trailers @@ -1128,12 +1059,12 @@ mod tests { //not_ready!(reader.parse(&mut buf, &mut readbuf)); buf.extend(b"ne\r\n0\r\n"); - let msg = reader.decode(&mut buf).unwrap().unwrap(); + let msg = pl.decode(&mut buf).unwrap().unwrap(); assert_eq!(msg.chunk().as_ref(), b"ne"); - assert!(reader.decode(&mut buf).unwrap().is_none()); + assert!(pl.decode(&mut buf).unwrap().is_none()); buf.extend(b"\r\n"); - assert!(reader.decode(&mut buf).unwrap().unwrap().eof()); + assert!(pl.decode(&mut buf).unwrap().unwrap().eof()); } #[test] @@ -1143,17 +1074,17 @@ mod tests { transfer-encoding: chunked\r\n\r\n"[..], ); - let mut reader = H1Decoder::new(); - let msg = reader.decode(&mut buf).unwrap().unwrap(); - assert!(msg.is_payload()); - assert!(msg.message().chunked().unwrap()); + let mut reader = RequestDecoder::default(); + let (msg, pl) = reader.decode(&mut buf).unwrap().unwrap(); + let mut pl = pl.unwrap(); + assert!(msg.chunked().unwrap()); buf.extend(b"4;test\r\ndata\r\n4\r\nline\r\n0\r\n\r\n"); // test: test\r\n\r\n") - let chunk = reader.decode(&mut buf).unwrap().unwrap().chunk(); + let chunk = pl.decode(&mut buf).unwrap().unwrap().chunk(); assert_eq!(chunk, Bytes::from_static(b"data")); - let chunk = reader.decode(&mut buf).unwrap().unwrap().chunk(); + let chunk = pl.decode(&mut buf).unwrap().unwrap().chunk(); assert_eq!(chunk, Bytes::from_static(b"line")); - let msg = reader.decode(&mut buf).unwrap().unwrap(); + let msg = pl.decode(&mut buf).unwrap().unwrap(); assert!(msg.eof()); } }