use std::{ future::Future, net, pin::Pin, rc::Rc, task::{Context, Poll}, time::Duration, }; use actix_http::{ body::{BodyStream, MessageBody}, error::HttpError, header::{self, HeaderMap, HeaderName, TryIntoHeaderValue}, RequestHead, RequestHeadType, }; use actix_rt::time::{sleep, Sleep}; use bytes::Bytes; use derive_more::From; use futures_core::Stream; use serde::Serialize; #[cfg(feature = "__compress")] use actix_http::{encoding::Decoder, header::ContentEncoding, Payload}; use crate::{ any_body::AnyBody, client::ClientConfig, error::{FreezeRequestError, InvalidUrl, SendRequestError}, BoxError, ClientResponse, ConnectRequest, ConnectResponse, }; #[derive(Debug, From)] pub(crate) enum PrepForSendingError { Url(InvalidUrl), Http(HttpError), Json(serde_json::Error), Form(serde_urlencoded::ser::Error), } impl From for FreezeRequestError { fn from(err: PrepForSendingError) -> FreezeRequestError { match err { PrepForSendingError::Url(err) => FreezeRequestError::Url(err), PrepForSendingError::Http(err) => FreezeRequestError::Http(err), PrepForSendingError::Json(err) => { FreezeRequestError::Custom(Box::new(err), Box::new("json serialization error")) } PrepForSendingError::Form(err) => { FreezeRequestError::Custom(Box::new(err), Box::new("form serialization error")) } } } } impl From for SendRequestError { fn from(err: PrepForSendingError) -> SendRequestError { match err { PrepForSendingError::Url(e) => SendRequestError::Url(e), PrepForSendingError::Http(e) => SendRequestError::Http(e), PrepForSendingError::Json(err) => { SendRequestError::Custom(Box::new(err), Box::new("json serialization error")) } PrepForSendingError::Form(err) => { SendRequestError::Custom(Box::new(err), Box::new("form serialization error")) } } } } /// Future that sends request's payload and resolves to a server response. #[must_use = "futures do nothing unless polled"] pub enum SendClientRequest { Fut( Pin>>>, // FIXME: use a pinned Sleep instead of box. Option>>, bool, ), Err(Option), } impl SendClientRequest { pub(crate) fn new( send: Pin>>>, response_decompress: bool, timeout: Option, ) -> SendClientRequest { let delay = timeout.map(|d| Box::pin(sleep(d))); SendClientRequest::Fut(send, delay, response_decompress) } } #[cfg(feature = "__compress")] impl Future for SendClientRequest { type Output = Result>, SendRequestError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); match this { SendClientRequest::Fut(send, delay, response_decompress) => { if let Some(delay) = delay { if delay.as_mut().poll(cx).is_ready() { return Poll::Ready(Err(SendRequestError::Timeout)); } } let res = futures_core::ready!(send.as_mut().poll(cx)).map(|res| { res.into_client_response()._timeout(delay.take()).map_body( |head, payload| { if *response_decompress { Payload::Stream { payload: Decoder::from_headers(payload, &head.headers), } } else { Payload::Stream { payload: Decoder::new(payload, ContentEncoding::Identity), } } }, ) }); Poll::Ready(res) } SendClientRequest::Err(ref mut e) => match e.take() { Some(e) => Poll::Ready(Err(e)), None => panic!("Attempting to call completed future"), }, } } } #[cfg(not(feature = "__compress"))] impl Future for SendClientRequest { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); match this { SendClientRequest::Fut(send, delay, _) => { if let Some(delay) = delay { if delay.as_mut().poll(cx).is_ready() { return Poll::Ready(Err(SendRequestError::Timeout)); } } send.as_mut() .poll(cx) .map_ok(|res| res.into_client_response()._timeout(delay.take())) } SendClientRequest::Err(ref mut e) => match e.take() { Some(e) => Poll::Ready(Err(e)), None => panic!("Attempting to call completed future"), }, } } } impl From for SendClientRequest { fn from(e: SendRequestError) -> Self { SendClientRequest::Err(Some(e)) } } impl From for SendClientRequest { fn from(e: HttpError) -> Self { SendClientRequest::Err(Some(e.into())) } } impl From for SendClientRequest { fn from(e: PrepForSendingError) -> Self { SendClientRequest::Err(Some(e.into())) } } #[derive(Debug)] pub(crate) enum RequestSender { Owned(RequestHead), Rc(Rc, Option), } impl RequestSender { pub(crate) fn send_body( self, addr: Option, response_decompress: bool, timeout: Option, config: &ClientConfig, body: B, ) -> SendClientRequest where B: MessageBody + 'static, { let req = match self { RequestSender::Owned(head) => ConnectRequest::Client( RequestHeadType::Owned(head), AnyBody::from_message_body(body).into_boxed(), addr, ), RequestSender::Rc(head, extra_headers) => ConnectRequest::Client( RequestHeadType::Rc(head, extra_headers), AnyBody::from_message_body(body).into_boxed(), addr, ), }; let fut = config.connector.call(req); SendClientRequest::new(fut, response_decompress, timeout.or(config.timeout)) } pub(crate) fn send_json( mut self, addr: Option, response_decompress: bool, timeout: Option, config: &ClientConfig, value: &T, ) -> SendClientRequest { let body = match serde_json::to_string(value) { Ok(body) => body, Err(err) => return PrepForSendingError::Json(err).into(), }; if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/json") { return e.into(); } self.send_body( addr, response_decompress, timeout, config, AnyBody::from_message_body(body.into_bytes()), ) } pub(crate) fn send_form( mut self, addr: Option, response_decompress: bool, timeout: Option, config: &ClientConfig, value: &T, ) -> SendClientRequest { let body = match serde_urlencoded::to_string(value) { Ok(body) => body, Err(err) => return PrepForSendingError::Form(err).into(), }; // set content-type if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/x-www-form-urlencoded") { return e.into(); } self.send_body( addr, response_decompress, timeout, config, AnyBody::from_message_body(body.into_bytes()), ) } pub(crate) fn send_stream( self, addr: Option, response_decompress: bool, timeout: Option, config: &ClientConfig, stream: S, ) -> SendClientRequest where S: Stream> + Unpin + 'static, E: Into + 'static, { self.send_body( addr, response_decompress, timeout, config, AnyBody::new_boxed(BodyStream::new(stream)), ) } pub(crate) fn send( self, addr: Option, response_decompress: bool, timeout: Option, config: &ClientConfig, ) -> SendClientRequest { self.send_body(addr, response_decompress, timeout, config, AnyBody::empty()) } fn set_header_if_none(&mut self, key: HeaderName, value: V) -> Result<(), HttpError> where V: TryIntoHeaderValue, { match self { RequestSender::Owned(head) => { if !head.headers.contains_key(&key) { match value.try_into_value() { Ok(value) => { head.headers.insert(key, value); } Err(e) => return Err(e.into()), } } } RequestSender::Rc(head, extra_headers) => { if !head.headers.contains_key(&key) && !extra_headers.iter().any(|h| h.contains_key(&key)) { match value.try_into_value() { Ok(v) => { let h = extra_headers.get_or_insert(HeaderMap::new()); h.insert(key, v) } Err(e) => return Err(e.into()), }; } } } Ok(()) } }