1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

use BodyLength for request and response body

This commit is contained in:
Nikolay Kim 2018-11-16 21:30:37 -08:00
parent aa20e2670d
commit 3a4b16a6d5
6 changed files with 64 additions and 74 deletions

View File

@ -12,24 +12,26 @@ pub type BodyStream = Box<dyn Stream<Item = Bytes, Error = Error>>;
/// Type represent streaming payload
pub type PayloadStream = Box<dyn Stream<Item = Bytes, Error = PayloadError>>;
/// Different type of bory
pub enum BodyType {
#[derive(Debug)]
/// Different type of body
pub enum BodyLength {
None,
Zero,
Sized(usize),
Sized64(u64),
Unsized,
}
/// Type that provides this trait can be streamed to a peer.
pub trait MessageBody {
fn tp(&self) -> BodyType;
fn length(&self) -> BodyLength;
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error>;
}
impl MessageBody for () {
fn tp(&self) -> BodyType {
BodyType::Zero
fn length(&self) -> BodyLength {
BodyLength::Zero
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
@ -271,8 +273,8 @@ impl AsRef<[u8]> for Binary {
}
impl MessageBody for Bytes {
fn tp(&self) -> BodyType {
BodyType::Sized(self.len())
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
@ -285,8 +287,8 @@ impl MessageBody for Bytes {
}
impl MessageBody for &'static str {
fn tp(&self) -> BodyType {
BodyType::Sized(self.len())
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
@ -301,8 +303,8 @@ impl MessageBody for &'static str {
}
impl MessageBody for &'static [u8] {
fn tp(&self) -> BodyType {
BodyType::Sized(self.len())
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
@ -317,8 +319,8 @@ impl MessageBody for &'static [u8] {
}
impl MessageBody for Vec<u8> {
fn tp(&self) -> BodyType {
BodyType::Sized(self.len())
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
@ -334,8 +336,8 @@ impl MessageBody for Vec<u8> {
}
impl MessageBody for String {
fn tp(&self) -> BodyType {
BodyType::Sized(self.len())
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
@ -367,8 +369,8 @@ impl<S> MessageBody for MessageBodyStream<S>
where
S: Stream<Item = Bytes, Error = Error>,
{
fn tp(&self) -> BodyType {
BodyType::Unsized
fn length(&self) -> BodyLength {
BodyLength::Unsized
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {

View File

@ -10,7 +10,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use super::error::{ConnectorError, SendRequestError};
use super::response::ClientResponse;
use super::{Connect, Connection};
use body::{BodyType, MessageBody, PayloadStream};
use body::{BodyLength, MessageBody, PayloadStream};
use error::PayloadError;
use h1;
use message::RequestHead;
@ -25,7 +25,7 @@ where
B: MessageBody,
I: Connection,
{
let tp = body.tp();
let len = body.length();
connector
// connect to the host
@ -33,10 +33,10 @@ where
.from_err()
// create Framed and send reqest
.map(|io| Framed::new(io, h1::ClientCodec::default()))
.and_then(|framed| framed.send((head, tp).into()).from_err())
.and_then(|framed| framed.send((head, len).into()).from_err())
// send request body
.and_then(move |framed| match body.tp() {
BodyType::None | BodyType::Zero => Either::A(ok(framed)),
.and_then(move |framed| match body.length() {
BodyLength::None | BodyLength::Zero => Either::A(ok(framed)),
_ => Either::B(SendBody::new(body, framed)),
})
// read response and init read body
@ -64,7 +64,7 @@ where
struct SendBody<I, B> {
body: Option<B>,
framed: Option<Framed<I, h1::ClientCodec>>,
write_buf: VecDeque<h1::Message<(RequestHead, BodyType)>>,
write_buf: VecDeque<h1::Message<(RequestHead, BodyLength)>>,
flushed: bool,
}

View File

@ -5,9 +5,9 @@ use bytes::{BufMut, Bytes, BytesMut};
use tokio_codec::{Decoder, Encoder};
use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType};
use super::encoder::{RequestEncoder, ResponseLength};
use super::encoder::RequestEncoder;
use super::{Message, MessageType};
use body::{Binary, Body, BodyType};
use body::{Binary, Body, BodyLength};
use client::ClientResponse;
use config::ServiceConfig;
use error::{ParseError, PayloadError};
@ -104,7 +104,7 @@ impl ClientCodec {
}
/// prepare transfer encoding
pub fn prepare_te(&mut self, head: &mut RequestHead, btype: BodyType) {
pub fn prepare_te(&mut self, head: &mut RequestHead, length: BodyLength) {
self.inner.te.update(
head,
self.inner.flags.contains(Flags::HEAD),
@ -138,7 +138,7 @@ impl ClientCodecInner {
fn encode_response(
&mut self,
msg: RequestHead,
btype: BodyType,
length: BodyLength,
buffer: &mut BytesMut,
) -> io::Result<()> {
// render message
@ -157,20 +157,21 @@ impl ClientCodecInner {
// content length
let mut len_is_set = true;
match btype {
BodyType::Sized(len) => {
match length {
BodyLength::Sized(len) => helpers::write_content_length(len, buffer),
BodyLength::Sized64(len) => {
buffer.extend_from_slice(b"\r\ncontent-length: ");
write!(buffer.writer(), "{}", len)?;
buffer.extend_from_slice(b"\r\n");
}
BodyType::Unsized => {
BodyLength::Unsized => {
buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n")
}
BodyType::Zero => {
BodyLength::Zero => {
len_is_set = false;
buffer.extend_from_slice(b"\r\n")
}
BodyType::None => buffer.extend_from_slice(b"\r\n"),
BodyLength::None => buffer.extend_from_slice(b"\r\n"),
}
let mut has_date = false;
@ -178,9 +179,9 @@ impl ClientCodecInner {
for (key, value) in &msg.headers {
match *key {
TRANSFER_ENCODING => continue,
CONTENT_LENGTH => match btype {
BodyType::None => (),
BodyType::Zero => len_is_set = true,
CONTENT_LENGTH => match length {
BodyLength::None => (),
BodyLength::Zero => len_is_set = true,
_ => continue,
},
DATE => has_date = true,
@ -263,7 +264,7 @@ impl Decoder for ClientPayloadCodec {
}
impl Encoder for ClientCodec {
type Item = Message<(RequestHead, BodyType)>;
type Item = Message<(RequestHead, BodyLength)>;
type Error = io::Error;
fn encode(

View File

@ -6,9 +6,9 @@ use bytes::{BufMut, Bytes, BytesMut};
use tokio_codec::{Decoder, Encoder};
use super::decoder::{MessageDecoder, PayloadDecoder, PayloadItem, PayloadType};
use super::encoder::{ResponseEncoder, ResponseLength};
use super::encoder::ResponseEncoder;
use super::{Message, MessageType};
use body::{Binary, Body};
use body::{Binary, Body, BodyLength};
use config::ServiceConfig;
use error::ParseError;
use helpers;
@ -155,22 +155,20 @@ impl Codec {
// content length
let mut len_is_set = true;
match self.te.length {
ResponseLength::Chunked => {
BodyLength::Unsized => {
buffer.extend_from_slice(b"\r\ntransfer-encoding: chunked\r\n")
}
ResponseLength::Zero => {
BodyLength::Zero => {
len_is_set = false;
buffer.extend_from_slice(b"\r\n")
}
ResponseLength::Length(len) => {
helpers::write_content_length(len, buffer)
}
ResponseLength::Length64(len) => {
BodyLength::Sized(len) => helpers::write_content_length(len, buffer),
BodyLength::Sized64(len) => {
buffer.extend_from_slice(b"\r\ncontent-length: ");
write!(buffer.writer(), "{}", len)?;
buffer.extend_from_slice(b"\r\n");
}
ResponseLength::None => buffer.extend_from_slice(b"\r\n"),
BodyLength::None => buffer.extend_from_slice(b"\r\n"),
}
// write headers
@ -182,8 +180,8 @@ impl Codec {
match *key {
TRANSFER_ENCODING => continue,
CONTENT_LENGTH => match self.te.length {
ResponseLength::None => (),
ResponseLength::Zero => {
BodyLength::None => (),
BodyLength::Zero => {
len_is_set = true;
}
_ => continue,

View File

@ -8,28 +8,17 @@ use bytes::{Bytes, BytesMut};
use http::header::{HeaderValue, ACCEPT_ENCODING, CONTENT_LENGTH};
use http::{StatusCode, Version};
use body::{Binary, Body};
use body::{Binary, Body, BodyLength};
use header::ContentEncoding;
use http::Method;
use message::RequestHead;
use request::Request;
use response::Response;
#[derive(Debug)]
pub(crate) enum ResponseLength {
Chunked,
/// Check if headers contains length or write 0
Zero,
Length(usize),
Length64(u64),
/// Do no set content-length
None,
}
#[derive(Debug)]
pub(crate) struct ResponseEncoder {
head: bool,
pub length: ResponseLength,
pub length: BodyLength,
pub te: TransferEncoding,
}
@ -37,7 +26,7 @@ impl Default for ResponseEncoder {
fn default() -> Self {
ResponseEncoder {
head: false,
length: ResponseLength::None,
length: BodyLength::None,
te: TransferEncoding::empty(),
}
}
@ -80,18 +69,18 @@ impl ResponseEncoder {
StatusCode::NO_CONTENT
| StatusCode::CONTINUE
| StatusCode::SWITCHING_PROTOCOLS
| StatusCode::PROCESSING => ResponseLength::None,
_ => ResponseLength::Zero,
| StatusCode::PROCESSING => BodyLength::None,
_ => BodyLength::Zero,
};
TransferEncoding::empty()
}
Body::Binary(_) => {
self.length = ResponseLength::Length(len);
self.length = BodyLength::Sized(len);
TransferEncoding::length(len as u64)
}
Body::Streaming(_) => {
if resp.upgrade() {
self.length = ResponseLength::None;
self.length = BodyLength::None;
TransferEncoding::eof()
} else {
self.streaming_encoding(version, resp)
@ -115,10 +104,10 @@ impl ResponseEncoder {
Some(true) => {
// Enable transfer encoding
if version == Version::HTTP_2 {
self.length = ResponseLength::None;
self.length = BodyLength::None;
TransferEncoding::eof()
} else {
self.length = ResponseLength::Chunked;
self.length = BodyLength::Unsized;
TransferEncoding::chunked()
}
}
@ -145,7 +134,7 @@ impl ResponseEncoder {
if !chunked {
if let Some(len) = len {
self.length = ResponseLength::Length64(len);
self.length = BodyLength::Sized64(len);
TransferEncoding::length(len)
} else {
TransferEncoding::eof()
@ -154,11 +143,11 @@ impl ResponseEncoder {
// Enable transfer encoding
match version {
Version::HTTP_11 => {
self.length = ResponseLength::Chunked;
self.length = BodyLength::Unsized;
TransferEncoding::chunked()
}
_ => {
self.length = ResponseLength::None;
self.length = BodyLength::None;
TransferEncoding::eof()
}
}
@ -171,7 +160,7 @@ impl ResponseEncoder {
#[derive(Debug)]
pub(crate) struct RequestEncoder {
head: bool,
pub length: ResponseLength,
pub length: BodyLength,
pub te: TransferEncoding,
}
@ -179,7 +168,7 @@ impl Default for RequestEncoder {
fn default() -> Self {
RequestEncoder {
head: false,
length: ResponseLength::None,
length: BodyLength::None,
te: TransferEncoding::empty(),
}
}

View File

@ -13,7 +13,7 @@ use rand;
use sha1::Sha1;
use tokio_io::{AsyncRead, AsyncWrite};
use body::BodyType;
use body::BodyLength;
use client::ClientResponse;
use h1;
use ws::Codec;
@ -141,7 +141,7 @@ where
// h1 protocol
let framed = Framed::new(io, h1::ClientCodec::default());
framed
.send((request.into_parts().0, BodyType::None).into())
.send((request.into_parts().0, BodyLength::None).into())
.map_err(ClientError::from)
.and_then(|framed| {
framed