1
0
mirror of https://github.com/fafhrd91/actix-web synced 2025-06-25 06:39:22 +02:00

add h1::SendResponse future; renamed to MessageBody::size

This commit is contained in:
Nikolay Kim
2019-04-10 12:24:17 -07:00
parent 046b7a1425
commit 9bb40c249f
15 changed files with 308 additions and 104 deletions

View File

@ -30,13 +30,13 @@ impl BodySize {
/// Type that provides this trait can be streamed to a peer.
pub trait MessageBody {
fn length(&self) -> BodySize;
fn size(&self) -> BodySize;
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error>;
}
impl MessageBody for () {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
BodySize::Empty
}
@ -46,8 +46,8 @@ impl MessageBody for () {
}
impl<T: MessageBody> MessageBody for Box<T> {
fn length(&self) -> BodySize {
self.as_ref().length()
fn size(&self) -> BodySize {
self.as_ref().size()
}
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
@ -86,10 +86,10 @@ impl<B: MessageBody> ResponseBody<B> {
}
impl<B: MessageBody> MessageBody for ResponseBody<B> {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
match self {
ResponseBody::Body(ref body) => body.length(),
ResponseBody::Other(ref body) => body.length(),
ResponseBody::Body(ref body) => body.size(),
ResponseBody::Other(ref body) => body.size(),
}
}
@ -135,12 +135,12 @@ impl Body {
}
impl MessageBody for Body {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
match self {
Body::None => BodySize::None,
Body::Empty => BodySize::Empty,
Body::Bytes(ref bin) => BodySize::Sized(bin.len()),
Body::Message(ref body) => body.length(),
Body::Message(ref body) => body.size(),
}
}
@ -185,7 +185,7 @@ impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Body::None => write!(f, "Body::None"),
Body::Empty => write!(f, "Body::Zero"),
Body::Empty => write!(f, "Body::Empty"),
Body::Bytes(ref b) => write!(f, "Body::Bytes({:?})", b),
Body::Message(_) => write!(f, "Body::Message(_)"),
}
@ -235,7 +235,7 @@ impl From<BytesMut> for Body {
}
impl MessageBody for Bytes {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
BodySize::Sized(self.len())
}
@ -249,7 +249,7 @@ impl MessageBody for Bytes {
}
impl MessageBody for BytesMut {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
BodySize::Sized(self.len())
}
@ -265,7 +265,7 @@ impl MessageBody for BytesMut {
}
impl MessageBody for &'static str {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
BodySize::Sized(self.len())
}
@ -281,7 +281,7 @@ impl MessageBody for &'static str {
}
impl MessageBody for &'static [u8] {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
BodySize::Sized(self.len())
}
@ -297,7 +297,7 @@ impl MessageBody for &'static [u8] {
}
impl MessageBody for Vec<u8> {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
BodySize::Sized(self.len())
}
@ -314,7 +314,7 @@ impl MessageBody for Vec<u8> {
}
impl MessageBody for String {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
BodySize::Sized(self.len())
}
@ -354,7 +354,7 @@ where
S: Stream<Item = Bytes, Error = E>,
E: Into<Error>,
{
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
BodySize::Stream
}
@ -383,7 +383,7 @@ impl<S> MessageBody for SizedStream<S>
where
S: Stream<Item = Bytes, Error = Error>,
{
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
BodySize::Sized(self.size)
}
@ -416,47 +416,117 @@ mod tests {
#[test]
fn test_static_str() {
assert_eq!(Body::from("").length(), BodySize::Sized(0));
assert_eq!(Body::from("test").length(), BodySize::Sized(4));
assert_eq!(Body::from("").size(), BodySize::Sized(0));
assert_eq!(Body::from("test").size(), BodySize::Sized(4));
assert_eq!(Body::from("test").get_ref(), b"test");
assert_eq!("test".size(), BodySize::Sized(4));
assert_eq!(
"test".poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
);
}
#[test]
fn test_static_bytes() {
assert_eq!(Body::from(b"test".as_ref()).length(), BodySize::Sized(4));
assert_eq!(Body::from(b"test".as_ref()).size(), BodySize::Sized(4));
assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
assert_eq!(
Body::from_slice(b"test".as_ref()).length(),
Body::from_slice(b"test".as_ref()).size(),
BodySize::Sized(4)
);
assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");
assert_eq!((&b"test"[..]).size(), BodySize::Sized(4));
assert_eq!(
(&b"test"[..]).poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
);
}
#[test]
fn test_vec() {
assert_eq!(Body::from(Vec::from("test")).length(), BodySize::Sized(4));
assert_eq!(Body::from(Vec::from("test")).size(), BodySize::Sized(4));
assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");
assert_eq!(Vec::from("test").size(), BodySize::Sized(4));
assert_eq!(
Vec::from("test").poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
);
}
#[test]
fn test_bytes() {
assert_eq!(Body::from(Bytes::from("test")).length(), BodySize::Sized(4));
assert_eq!(Body::from(Bytes::from("test")).get_ref(), b"test");
}
#[test]
fn test_string() {
let b = "test".to_owned();
assert_eq!(Body::from(b.clone()).length(), BodySize::Sized(4));
let mut b = Bytes::from("test");
assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
assert_eq!(Body::from(b.clone()).get_ref(), b"test");
assert_eq!(Body::from(&b).length(), BodySize::Sized(4));
assert_eq!(Body::from(&b).get_ref(), b"test");
assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!(
b.poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
);
}
#[test]
fn test_bytes_mut() {
let b = BytesMut::from("test");
assert_eq!(Body::from(b.clone()).length(), BodySize::Sized(4));
assert_eq!(Body::from(b).get_ref(), b"test");
let mut b = BytesMut::from("test");
assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
assert_eq!(Body::from(b.clone()).get_ref(), b"test");
assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!(
b.poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
);
}
#[test]
fn test_string() {
let mut b = "test".to_owned();
assert_eq!(Body::from(b.clone()).size(), BodySize::Sized(4));
assert_eq!(Body::from(b.clone()).get_ref(), b"test");
assert_eq!(Body::from(&b).size(), BodySize::Sized(4));
assert_eq!(Body::from(&b).get_ref(), b"test");
assert_eq!(b.size(), BodySize::Sized(4));
assert_eq!(
b.poll_next().unwrap(),
Async::Ready(Some(Bytes::from("test")))
);
}
#[test]
fn test_unit() {
assert_eq!(().size(), BodySize::Empty);
assert_eq!(().poll_next().unwrap(), Async::Ready(None));
}
#[test]
fn test_box() {
let mut val = Box::new(());
assert_eq!(val.size(), BodySize::Empty);
assert_eq!(val.poll_next().unwrap(), Async::Ready(None));
}
#[test]
fn test_body_eq() {
assert!(Body::None == Body::None);
assert!(Body::None != Body::Empty);
assert!(Body::Empty == Body::Empty);
assert!(Body::Empty != Body::None);
assert!(
Body::Bytes(Bytes::from_static(b"1"))
== Body::Bytes(Bytes::from_static(b"1"))
);
assert!(Body::Bytes(Bytes::from_static(b"1")) != Body::None);
}
#[test]
fn test_body_debug() {
assert!(format!("{:?}", Body::None).contains("Body::None"));
assert!(format!("{:?}", Body::Empty).contains("Body::Empty"));
assert!(format!("{:?}", Body::Bytes(Bytes::from_static(b"1"))).contains("1"));
}
}

