use std::{ cell::{Ref, RefCell, RefMut}, fmt, mem, pin::Pin, task::{Context, Poll}, time::{Duration, Instant}, }; use actix_http::{ error::PayloadError, header::HeaderMap, BoxedPayloadStream, Extensions, HttpMessage, Payload, ResponseHead, StatusCode, Version, }; use actix_rt::time::{sleep, Sleep}; use bytes::Bytes; use futures_core::Stream; use pin_project_lite::pin_project; use serde::de::DeserializeOwned; use super::{JsonBody, ResponseBody, ResponseTimeout}; #[cfg(feature = "cookies")] use crate::cookie::{Cookie, ParseError as CookieParseError}; pin_project! { /// Client Response pub struct ClientResponse { pub(crate) head: ResponseHead, #[pin] pub(crate) payload: Payload, pub(crate) timeout: ResponseTimeout, pub(crate) extensions: RefCell, } } impl ClientResponse { /// Create new Request instance pub(crate) fn new(head: ResponseHead, payload: Payload) -> Self { ClientResponse { head, payload, timeout: ResponseTimeout::default(), extensions: RefCell::new(Extensions::new()), } } #[inline] pub(crate) fn head(&self) -> &ResponseHead { &self.head } /// Read the Request Version. #[inline] pub fn version(&self) -> Version { self.head().version } /// Get the status from the server. #[inline] pub fn status(&self) -> StatusCode { self.head().status } #[inline] /// Returns request's headers. pub fn headers(&self) -> &HeaderMap { &self.head().headers } /// Map the current body type to another using a closure. Returns a new response. /// /// Closure receives the response head and the current body type. pub fn map_body(mut self, f: F) -> ClientResponse where F: FnOnce(&mut ResponseHead, Payload) -> Payload, { let payload = f(&mut self.head, self.payload); ClientResponse { payload, head: self.head, timeout: self.timeout, extensions: self.extensions, } } /// Set a timeout duration for [`ClientResponse`](self::ClientResponse). /// /// This duration covers the duration of processing the response body stream /// and would end it as timeout error when deadline met. /// /// Disabled by default. pub fn timeout(self, dur: Duration) -> Self { let timeout = match self.timeout { ResponseTimeout::Disabled(Some(mut timeout)) | ResponseTimeout::Enabled(mut timeout) => match Instant::now().checked_add(dur) { Some(deadline) => { timeout.as_mut().reset(deadline.into()); ResponseTimeout::Enabled(timeout) } None => ResponseTimeout::Enabled(Box::pin(sleep(dur))), }, _ => ResponseTimeout::Enabled(Box::pin(sleep(dur))), }; Self { payload: self.payload, head: self.head, timeout, extensions: self.extensions, } } /// This method does not enable timeout. It's used to pass the boxed `Sleep` from /// `SendClientRequest` and reuse it's heap allocation together with it's slot in /// timer wheel. pub(crate) fn _timeout(mut self, timeout: Option>>) -> Self { self.timeout = ResponseTimeout::Disabled(timeout); self } /// Load request cookies. #[cfg(feature = "cookies")] pub fn cookies(&self) -> Result>>, CookieParseError> { struct Cookies(Vec>); if self.extensions().get::().is_none() { let mut cookies = Vec::new(); for hdr in self.headers().get_all(&actix_http::header::SET_COOKIE) { let s = std::str::from_utf8(hdr.as_bytes()).map_err(CookieParseError::from)?; cookies.push(Cookie::parse_encoded(s)?.into_owned()); } self.extensions_mut().insert(Cookies(cookies)); } Ok(Ref::map(self.extensions(), |ext| { &ext.get::().unwrap().0 })) } /// Return request cookie. #[cfg(feature = "cookies")] pub fn cookie(&self, name: &str) -> Option> { if let Ok(cookies) = self.cookies() { for cookie in cookies.iter() { if cookie.name() == name { return Some(cookie.to_owned()); } } } None } } impl ClientResponse where S: Stream>, { /// Returns a [`Future`] that consumes the body stream and resolves to [`Bytes`]. /// /// # Errors /// `Future` implementation returns error if: /// - content length is greater than [limit](ResponseBody::limit) (default: 2 MiB) /// /// # Examples /// ```no_run /// # use awc::Client; /// # use bytes::Bytes; /// # #[actix_rt::main] /// # async fn async_ctx() -> Result<(), Box> { /// let client = Client::default(); /// let mut res = client.get("https://httpbin.org/robots.txt").send().await?; /// let body: Bytes = res.body().await?; /// # Ok(()) /// # } /// ``` /// /// [`Future`]: std::future::Future pub fn body(&mut self) -> ResponseBody { ResponseBody::new(self) } /// Returns a [`Future`] consumes the body stream, parses JSON, and resolves to a deserialized /// `T` value. /// /// # Errors /// Future returns error if: /// - content type is not `application/json`; /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB). /// /// # Examples /// ```no_run /// # use awc::Client; /// # #[actix_rt::main] /// # async fn async_ctx() -> Result<(), Box> { /// let client = Client::default(); /// let mut res = client.get("https://httpbin.org/json").send().await?; /// let val = res.json::().await?; /// assert!(val.is_object()); /// # Ok(()) /// # } /// ``` /// /// [`Future`]: std::future::Future pub fn json(&mut self) -> JsonBody { JsonBody::new(self) } } impl fmt::Debug for ClientResponse { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?; writeln!(f, " headers:")?; for (key, val) in self.headers().iter() { writeln!(f, " {:?}: {:?}", key, val)?; } Ok(()) } } impl HttpMessage for ClientResponse { type Stream = S; fn headers(&self) -> &HeaderMap { &self.head.headers } fn take_payload(&mut self) -> Payload { mem::replace(&mut self.payload, Payload::None) } fn extensions(&self) -> Ref<'_, Extensions> { self.extensions.borrow() } fn extensions_mut(&self) -> RefMut<'_, Extensions> { self.extensions.borrow_mut() } } impl Stream for ClientResponse where S: Stream> + Unpin, { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); this.timeout.poll_timeout(cx)?; this.payload.poll_next(cx) } } #[cfg(test)] mod tests { use static_assertions::assert_impl_all; use super::*; use crate::any_body::AnyBody; assert_impl_all!(ClientResponse: Unpin); assert_impl_all!(ClientResponse<()>: Unpin); assert_impl_all!(ClientResponse: Unpin); }