From 45d2fd429928d243d80a6bce246d2542f9e8cde7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 12 Sep 2019 10:40:56 +0600 Subject: [PATCH] export frozen request related types; refactor code layout --- awc/CHANGES.md | 7 + awc/src/frozen.rs | 235 ++++++++++++++++++++ awc/src/lib.rs | 4 + awc/src/request.rs | 535 +++++---------------------------------------- awc/src/sender.rs | 282 ++++++++++++++++++++++++ 5 files changed, 581 insertions(+), 482 deletions(-) create mode 100644 awc/src/frozen.rs create mode 100644 awc/src/sender.rs diff --git a/awc/CHANGES.md b/awc/CHANGES.md index 4a52a9df2..94ad65ffe 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.2.6] - 2019-09-12 + +### Added + +* Export frozen request related types. + + ## [0.2.5] - 2019-09-11 ### Added diff --git a/awc/src/frozen.rs b/awc/src/frozen.rs new file mode 100644 index 000000000..d9f65d431 --- /dev/null +++ b/awc/src/frozen.rs @@ -0,0 +1,235 @@ +use std::net; +use std::rc::Rc; +use std::time::Duration; + +use bytes::Bytes; +use futures::Stream; +use serde::Serialize; + +use actix_http::body::Body; +use actix_http::http::header::IntoHeaderValue; +use actix_http::http::{ + Error as HttpError, HeaderMap, HeaderName, HttpTryFrom, Method, Uri, +}; +use actix_http::{Error, RequestHead}; + +use crate::sender::{RequestSender, SendClientRequest}; +use crate::ClientConfig; + +/// `FrozenClientRequest` struct represents clonable client request. +/// It could be used to send same request multiple times. +#[derive(Clone)] +pub struct FrozenClientRequest { + pub(crate) head: Rc, + pub(crate) addr: Option, + pub(crate) response_decompress: bool, + pub(crate) timeout: Option, + pub(crate) config: Rc, +} + +impl FrozenClientRequest { + /// Get HTTP URI of request + pub fn get_uri(&self) -> &Uri { + &self.head.uri + } + + /// Get HTTP method of this request + pub fn get_method(&self) -> &Method { + &self.head.method + } + + /// Returns request's headers. + pub fn headers(&self) -> &HeaderMap { + &self.head.headers + } + + /// Send a body. + pub fn send_body(&self, body: B) -> SendClientRequest + where + B: Into, + { + RequestSender::Rc(self.head.clone(), None).send_body( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + body, + ) + } + + /// Send a json body. + pub fn send_json(&self, value: &T) -> SendClientRequest { + RequestSender::Rc(self.head.clone(), None).send_json( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + value, + ) + } + + /// Send an urlencoded body. + pub fn send_form(&self, value: &T) -> SendClientRequest { + RequestSender::Rc(self.head.clone(), None).send_form( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + value, + ) + } + + /// Send a streaming body. + pub fn send_stream(&self, stream: S) -> SendClientRequest + where + S: Stream + 'static, + E: Into + 'static, + { + RequestSender::Rc(self.head.clone(), None).send_stream( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + stream, + ) + } + + /// Send an empty body. + pub fn send(&self) -> SendClientRequest { + RequestSender::Rc(self.head.clone(), None).send( + self.addr, + self.response_decompress, + self.timeout, + self.config.as_ref(), + ) + } + + /// Create a `FrozenSendBuilder` with extra headers + pub fn extra_headers(&self, extra_headers: HeaderMap) -> FrozenSendBuilder { + FrozenSendBuilder::new(self.clone(), extra_headers) + } + + /// Create a `FrozenSendBuilder` with an extra header + pub fn extra_header(&self, key: K, value: V) -> FrozenSendBuilder + where + HeaderName: HttpTryFrom, + V: IntoHeaderValue, + { + self.extra_headers(HeaderMap::new()) + .extra_header(key, value) + } +} + +/// Builder that allows to modify extra headers. +pub struct FrozenSendBuilder { + req: FrozenClientRequest, + extra_headers: HeaderMap, + err: Option, +} + +impl FrozenSendBuilder { + pub(crate) fn new(req: FrozenClientRequest, extra_headers: HeaderMap) -> Self { + Self { + req, + extra_headers, + err: None, + } + } + + /// Insert a header, it overrides existing header in `FrozenClientRequest`. + pub fn extra_header(mut self, key: K, value: V) -> Self + where + HeaderName: HttpTryFrom, + V: IntoHeaderValue, + { + match HeaderName::try_from(key) { + Ok(key) => match value.try_into() { + Ok(value) => self.extra_headers.insert(key, value), + Err(e) => self.err = Some(e.into()), + }, + Err(e) => self.err = Some(e.into()), + } + self + } + + /// Complete request construction and send a body. + pub fn send_body(self, body: B) -> SendClientRequest + where + B: Into, + { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send_body( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + body, + ) + } + + /// Complete request construction and send a json body. + pub fn send_json(self, value: &T) -> SendClientRequest { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send_json( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + value, + ) + } + + /// Complete request construction and send an urlencoded body. + pub fn send_form(self, value: &T) -> SendClientRequest { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send_form( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + value, + ) + } + + /// Complete request construction and send a streaming body. + pub fn send_stream(self, stream: S) -> SendClientRequest + where + S: Stream + 'static, + E: Into + 'static, + { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send_stream( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + stream, + ) + } + + /// Complete request construction and send an empty body. + pub fn send(self) -> SendClientRequest { + if let Some(e) = self.err { + return e.into(); + } + + RequestSender::Rc(self.req.head, Some(self.extra_headers)).send( + self.req.addr, + self.req.response_decompress, + self.req.timeout, + self.req.config.as_ref(), + ) + } +} diff --git a/awc/src/lib.rs b/awc/src/lib.rs index da63bbd93..58c9056b2 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -33,15 +33,19 @@ use actix_http::RequestHead; mod builder; mod connect; pub mod error; +mod frozen; mod request; mod response; +mod sender; pub mod test; pub mod ws; pub use self::builder::ClientBuilder; pub use self::connect::BoxedSocket; +pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::request::ClientRequest; pub use self::response::{ClientResponse, JsonBody, MessageBody}; +pub use self::sender::SendClientRequest; use self::connect::{Connect, ConnectorWrapper}; diff --git a/awc/src/request.rs b/awc/src/request.rs index 4dd07c5d8..d597a1638 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -1,29 +1,26 @@ use std::fmt::Write as FmtWrite; use std::io::Write; use std::rc::Rc; -use std::time::{Duration, Instant}; +use std::time::Duration; use std::{fmt, net}; use bytes::{BufMut, Bytes, BytesMut}; -use futures::{Async, Future, Poll, Stream, try_ready}; +use futures::Stream; use percent_encoding::percent_encode; use serde::Serialize; -use serde_json; -use tokio_timer::Delay; -use derive_more::From; -use actix_http::body::{Body, BodyStream}; +use actix_http::body::Body; use actix_http::cookie::{Cookie, CookieJar, USERINFO}; -use actix_http::encoding::Decoder; -use actix_http::http::header::{self, ContentEncoding, Header, IntoHeaderValue}; +use actix_http::http::header::{self, Header, IntoHeaderValue}; use actix_http::http::{ uri, ConnectionType, Error as HttpError, HeaderMap, HeaderName, HeaderValue, HttpTryFrom, Method, Uri, Version, }; -use actix_http::{Error, Payload, PayloadStream, RequestHead}; +use actix_http::{Error, RequestHead}; -use crate::error::{InvalidUrl, SendRequestError, FreezeRequestError}; -use crate::response::ClientResponse; +use crate::error::{FreezeRequestError, InvalidUrl}; +use crate::frozen::FrozenClientRequest; +use crate::sender::{PrepForSendingError, RequestSender, SendClientRequest}; use crate::ClientConfig; #[cfg(any(feature = "brotli", feature = "flate2-zlib", feature = "flate2-rust"))] @@ -375,6 +372,8 @@ impl ClientRequest { } } + /// Freeze request builder and construct `FrozenClientRequest`, + /// which could be used for sending same request multiple times. pub fn freeze(self) -> Result { let slf = match self.prep_for_sending() { Ok(slf) => slf, @@ -393,10 +392,7 @@ impl ClientRequest { } /// Complete request construction and send body. - pub fn send_body( - self, - body: B, - ) -> SendBody + pub fn send_body(self, body: B) -> SendClientRequest where B: Into, { @@ -405,47 +401,51 @@ impl ClientRequest { Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send_body(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), body) + RequestSender::Owned(slf.head).send_body( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + body, + ) } /// Set a JSON body and generate `ClientRequest` - pub fn send_json( - self, - value: &T, - ) -> SendBody - { + pub fn send_json(self, value: &T) -> SendClientRequest { let slf = match self.prep_for_sending() { Ok(slf) => slf, Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send_json(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), value) + RequestSender::Owned(slf.head).send_json( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + value, + ) } /// Set a urlencoded body and generate `ClientRequest` /// /// `ClientRequestBuilder` can not be used after this call. - pub fn send_form( - self, - value: &T, - ) -> SendBody - { + pub fn send_form(self, value: &T) -> SendClientRequest { let slf = match self.prep_for_sending() { Ok(slf) => slf, Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send_form(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), value) + RequestSender::Owned(slf.head).send_form( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + value, + ) } /// Set an streaming body and generate `ClientRequest`. - pub fn send_stream( - self, - stream: S, - ) -> SendBody + pub fn send_stream(self, stream: S) -> SendClientRequest where S: Stream + 'static, E: Into + 'static, @@ -455,22 +455,28 @@ impl ClientRequest { Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send_stream(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref(), stream) + RequestSender::Owned(slf.head).send_stream( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + stream, + ) } /// Set an empty body and generate `ClientRequest`. - pub fn send( - self, - ) -> SendBody - { + pub fn send(self) -> SendClientRequest { let slf = match self.prep_for_sending() { Ok(slf) => slf, Err(e) => return e.into(), }; - RequestSender::Owned(slf.head) - .send(slf.addr, slf.response_decompress, slf.timeout, slf.config.as_ref()) + RequestSender::Owned(slf.head).send( + slf.addr, + slf.response_decompress, + slf.timeout, + slf.config.as_ref(), + ) } fn prep_for_sending(mut self) -> Result { @@ -528,10 +534,10 @@ impl ClientRequest { slf = slf.set_header_if_none(header::ACCEPT_ENCODING, HTTPS_ENCODING) } else { #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] - { - slf = slf - .set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate") - } + { + slf = slf + .set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate") + } }; } } @@ -555,441 +561,6 @@ impl fmt::Debug for ClientRequest { } } -#[derive(Clone)] -pub struct FrozenClientRequest { - pub(crate) head: Rc, - pub(crate) addr: Option, - pub(crate) response_decompress: bool, - pub(crate) timeout: Option, - pub(crate) config: Rc, -} - -impl FrozenClientRequest { - /// Get HTTP URI of request - pub fn get_uri(&self) -> &Uri { - &self.head.uri - } - - /// Get HTTP method of this request - pub fn get_method(&self) -> &Method { - &self.head.method - } - - /// Returns request's headers. - pub fn headers(&self) -> &HeaderMap { - &self.head.headers - } - - /// Send a body. - pub fn send_body( - &self, - body: B, - ) -> SendBody - where - B: Into, - { - RequestSender::Rc(self.head.clone(), None) - .send_body(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), body) - } - - /// Send a json body. - pub fn send_json( - &self, - value: &T, - ) -> SendBody - { - RequestSender::Rc(self.head.clone(), None) - .send_json(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), value) - } - - /// Send an urlencoded body. - pub fn send_form( - &self, - value: &T, - ) -> SendBody - { - RequestSender::Rc(self.head.clone(), None) - .send_form(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), value) - } - - /// Send a streaming body. - pub fn send_stream( - &self, - stream: S, - ) -> SendBody - where - S: Stream + 'static, - E: Into + 'static, - { - RequestSender::Rc(self.head.clone(), None) - .send_stream(self.addr, self.response_decompress, self.timeout, self.config.as_ref(), stream) - } - - /// Send an empty body. - pub fn send( - &self, - ) -> SendBody - { - RequestSender::Rc(self.head.clone(), None) - .send(self.addr, self.response_decompress, self.timeout, self.config.as_ref()) - } - - /// Create a `FrozenSendBuilder` with extra headers - pub fn extra_headers(&self, extra_headers: HeaderMap) -> FrozenSendBuilder { - FrozenSendBuilder::new(self.clone(), extra_headers) - } - - /// Create a `FrozenSendBuilder` with an extra header - pub fn extra_header(&self, key: K, value: V) -> FrozenSendBuilder - where - HeaderName: HttpTryFrom, - V: IntoHeaderValue, - { - self.extra_headers(HeaderMap::new()).extra_header(key, value) - } -} - -pub struct FrozenSendBuilder { - req: FrozenClientRequest, - extra_headers: HeaderMap, - err: Option, -} - -impl FrozenSendBuilder { - pub(crate) fn new(req: FrozenClientRequest, extra_headers: HeaderMap) -> Self { - Self { - req, - extra_headers, - err: None, - } - } - - /// Insert a header, it overrides existing header in `FrozenClientRequest`. - pub fn extra_header(mut self, key: K, value: V) -> Self - where - HeaderName: HttpTryFrom, - V: IntoHeaderValue, - { - match HeaderName::try_from(key) { - Ok(key) => match value.try_into() { - Ok(value) => self.extra_headers.insert(key, value), - Err(e) => self.err = Some(e.into()), - }, - Err(e) => self.err = Some(e.into()), - } - self - } - - /// Complete request construction and send a body. - pub fn send_body( - self, - body: B, - ) -> SendBody - where - B: Into, - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send_body(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref(), body) - } - - /// Complete request construction and send a json body. - pub fn send_json( - self, - value: &T, - ) -> SendBody - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send_json(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref(), value) - } - - /// Complete request construction and send an urlencoded body. - pub fn send_form( - self, - value: &T, - ) -> SendBody - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send_form(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref(), value) - } - - /// Complete request construction and send a streaming body. - pub fn send_stream( - self, - stream: S, - ) -> SendBody - where - S: Stream + 'static, - E: Into + 'static, - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send_stream(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref(), stream) - } - - /// Complete request construction and send an empty body. - pub fn send( - self, - ) -> SendBody - { - if let Some(e) = self.err { - return e.into() - } - - RequestSender::Rc(self.req.head, Some(self.extra_headers)) - .send(self.req.addr, self.req.response_decompress, self.req.timeout, self.req.config.as_ref()) - } -} - -#[derive(Debug, From)] -enum PrepForSendingError { - Url(InvalidUrl), - Http(HttpError), -} - -impl Into for PrepForSendingError { - fn into(self) -> FreezeRequestError { - match self { - PrepForSendingError::Url(e) => FreezeRequestError::Url(e), - PrepForSendingError::Http(e) => FreezeRequestError::Http(e), - } - } -} - -impl Into for PrepForSendingError { - fn into(self) -> SendRequestError { - match self { - PrepForSendingError::Url(e) => SendRequestError::Url(e), - PrepForSendingError::Http(e) => SendRequestError::Http(e), - } - } -} - -pub enum SendBody -{ - Fut(Box>, Option, bool), - Err(Option), -} - -impl SendBody -{ - pub fn new( - send: Box>, - response_decompress: bool, - timeout: Option, - ) -> SendBody - { - let delay = timeout.map(|t| Delay::new(Instant::now() + t)); - SendBody::Fut(send, delay, response_decompress) - } -} - -impl Future for SendBody -{ - type Item = ClientResponse>>; - type Error = SendRequestError; - - fn poll(&mut self) -> Poll { - match self { - SendBody::Fut(send, delay, response_decompress) => { - if delay.is_some() { - match delay.poll() { - Ok(Async::NotReady) => (), - _ => return Err(SendRequestError::Timeout), - } - } - - let res = try_ready!(send.poll()) - .map_body(|head, payload| { - if *response_decompress { - Payload::Stream(Decoder::from_headers(payload, &head.headers)) - } else { - Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) - } - }); - - Ok(Async::Ready(res)) - }, - SendBody::Err(ref mut e) => { - match e.take() { - Some(e) => Err(e.into()), - None => panic!("Attempting to call completed future"), - } - } - } - } -} - - -impl From for SendBody -{ - fn from(e: SendRequestError) -> Self { - SendBody::Err(Some(e)) - } -} - -impl From for SendBody -{ - fn from(e: Error) -> Self { - SendBody::Err(Some(e.into())) - } -} - -impl From for SendBody -{ - fn from(e: HttpError) -> Self { - SendBody::Err(Some(e.into())) - } -} - -impl From for SendBody -{ - fn from(e: PrepForSendingError) -> Self { - SendBody::Err(Some(e.into())) - } -} - -#[derive(Debug)] -enum RequestSender { - Owned(RequestHead), - Rc(Rc, Option), -} - -impl RequestSender { - pub fn send_body( - self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - body: B, - ) -> SendBody - where - B: Into, - { - let mut connector = config.connector.borrow_mut(); - - let fut = match self { - RequestSender::Owned(head) => connector.send_request(head, body.into(), addr), - RequestSender::Rc(head, extra_headers) => connector.send_request_extra(head, extra_headers, body.into(), addr), - }; - - SendBody::new(fut, response_decompress, timeout.or_else(|| config.timeout.clone())) - } - - pub fn send_json( - mut self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - value: &T, - ) -> SendBody - { - let body = match serde_json::to_string(value) { - Ok(body) => body, - Err(e) => return Error::from(e).into(), - }; - - if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/json") { - return e.into(); - } - - self.send_body(addr, response_decompress, timeout, config, Body::Bytes(Bytes::from(body))) - } - - pub fn send_form( - mut self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - value: &T, - ) -> SendBody - { - let body = match serde_urlencoded::to_string(value) { - Ok(body) => body, - Err(e) => return Error::from(e).into(), - }; - - // set content-type - if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/x-www-form-urlencoded") { - return e.into(); - } - - self.send_body(addr, response_decompress, timeout, config, Body::Bytes(Bytes::from(body))) - } - - pub fn send_stream( - self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - stream: S, - ) -> SendBody - where - S: Stream + 'static, - E: Into + 'static, - { - self.send_body(addr, response_decompress, timeout, config, Body::from_message(BodyStream::new(stream))) - } - - pub fn send( - self, - addr: Option, - response_decompress: bool, - timeout: Option, - config: &ClientConfig, - ) -> SendBody - { - self.send_body(addr, response_decompress, timeout, config, Body::Empty) - } - - fn set_header_if_none(&mut self, key: HeaderName, value: V) -> Result<(), HttpError> - where - V: IntoHeaderValue, - { - match self { - RequestSender::Owned(head) => { - if !head.headers.contains_key(&key) { - match value.try_into() { - Ok(value) => head.headers.insert(key, value), - Err(e) => return Err(e.into()), - } - } - }, - RequestSender::Rc(head, extra_headers) => { - if !head.headers.contains_key(&key) && !extra_headers.iter().any(|h| h.contains_key(&key)) { - match value.try_into(){ - Ok(v) => { - let h = extra_headers.get_or_insert(HeaderMap::new()); - h.insert(key, v) - }, - Err(e) => return Err(e.into()), - }; - } - } - } - - Ok(()) - } -} - #[cfg(test)] mod tests { use std::time::SystemTime; diff --git a/awc/src/sender.rs b/awc/src/sender.rs new file mode 100644 index 000000000..c8e169cb1 --- /dev/null +++ b/awc/src/sender.rs @@ -0,0 +1,282 @@ +use std::net; +use std::rc::Rc; +use std::time::{Duration, Instant}; + +use bytes::Bytes; +use derive_more::From; +use futures::{try_ready, Async, Future, Poll, Stream}; +use serde::Serialize; +use serde_json; +use tokio_timer::Delay; + +use actix_http::body::{Body, BodyStream}; +use actix_http::encoding::Decoder; +use actix_http::http::header::{self, ContentEncoding, IntoHeaderValue}; +use actix_http::http::{Error as HttpError, HeaderMap, HeaderName}; +use actix_http::{Error, Payload, PayloadStream, RequestHead}; + +use crate::error::{FreezeRequestError, InvalidUrl, SendRequestError}; +use crate::response::ClientResponse; +use crate::ClientConfig; + +#[derive(Debug, From)] +pub(crate) enum PrepForSendingError { + Url(InvalidUrl), + Http(HttpError), +} + +impl Into for PrepForSendingError { + fn into(self) -> FreezeRequestError { + match self { + PrepForSendingError::Url(e) => FreezeRequestError::Url(e), + PrepForSendingError::Http(e) => FreezeRequestError::Http(e), + } + } +} + +impl Into for PrepForSendingError { + fn into(self) -> SendRequestError { + match self { + PrepForSendingError::Url(e) => SendRequestError::Url(e), + PrepForSendingError::Http(e) => SendRequestError::Http(e), + } + } +} + +/// Future that sends request's payload and resolves to a server response. +#[must_use = "futures do nothing unless polled"] +pub enum SendClientRequest { + Fut( + Box>, + Option, + bool, + ), + Err(Option), +} + +impl SendClientRequest { + pub(crate) fn new( + send: Box>, + response_decompress: bool, + timeout: Option, + ) -> SendClientRequest { + let delay = timeout.map(|t| Delay::new(Instant::now() + t)); + SendClientRequest::Fut(send, delay, response_decompress) + } +} + +impl Future for SendClientRequest { + type Item = ClientResponse>>; + type Error = SendRequestError; + + fn poll(&mut self) -> Poll { + match self { + SendClientRequest::Fut(send, delay, response_decompress) => { + if delay.is_some() { + match delay.poll() { + Ok(Async::NotReady) => (), + _ => return Err(SendRequestError::Timeout), + } + } + + let res = try_ready!(send.poll()).map_body(|head, payload| { + if *response_decompress { + Payload::Stream(Decoder::from_headers(payload, &head.headers)) + } else { + Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) + } + }); + + Ok(Async::Ready(res)) + } + SendClientRequest::Err(ref mut e) => match e.take() { + Some(e) => Err(e.into()), + None => panic!("Attempting to call completed future"), + }, + } + } +} + +impl From for SendClientRequest { + fn from(e: SendRequestError) -> Self { + SendClientRequest::Err(Some(e)) + } +} + +impl From for SendClientRequest { + fn from(e: Error) -> Self { + SendClientRequest::Err(Some(e.into())) + } +} + +impl From for SendClientRequest { + fn from(e: HttpError) -> Self { + SendClientRequest::Err(Some(e.into())) + } +} + +impl From for SendClientRequest { + fn from(e: PrepForSendingError) -> Self { + SendClientRequest::Err(Some(e.into())) + } +} + +#[derive(Debug)] +pub(crate) enum RequestSender { + Owned(RequestHead), + Rc(Rc, Option), +} + +impl RequestSender { + pub(crate) fn send_body( + self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + body: B, + ) -> SendClientRequest + where + B: Into, + { + let mut connector = config.connector.borrow_mut(); + + let fut = match self { + RequestSender::Owned(head) => { + connector.send_request(head, body.into(), addr) + } + RequestSender::Rc(head, extra_headers) => { + connector.send_request_extra(head, extra_headers, body.into(), addr) + } + }; + + SendClientRequest::new( + fut, + response_decompress, + timeout.or_else(|| config.timeout.clone()), + ) + } + + pub(crate) fn send_json( + mut self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + value: &T, + ) -> SendClientRequest { + let body = match serde_json::to_string(value) { + Ok(body) => body, + Err(e) => return Error::from(e).into(), + }; + + if let Err(e) = self.set_header_if_none(header::CONTENT_TYPE, "application/json") + { + return e.into(); + } + + self.send_body( + addr, + response_decompress, + timeout, + config, + Body::Bytes(Bytes::from(body)), + ) + } + + pub(crate) fn send_form( + mut self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + value: &T, + ) -> SendClientRequest { + let body = match serde_urlencoded::to_string(value) { + Ok(body) => body, + Err(e) => return Error::from(e).into(), + }; + + // set content-type + if let Err(e) = self.set_header_if_none( + header::CONTENT_TYPE, + "application/x-www-form-urlencoded", + ) { + return e.into(); + } + + self.send_body( + addr, + response_decompress, + timeout, + config, + Body::Bytes(Bytes::from(body)), + ) + } + + pub(crate) fn send_stream( + self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + stream: S, + ) -> SendClientRequest + where + S: Stream + 'static, + E: Into + 'static, + { + self.send_body( + addr, + response_decompress, + timeout, + config, + Body::from_message(BodyStream::new(stream)), + ) + } + + pub(crate) fn send( + self, + addr: Option, + response_decompress: bool, + timeout: Option, + config: &ClientConfig, + ) -> SendClientRequest { + self.send_body(addr, response_decompress, timeout, config, Body::Empty) + } + + fn set_header_if_none( + &mut self, + key: HeaderName, + value: V, + ) -> Result<(), HttpError> + where + V: IntoHeaderValue, + { + match self { + RequestSender::Owned(head) => { + if !head.headers.contains_key(&key) { + match value.try_into() { + Ok(value) => head.headers.insert(key, value), + Err(e) => return Err(e.into()), + } + } + } + RequestSender::Rc(head, extra_headers) => { + if !head.headers.contains_key(&key) + && !extra_headers.iter().any(|h| h.contains_key(&key)) + { + match value.try_into() { + Ok(v) => { + let h = extra_headers.get_or_insert(HeaderMap::new()); + h.insert(key, v) + } + Err(e) => return Err(e.into()), + }; + } + } + } + + Ok(()) + } +}