From 3a4b16a6d57458f9071cb22732d07ab7f2d864e2 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 16 Nov 2018 21:30:37 -0800 Subject: [PATCH] use BodyLength for request and response body --- src/body.rs | 36 +++++++++++++++++++----------------- src/client/pipeline.rs | 12 ++++++------ src/h1/client.rs | 27 ++++++++++++++------------- src/h1/codec.rs | 20 +++++++++----------- src/h1/encoder.rs | 39 ++++++++++++++------------------------- src/ws/client/service.rs | 4 ++-- 6 files changed, 64 insertions(+), 74 deletions(-) diff --git a/src/body.rs b/src/body.rs index 1165909e8..6e6239c3e 100644 --- a/src/body.rs +++ b/src/body.rs @@ -12,24 +12,26 @@ pub type BodyStream = Box>; /// Type represent streaming payload pub type PayloadStream = Box>; -/// Different type of bory -pub enum BodyType { +#[derive(Debug)] +/// Different type of body +pub enum BodyLength { None, Zero, Sized(usize), + Sized64(u64), Unsized, } /// Type that provides this trait can be streamed to a peer. pub trait MessageBody { - fn tp(&self) -> BodyType; + fn length(&self) -> BodyLength; fn poll_next(&mut self) -> Poll, Error>; } impl MessageBody for () { - fn tp(&self) -> BodyType { - BodyType::Zero + fn length(&self) -> BodyLength { + BodyLength::Zero } fn poll_next(&mut self) -> Poll, Error> { @@ -271,8 +273,8 @@ impl AsRef<[u8]> for Binary { } impl MessageBody for Bytes { - fn tp(&self) -> BodyType { - BodyType::Sized(self.len()) + fn length(&self) -> BodyLength { + BodyLength::Sized(self.len()) } fn poll_next(&mut self) -> Poll, Error> { @@ -285,8 +287,8 @@ impl MessageBody for Bytes { } impl MessageBody for &'static str { - fn tp(&self) -> BodyType { - BodyType::Sized(self.len()) + fn length(&self) -> BodyLength { + BodyLength::Sized(self.len()) } fn poll_next(&mut self) -> Poll, Error> { @@ -301,8 +303,8 @@ impl MessageBody for &'static str { } impl MessageBody for &'static [u8] { - fn tp(&self) -> BodyType { - BodyType::Sized(self.len()) + fn length(&self) -> BodyLength { + BodyLength::Sized(self.len()) } fn poll_next(&mut self) -> Poll, Error> { @@ -317,8 +319,8 @@ impl MessageBody for &'static [u8] { } impl MessageBody for Vec { - fn tp(&self) -> BodyType { - BodyType::Sized(self.len()) + fn length(&self) -> BodyLength { + BodyLength::Sized(self.len()) } fn poll_next(&mut self) -> Poll, Error> { @@ -334,8 +336,8 @@ impl MessageBody for Vec { } impl MessageBody for String { - fn tp(&self) -> BodyType { - BodyType::Sized(self.len()) + fn length(&self) -> BodyLength { + BodyLength::Sized(self.len()) } fn poll_next(&mut self) -> Poll, Error> { @@ -367,8 +369,8 @@ impl MessageBody for MessageBodyStream where S: Stream, { - fn tp(&self) -> BodyType { - BodyType::Unsized + fn length(&self) -> BodyLength { + BodyLength::Unsized } fn poll_next(&mut self) -> Poll, Error> { diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 17ba93e7c..93b349e93 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -10,7 +10,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use super::error::{ConnectorError, SendRequestError}; use super::response::ClientResponse; use super::{Connect, Connection}; -use body::{BodyType, MessageBody, PayloadStream}; +use body::{BodyLength, MessageBody, PayloadStream}; use error::PayloadError; use h1; use message::RequestHead; @@ -25,7 +25,7 @@ where B: MessageBody, I: Connection, { - let tp = body.tp(); + let len = body.length(); connector // connect to the host @@ -33,10 +33,10 @@ where .from_err() // create Framed and send reqest .map(|io| Framed::new(io, h1::ClientCodec::default())) - .and_then(|framed| framed.send((head, tp).into()).from_err()) + .and_then(|framed| framed.send((head, len).into()).from_err()) // send request body - .and_then(move |framed| match body.tp() { - BodyType::None | BodyType::Zero => Either::A(ok(framed)), + .and_then(move |framed| match body.length() { + BodyLength::None | BodyLength::Zero => Either::A(ok(framed)), _ => Either::B(SendBody::new(body, framed)), }) // read response and init read body @@ -64,7 +64,7 @@ where struct SendBody { body: Option, framed: Option>, - write_buf: VecDeque>, + write_buf: VecDeque>, flushed: bool, } diff --git a/src/h1/client.rs b/src/h1/client.rs index f871cb338..8b367acc3 100644 --- a/src/h1/client.rs +++ b/src/h1/client.rs @@ -5,9 +5,9 @@ use bytes::{BufMut, Bytes, BytesMut}; use tokio_codec::{Decoder, Encoder}; use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType}; -use super::encoder::{RequestEncoder, ResponseLength}; +use super::encoder::RequestEncoder; use super::{Message, MessageType}; -use body::{Binary, Body, BodyType}; +use body::{Binary, Body, BodyLength}; use client::ClientResponse; use config::ServiceConfig; use error::{ParseError, PayloadError}; @@ -104,7 +104,7 @@ impl ClientCodec { } /// prepare transfer encoding - pub fn prepare_te(&mut self, head: &mut RequestHead, btype: BodyType) { + pub fn prepare_te(&mut self, head: &mut RequestHead, length: BodyLength) { self.inner.te.update( head, self.inner.flags.contains(Flags::HEAD), @@ -138,7 +138,7 @@ impl ClientCodecInner { fn encode_response( &mut self, msg: RequestHead, - btype: BodyType, + length: BodyLength, buffer: &mut BytesMut, ) -> io::Result<()> { // render message @@ -157,20 +157,21 @@ impl ClientCodecInner { // content length let mut len_is_set = true; - match btype { - BodyType::Sized(len) => { + match length { + BodyLength::Sized(len) => helpers::write_content_length(len, buffer), + BodyLength::Sized64(len) => { buffer.extend_from_slice(b"\r\ncontent-length: "); write!(buffer.writer(), "{}", len)?; buffer.extend_from_slice(b"\r\n"); } - BodyType::Unsized => { + BodyLength::Unsized => { buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n") } - BodyType::Zero => { + BodyLength::Zero => { len_is_set = false; buffer.extend_from_slice(b"\r\n") } - BodyType::None => buffer.extend_from_slice(b"\r\n"), + BodyLength::None => buffer.extend_from_slice(b"\r\n"), } let mut has_date = false; @@ -178,9 +179,9 @@ impl ClientCodecInner { for (key, value) in &msg.headers { match *key { TRANSFER_ENCODING => continue, - CONTENT_LENGTH => match btype { - BodyType::None => (), - BodyType::Zero => len_is_set = true, + CONTENT_LENGTH => match length { + BodyLength::None => (), + BodyLength::Zero => len_is_set = true, _ => continue, }, DATE => has_date = true, @@ -263,7 +264,7 @@ impl Decoder for ClientPayloadCodec { } impl Encoder for ClientCodec { - type Item = Message<(RequestHead, BodyType)>; + type Item = Message<(RequestHead, BodyLength)>; type Error = io::Error; fn encode( diff --git a/src/h1/codec.rs b/src/h1/codec.rs index c1c6091de..a6f39242b 100644 --- a/src/h1/codec.rs +++ b/src/h1/codec.rs @@ -6,9 +6,9 @@ use bytes::{BufMut, Bytes, BytesMut}; use tokio_codec::{Decoder, Encoder}; use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType}; -use super::encoder::{ResponseEncoder, ResponseLength}; +use super::encoder::ResponseEncoder; use super::{Message, MessageType}; -use body::{Binary, Body}; +use body::{Binary, Body, BodyLength}; use config::ServiceConfig; use error::ParseError; use helpers; @@ -155,22 +155,20 @@ impl Codec { // content length let mut len_is_set = true; match self.te.length { - ResponseLength::Chunked => { + BodyLength::Unsized => { buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n") } - ResponseLength::Zero => { + BodyLength::Zero => { len_is_set = false; buffer.extend_from_slice(b"\r\n") } - ResponseLength::Length(len) => { - helpers::write_content_length(len, buffer) - } - ResponseLength::Length64(len) => { + BodyLength::Sized(len) => helpers::write_content_length(len, buffer), + BodyLength::Sized64(len) => { buffer.extend_from_slice(b"\r\ncontent-length: "); write!(buffer.writer(), "{}", len)?; buffer.extend_from_slice(b"\r\n"); } - ResponseLength::None => buffer.extend_from_slice(b"\r\n"), + BodyLength::None => buffer.extend_from_slice(b"\r\n"), } // write headers @@ -182,8 +180,8 @@ impl Codec { match *key { TRANSFER_ENCODING => continue, CONTENT_LENGTH => match self.te.length { - ResponseLength::None => (), - ResponseLength::Zero => { + BodyLength::None => (), + BodyLength::Zero => { len_is_set = true; } _ => continue, diff --git a/src/h1/encoder.rs b/src/h1/encoder.rs index de45351d1..5cfdd01b9 100644 --- a/src/h1/encoder.rs +++ b/src/h1/encoder.rs @@ -8,28 +8,17 @@ use bytes::{Bytes, BytesMut}; use http::header::{HeaderValue, ACCEPT_ENCODING, CONTENT_LENGTH}; use http::{StatusCode, Version}; -use body::{Binary, Body}; +use body::{Binary, Body, BodyLength}; use header::ContentEncoding; use http::Method; use message::RequestHead; use request::Request; use response::Response; -#[derive(Debug)] -pub(crate) enum ResponseLength { - Chunked, - /// Check if headers contains length or write 0 - Zero, - Length(usize), - Length64(u64), - /// Do no set content-length - None, -} - #[derive(Debug)] pub(crate) struct ResponseEncoder { head: bool, - pub length: ResponseLength, + pub length: BodyLength, pub te: TransferEncoding, } @@ -37,7 +26,7 @@ impl Default for ResponseEncoder { fn default() -> Self { ResponseEncoder { head: false, - length: ResponseLength::None, + length: BodyLength::None, te: TransferEncoding::empty(), } } @@ -80,18 +69,18 @@ impl ResponseEncoder { StatusCode::NO_CONTENT | StatusCode::CONTINUE | StatusCode::SWITCHING_PROTOCOLS - | StatusCode::PROCESSING => ResponseLength::None, - _ => ResponseLength::Zero, + | StatusCode::PROCESSING => BodyLength::None, + _ => BodyLength::Zero, }; TransferEncoding::empty() } Body::Binary(_) => { - self.length = ResponseLength::Length(len); + self.length = BodyLength::Sized(len); TransferEncoding::length(len as u64) } Body::Streaming(_) => { if resp.upgrade() { - self.length = ResponseLength::None; + self.length = BodyLength::None; TransferEncoding::eof() } else { self.streaming_encoding(version, resp) @@ -115,10 +104,10 @@ impl ResponseEncoder { Some(true) => { // Enable transfer encoding if version == Version::HTTP_2 { - self.length = ResponseLength::None; + self.length = BodyLength::None; TransferEncoding::eof() } else { - self.length = ResponseLength::Chunked; + self.length = BodyLength::Unsized; TransferEncoding::chunked() } } @@ -145,7 +134,7 @@ impl ResponseEncoder { if !chunked { if let Some(len) = len { - self.length = ResponseLength::Length64(len); + self.length = BodyLength::Sized64(len); TransferEncoding::length(len) } else { TransferEncoding::eof() @@ -154,11 +143,11 @@ impl ResponseEncoder { // Enable transfer encoding match version { Version::HTTP_11 => { - self.length = ResponseLength::Chunked; + self.length = BodyLength::Unsized; TransferEncoding::chunked() } _ => { - self.length = ResponseLength::None; + self.length = BodyLength::None; TransferEncoding::eof() } } @@ -171,7 +160,7 @@ impl ResponseEncoder { #[derive(Debug)] pub(crate) struct RequestEncoder { head: bool, - pub length: ResponseLength, + pub length: BodyLength, pub te: TransferEncoding, } @@ -179,7 +168,7 @@ impl Default for RequestEncoder { fn default() -> Self { RequestEncoder { head: false, - length: ResponseLength::None, + length: BodyLength::None, te: TransferEncoding::empty(), } } diff --git a/src/ws/client/service.rs b/src/ws/client/service.rs index 34a151444..94be59f6e 100644 --- a/src/ws/client/service.rs +++ b/src/ws/client/service.rs @@ -13,7 +13,7 @@ use rand; use sha1::Sha1; use tokio_io::{AsyncRead, AsyncWrite}; -use body::BodyType; +use body::BodyLength; use client::ClientResponse; use h1; use ws::Codec; @@ -141,7 +141,7 @@ where // h1 protocol let framed = Framed::new(io, h1::ClientCodec::default()); framed - .send((request.into_parts().0, BodyType::None).into()) + .send((request.into_parts().0, BodyLength::None).into()) .map_err(ClientError::from) .and_then(|framed| { framed