use std::marker::PhantomData;
use std::{fmt, mem};
use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream};
use crate::error::Error;
#[derive(Debug, PartialEq, Copy, Clone)]
/// Different type of body
pub enum BodyLength {
None,
Empty,
Sized(usize),
Sized64(u64),
Stream,
}
impl BodyLength {
pub fn is_eof(&self) -> bool {
match self {
BodyLength::None
| BodyLength::Empty
| BodyLength::Sized(0)
| BodyLength::Sized64(0) => true,
_ => false,
}
}
}
/// Type that provides this trait can be streamed to a peer.
pub trait MessageBody {
fn length(&self) -> BodyLength;
fn poll_next(&mut self) -> Poll, Error>;
}
impl MessageBody for () {
fn length(&self) -> BodyLength {
BodyLength::Empty
}
fn poll_next(&mut self) -> Poll , Error> {
Ok(Async::Ready(None))
}
}
impl MessageBody for Box {
fn length(&self) -> BodyLength {
self.as_ref().length()
}
fn poll_next(&mut self) -> Poll, Error> {
self.as_mut().poll_next()
}
}
pub enum ResponseBody {
Body(B),
Other(Body),
}
impl ResponseBody {
pub fn into_body(self) -> ResponseBody {
match self {
ResponseBody::Body(b) => ResponseBody::Other(b),
ResponseBody::Other(b) => ResponseBody::Other(b),
}
}
}
impl ResponseBody {
pub fn take_body(&mut self) -> ResponseBody {
std::mem::replace(self, ResponseBody::Other(Body::None))
}
}
impl ResponseBody {
pub fn as_ref(&self) -> Option<&B> {
if let ResponseBody::Body(ref b) = self {
Some(b)
} else {
None
}
}
}
impl MessageBody for ResponseBody {
fn length(&self) -> BodyLength {
match self {
ResponseBody::Body(ref body) => body.length(),
ResponseBody::Other(ref body) => body.length(),
}
}
fn poll_next(&mut self) -> Poll, Error> {
match self {
ResponseBody::Body(ref mut body) => body.poll_next(),
ResponseBody::Other(ref mut body) => body.poll_next(),
}
}
}
impl Stream for ResponseBody {
type Item = Bytes;
type Error = Error;
fn poll(&mut self) -> Poll, Self::Error> {
self.poll_next()
}
}
/// Represents various types of http message body.
pub enum Body {
/// Empty response. `Content-Length` header is not set.
None,
/// Zero sized response body. `Content-Length` header is set to `0`.
Empty,
/// Specific response body.
Bytes(Bytes),
/// Generic message body.
Message(Box),
}
impl Body {
/// Create body from slice (copy)
pub fn from_slice(s: &[u8]) -> Body {
Body::Bytes(Bytes::from(s))
}
/// Create body from generic message body.
pub fn from_message(body: B) -> Body {
Body::Message(Box::new(body))
}
}
impl MessageBody for Body {
fn length(&self) -> BodyLength {
match self {
Body::None => BodyLength::None,
Body::Empty => BodyLength::Empty,
Body::Bytes(ref bin) => BodyLength::Sized(bin.len()),
Body::Message(ref body) => body.length(),
}
}
fn poll_next(&mut self) -> Poll, Error> {
match self {
Body::None => Ok(Async::Ready(None)),
Body::Empty => Ok(Async::Ready(None)),
Body::Bytes(ref mut bin) => {
let len = bin.len();
if len == 0 {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(bin.split_to(len))))
}
}
Body::Message(ref mut body) => body.poll_next(),
}
}
}
impl PartialEq for Body {
fn eq(&self, other: &Body) -> bool {
match *self {
Body::None => match *other {
Body::None => true,
_ => false,
},
Body::Empty => match *other {
Body::Empty => true,
_ => false,
},
Body::Bytes(ref b) => match *other {
Body::Bytes(ref b2) => b == b2,
_ => false,
},
Body::Message(_) => false,
}
}
}
impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Body::None => write!(f, "Body::None"),
Body::Empty => write!(f, "Body::Zero"),
Body::Bytes(ref b) => write!(f, "Body::Bytes({:?})", b),
Body::Message(_) => write!(f, "Body::Message(_)"),
}
}
}
impl From<&'static str> for Body {
fn from(s: &'static str) -> Body {
Body::Bytes(Bytes::from_static(s.as_ref()))
}
}
impl From<&'static [u8]> for Body {
fn from(s: &'static [u8]) -> Body {
Body::Bytes(Bytes::from_static(s))
}
}
impl From> for Body {
fn from(vec: Vec) -> Body {
Body::Bytes(Bytes::from(vec))
}
}
impl From for Body {
fn from(s: String) -> Body {
s.into_bytes().into()
}
}
impl<'a> From<&'a String> for Body {
fn from(s: &'a String) -> Body {
Body::Bytes(Bytes::from(AsRef::<[u8]>::as_ref(&s)))
}
}
impl From for Body {
fn from(s: Bytes) -> Body {
Body::Bytes(s)
}
}
impl From for Body {
fn from(s: BytesMut) -> Body {
Body::Bytes(s.freeze())
}
}
impl MessageBody for Bytes {
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll, Error> {
if self.is_empty() {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(mem::replace(self, Bytes::new()))))
}
}
}
impl MessageBody for BytesMut {
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll , Error> {
if self.is_empty() {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(
mem::replace(self, BytesMut::new()).freeze(),
)))
}
}
}
impl MessageBody for &'static str {
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll , Error> {
if self.is_empty() {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(Bytes::from_static(
mem::replace(self, "").as_ref(),
))))
}
}
}
impl MessageBody for &'static [u8] {
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll , Error> {
if self.is_empty() {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(Bytes::from_static(mem::replace(
self, b"",
)))))
}
}
}
impl MessageBody for Vec {
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll, Error> {
if self.is_empty() {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(Bytes::from(mem::replace(
self,
Vec::new(),
)))))
}
}
}
impl MessageBody for String {
fn length(&self) -> BodyLength {
BodyLength::Sized(self.len())
}
fn poll_next(&mut self) -> Poll , Error> {
if self.is_empty() {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(Bytes::from(
mem::replace(self, String::new()).into_bytes(),
))))
}
}
}
/// Type represent streaming body.
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
pub struct BodyStream {
stream: S,
_t: PhantomData,
}
impl BodyStream
where
S: Stream- ,
E: Into
,
{
pub fn new(stream: S) -> Self {
BodyStream {
stream,
_t: PhantomData,
}
}
}
impl MessageBody for BodyStream
where
S: Stream- ,
E: Into
,
{
fn length(&self) -> BodyLength {
BodyLength::Stream
}
fn poll_next(&mut self) -> Poll, Error> {
self.stream.poll().map_err(|e| e.into())
}
}
/// Type represent streaming body. This body implementation should be used
/// if total size of stream is known. Data get sent as is without using transfer encoding.
pub struct SizedStream {
size: usize,
stream: S,
}
impl SizedStream
where
S: Stream- ,
{
pub fn new(size: usize, stream: S) -> Self {
SizedStream { size, stream }
}
}
impl
MessageBody for SizedStream
where
S: Stream- ,
{
fn length(&self) -> BodyLength {
BodyLength::Sized(self.size)
}
fn poll_next(&mut self) -> Poll
, Error> {
self.stream.poll()
}
}
#[cfg(test)]
mod tests {
use super::*;
impl Body {
pub(crate) fn get_ref(&self) -> &[u8] {
match *self {
Body::Bytes(ref bin) => &bin,
_ => panic!(),
}
}
}
impl ResponseBody {
pub(crate) fn get_ref(&self) -> &[u8] {
match *self {
ResponseBody::Body(ref b) => b.get_ref(),
ResponseBody::Other(ref b) => b.get_ref(),
}
}
}
#[test]
fn test_static_str() {
assert_eq!(Body::from("").length(), BodyLength::Sized(0));
assert_eq!(Body::from("test").length(), BodyLength::Sized(4));
assert_eq!(Body::from("test").get_ref(), b"test");
}
#[test]
fn test_static_bytes() {
assert_eq!(Body::from(b"test".as_ref()).length(), BodyLength::Sized(4));
assert_eq!(Body::from(b"test".as_ref()).get_ref(), b"test");
assert_eq!(
Body::from_slice(b"test".as_ref()).length(),
BodyLength::Sized(4)
);
assert_eq!(Body::from_slice(b"test".as_ref()).get_ref(), b"test");
}
#[test]
fn test_vec() {
assert_eq!(Body::from(Vec::from("test")).length(), BodyLength::Sized(4));
assert_eq!(Body::from(Vec::from("test")).get_ref(), b"test");
}
#[test]
fn test_bytes() {
assert_eq!(
Body::from(Bytes::from("test")).length(),
BodyLength::Sized(4)
);
assert_eq!(Body::from(Bytes::from("test")).get_ref(), b"test");
}
#[test]
fn test_string() {
let b = "test".to_owned();
assert_eq!(Body::from(b.clone()).length(), BodyLength::Sized(4));
assert_eq!(Body::from(b.clone()).get_ref(), b"test");
assert_eq!(Body::from(&b).length(), BodyLength::Sized(4));
assert_eq!(Body::from(&b).get_ref(), b"test");
}
#[test]
fn test_bytes_mut() {
let b = BytesMut::from("test");
assert_eq!(Body::from(b.clone()).length(), BodyLength::Sized(4));
assert_eq!(Body::from(b).get_ref(), b"test");
}
}