View File

@ -55,14 +55,14 @@ where
io: Some(io),
};
let len = body.length();
let len = body.size();
// create Framed and send reqest
Framed::new(io, h1::ClientCodec::default())
.send((head, len).into())
.from_err()
// send request body
.and_then(move |framed| match body.length() {
.and_then(move |framed| match body.size() {
BodySize::None | BodySize::Empty | BodySize::Sized(0) => {
Either::A(ok(framed))
}

View File

@ -27,9 +27,9 @@ where
T: AsyncRead + AsyncWrite + 'static,
B: MessageBody,
{
trace!("Sending client request: {:?} {:?}", head, body.length());
trace!("Sending client request: {:?} {:?}", head, body.size());
let head_req = head.method == Method::HEAD;
let length = body.length();
let length = body.size();
let eof = match length {
BodySize::None | BodySize::Empty | BodySize::Sized(0) => true,
_ => false,

View File

@ -79,12 +79,12 @@ enum EncoderBody<B> {
}
impl<B: MessageBody> MessageBody for Encoder<B> {
fn length(&self) -> BodySize {
fn size(&self) -> BodySize {
if self.encoder.is_none() {
match self.body {
EncoderBody::Bytes(ref b) => b.length(),
EncoderBody::Stream(ref b) => b.length(),
EncoderBody::BoxedStream(ref b) => b.length(),
EncoderBody::Bytes(ref b) => b.size(),
EncoderBody::Stream(ref b) => b.size(),
EncoderBody::BoxedStream(ref b) => b.size(),
}
} else {
BodySize::Stream

View File

@ -320,7 +320,7 @@ where
body: ResponseBody<B>,
) -> Result<State<S, B, X>, DispatchError> {
self.codec
.encode(Message::Item((message, body.length())), &mut self.write_buf)
.encode(Message::Item((message, body.size())), &mut self.write_buf)
.map_err(|err| {
if let Some(mut payload) = self.payload.take() {
payload.set_error(PayloadError::Incomplete(None));
@ -329,7 +329,7 @@ where
})?;
self.flags.set(Flags::KEEPALIVE, self.codec.keepalive());
match body.length() {
match body.size() {
BodySize::None | BodySize::Empty => Ok(State::None),
_ => Ok(State::SendPayload(body)),
}

View File

@ -10,6 +10,7 @@ mod expect;
mod payload;
mod service;
mod upgrade;
mod utils;
pub use self::client::{ClientCodec, ClientPayloadCodec};
pub use self::codec::Codec;
@ -18,6 +19,7 @@ pub use self::expect::ExpectHandler;
pub use self::payload::Payload;
pub use self::service::{H1Service, H1ServiceHandler, OneRequest};
pub use self::upgrade::UpgradeHandler;
pub use self::utils::SendResponse;
#[derive(Debug)]
/// Codec message

View File

@ -0,0 +1,92 @@
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use futures::{Async, Future, Poll, Sink};
use crate::body::{BodySize, MessageBody, ResponseBody};
use crate::error::Error;
use crate::h1::{Codec, Message};
use crate::response::Response;
/// Send http/1 response
pub struct SendResponse<T, B> {
res: Option<Message<(Response<()>, BodySize)>>,
body: Option<ResponseBody<B>>,
framed: Option<Framed<T, Codec>>,
}
impl<T, B> SendResponse<T, B>
where
B: MessageBody,
{
pub fn new(framed: Framed<T, Codec>, response: Response<B>) -> Self {
let (res, body) = response.into_parts();
SendResponse {
res: Some((res, body.size()).into()),
body: Some(body),
framed: Some(framed),
}
}
}
impl<T, B> Future for SendResponse<T, B>
where
T: AsyncRead + AsyncWrite,
B: MessageBody,
{
type Item = Framed<T, Codec>;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let mut body_ready = self.body.is_some();
let framed = self.framed.as_mut().unwrap();
// send body
if self.res.is_none() && self.body.is_some() {
while body_ready && self.body.is_some() && !framed.is_write_buf_full() {
match self.body.as_mut().unwrap().poll_next()? {
Async::Ready(item) => {
// body is done
if item.is_none() {
let _ = self.body.take();
}
framed.force_send(Message::Chunk(item))?;
}
Async::NotReady => body_ready = false,
}
}
}
// flush write buffer
if !framed.is_write_buf_empty() {
match framed.poll_complete()? {
Async::Ready(_) => {
if body_ready {
continue;
} else {
return Ok(Async::NotReady);
}
}
Async::NotReady => return Ok(Async::NotReady),
}
}
// send response
if let Some(res) = self.res.take() {
framed.force_send(res)?;
continue;
}
if self.body.is_some() {
if body_ready {
continue;
} else {
return Ok(Async::NotReady);
}
} else {
break;
}
}
Ok(Async::Ready(self.framed.take().unwrap()))
}
}

View File

@ -153,10 +153,10 @@ where
fn prepare_response(
&self,
head: &ResponseHead,
length: &mut BodySize,
size: &mut BodySize,
) -> http::Response<()> {
let mut has_date = false;
let mut skip_len = length != &BodySize::Stream;
let mut skip_len = size != &BodySize::Stream;
let mut res = http::Response::new(());
*res.status_mut() = head.status;
@ -166,14 +166,14 @@ where
match head.status {
http::StatusCode::NO_CONTENT
| http::StatusCode::CONTINUE
| http::StatusCode::PROCESSING => *length = BodySize::None,
| http::StatusCode::PROCESSING => *size = BodySize::None,
http::StatusCode::SWITCHING_PROTOCOLS => {
skip_len = true;
*length = BodySize::Stream;
*size = BodySize::Stream;
}
_ => (),
}
let _ = match length {
let _ = match size {
BodySize::None | BodySize::Stream => None,
BodySize::Empty => res
.headers_mut()
@ -229,16 +229,15 @@ where
let (res, body) = res.into().replace_body(());
let mut send = send.take().unwrap();
let mut length = body.length();
let h2_res = self.prepare_response(res.head(), &mut length);
let mut size = body.size();
let h2_res = self.prepare_response(res.head(), &mut size);
let stream = send
.send_response(h2_res, length.is_eof())
.map_err(|e| {
let stream =
send.send_response(h2_res, size.is_eof()).map_err(|e| {
trace!("Error sending h2 response: {:?}", e);
})?;
if length.is_eof() {
if size.is_eof() {
Ok(Async::Ready(()))
} else {
self.state = ServiceResponseState::SendPayload(stream, body);
@ -251,16 +250,15 @@ where
let (res, body) = res.replace_body(());
let mut send = send.take().unwrap();
let mut length = body.length();
let h2_res = self.prepare_response(res.head(), &mut length);
let mut size = body.size();
let h2_res = self.prepare_response(res.head(), &mut size);
let stream = send
.send_response(h2_res, length.is_eof())
.map_err(|e| {
let stream =
send.send_response(h2_res, size.is_eof()).map_err(|e| {
trace!("Error sending h2 response: {:?}", e);
})?;
if length.is_eof() {
if size.is_eof() {
Ok(Async::Ready(()))
} else {
self.state = ServiceResponseState::SendPayload(

View File

@ -210,6 +210,18 @@ impl<B> Response<B> {
}
}
/// Split response and body
pub fn into_parts(self) -> (Response<()>, ResponseBody<B>) {
(
Response {
head: self.head,
body: ResponseBody::Body(()),
error: self.error,
},
self.body,
)
}
/// Drop request's body
pub fn drop_body(self) -> Response<()> {
Response {
@ -264,7 +276,7 @@ impl<B: MessageBody> fmt::Debug for Response<B> {
for (key, val) in self.head.headers.iter() {
let _ = writeln!(f, " {:?}: {:?}", key, val);
}
let _ = writeln!(f, " body: {:?}", self.body.length());
let _ = writeln!(f, " body: {:?}", self.body.size());
res
}
}