use std::cell::{Ref, RefMut}; use std::fmt; use bytes::{Bytes, BytesMut}; use futures::{Future, Poll, Stream}; use actix_http::error::PayloadError; use actix_http::http::header::{CONTENT_LENGTH, SET_COOKIE}; use actix_http::http::{HeaderMap, StatusCode, Version}; use actix_http::{Extensions, Head, HttpMessage, Payload, PayloadStream, ResponseHead}; #[cfg(feature = "cookies")] use actix_http::error::CookieParseError; #[cfg(feature = "cookies")] use cookie::Cookie; /// Client Response pub struct ClientResponse { pub(crate) head: ResponseHead, pub(crate) payload: Payload, } impl HttpMessage for ClientResponse { type Stream = S; fn headers(&self) -> &HeaderMap { &self.head.headers } fn extensions(&self) -> Ref { self.head.extensions() } fn extensions_mut(&self) -> RefMut { self.head.extensions_mut() } fn take_payload(&mut self) -> Payload { std::mem::replace(&mut self.payload, Payload::None) } /// Load request cookies. #[inline] #[cfg(feature = "cookies")] fn cookies(&self) -> Result>>, CookieParseError> { struct Cookies(Vec>); if self.extensions().get::().is_none() { let mut cookies = Vec::new(); for hdr in self.headers().get_all(SET_COOKIE) { let s = std::str::from_utf8(hdr.as_bytes()) .map_err(CookieParseError::from)?; cookies.push(Cookie::parse_encoded(s)?.into_owned()); } self.extensions_mut().insert(Cookies(cookies)); } Ok(Ref::map(self.extensions(), |ext| { &ext.get::().unwrap().0 })) } } impl ClientResponse { /// Create new Request instance pub(crate) fn new(head: ResponseHead, payload: Payload) -> Self { ClientResponse { head, payload } } #[inline] pub(crate) fn head(&self) -> &ResponseHead { &self.head } #[inline] pub(crate) fn head_mut(&mut self) -> &mut ResponseHead { &mut self.head } /// Read the Request Version. #[inline] pub fn version(&self) -> Version { self.head().version } /// Get the status from the server. #[inline] pub fn status(&self) -> StatusCode { self.head().status } #[inline] /// Returns Request's headers. pub fn headers(&self) -> &HeaderMap { &self.head().headers } #[inline] /// Returns mutable Request's headers. pub fn headers_mut(&mut self) -> &mut HeaderMap { &mut self.head_mut().headers } /// Checks if a connection should be kept alive. #[inline] pub fn keep_alive(&self) -> bool { self.head().keep_alive() } /// Set a body and return previous body value pub fn map_body(mut self, f: F) -> ClientResponse where F: FnOnce(&mut ResponseHead, Payload) -> Payload, { let payload = f(&mut self.head, self.payload); ClientResponse { payload, head: self.head, } } } impl ClientResponse where S: Stream + 'static, { /// Load http response's body. pub fn body(self) -> MessageBody { MessageBody::new(self) } } impl Stream for ClientResponse where S: Stream, { type Item = Bytes; type Error = PayloadError; fn poll(&mut self) -> Poll, Self::Error> { self.payload.poll() } } impl fmt::Debug for ClientResponse { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?; writeln!(f, " headers:")?; for (key, val) in self.headers().iter() { writeln!(f, " {:?}: {:?}", key, val)?; } Ok(()) } } /// Future that resolves to a complete http message body. pub struct MessageBody { limit: usize, length: Option, stream: Option>, err: Option, fut: Option>>, } impl MessageBody where S: Stream + 'static, { /// Create `MessageBody` for request. pub fn new(res: ClientResponse) -> MessageBody { let mut len = None; if let Some(l) = res.headers().get(CONTENT_LENGTH) { if let Ok(s) = l.to_str() { if let Ok(l) = s.parse::() { len = Some(l) } else { return Self::err(PayloadError::UnknownLength); } } else { return Self::err(PayloadError::UnknownLength); } } MessageBody { limit: 262_144, length: len, stream: Some(res), fut: None, err: None, } } /// Change max size of payload. By default max size is 256Kb pub fn limit(mut self, limit: usize) -> Self { self.limit = limit; self } fn err(e: PayloadError) -> Self { MessageBody { stream: None, limit: 262_144, fut: None, err: Some(e), length: None, } } } impl Future for MessageBody where S: Stream + 'static, { type Item = Bytes; type Error = PayloadError; fn poll(&mut self) -> Poll { if let Some(ref mut fut) = self.fut { return fut.poll(); } if let Some(err) = self.err.take() { return Err(err); } if let Some(len) = self.length.take() { if len > self.limit { return Err(PayloadError::Overflow); } } // future let limit = self.limit; self.fut = Some(Box::new( self.stream .take() .expect("Can not be used second time") .from_err() .fold(BytesMut::with_capacity(8192), move |mut body, chunk| { if (body.len() + chunk.len()) > limit { Err(PayloadError::Overflow) } else { body.extend_from_slice(&chunk); Ok(body) } }) .map(|body| body.freeze()), )); self.poll() } }