mirror of
https://github.com/fafhrd91/actix-web
synced 2025-06-25 06:39:22 +02:00
add client decompression support
This commit is contained in:
@ -2,6 +2,7 @@ use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
|
||||
pub use actix_http::client::{ConnectError, InvalidUrl, SendRequestError};
|
||||
pub use actix_http::error::PayloadError;
|
||||
pub use actix_http::http;
|
||||
|
||||
use actix_http::client::Connector;
|
||||
|
@ -13,15 +13,24 @@ use serde_json;
|
||||
|
||||
use actix_http::body::{Body, BodyStream};
|
||||
use actix_http::client::{InvalidUrl, SendRequestError};
|
||||
use actix_http::http::header::{self, Header, IntoHeaderValue};
|
||||
use actix_http::encoding::Decoder;
|
||||
use actix_http::http::header::{self, ContentEncoding, Header, IntoHeaderValue};
|
||||
use actix_http::http::{
|
||||
uri, ConnectionType, Error as HttpError, HeaderName, HeaderValue, HttpTryFrom,
|
||||
Method, Uri, Version,
|
||||
};
|
||||
use actix_http::{Error, Head, RequestHead};
|
||||
use actix_http::{Error, Head, Payload, RequestHead};
|
||||
|
||||
use crate::response::ClientResponse;
|
||||
use crate::Connect;
|
||||
use crate::{Connect, PayloadError};
|
||||
|
||||
#[cfg(any(feature = "brotli", feature = "flate2-zlib", feature = "flate2-rust"))]
|
||||
const HTTPS_ENCODING: &str = "br, gzip, deflate";
|
||||
#[cfg(all(
|
||||
any(feature = "flate2-zlib", feature = "flate2-rust"),
|
||||
not(feature = "brotli")
|
||||
))]
|
||||
const HTTPS_ENCODING: &str = "gzip, deflate";
|
||||
|
||||
/// An HTTP Client request builder
|
||||
///
|
||||
@ -52,6 +61,7 @@ pub struct ClientRequest {
|
||||
#[cfg(feature = "cookies")]
|
||||
cookies: Option<CookieJar>,
|
||||
default_headers: bool,
|
||||
response_decompress: bool,
|
||||
connector: Rc<RefCell<dyn Connect>>,
|
||||
}
|
||||
|
||||
@ -81,6 +91,7 @@ impl ClientRequest {
|
||||
#[cfg(feature = "cookies")]
|
||||
cookies: None,
|
||||
default_headers: true,
|
||||
response_decompress: true,
|
||||
}
|
||||
}
|
||||
|
||||
@ -275,6 +286,12 @@ impl ClientRequest {
|
||||
self
|
||||
}
|
||||
|
||||
/// Disable automatic decompress of response's body
|
||||
pub fn no_decompress(mut self) -> Self {
|
||||
self.response_decompress = false;
|
||||
self
|
||||
}
|
||||
|
||||
/// This method calls provided closure with builder reference if
|
||||
/// value is `true`.
|
||||
pub fn if_true<F>(mut self, value: bool, f: F) -> Self
|
||||
@ -303,7 +320,10 @@ impl ClientRequest {
|
||||
pub fn send_body<B>(
|
||||
mut self,
|
||||
body: B,
|
||||
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
|
||||
) -> impl Future<
|
||||
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
|
||||
Error = SendRequestError,
|
||||
>
|
||||
where
|
||||
B: Into<Body>,
|
||||
{
|
||||
@ -311,42 +331,44 @@ impl ClientRequest {
|
||||
return Either::A(err(e.into()));
|
||||
}
|
||||
|
||||
let mut slf = if self.default_headers {
|
||||
// enable br only for https
|
||||
let https = self
|
||||
.head
|
||||
.uri
|
||||
.scheme_part()
|
||||
.map(|s| s == &uri::Scheme::HTTPS)
|
||||
.unwrap_or(true);
|
||||
|
||||
let mut slf = if https {
|
||||
self.set_header_if_none(header::ACCEPT_ENCODING, "br, gzip, deflate")
|
||||
} else {
|
||||
self.set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate")
|
||||
};
|
||||
// validate uri
|
||||
let uri = &self.head.uri;
|
||||
if uri.host().is_none() {
|
||||
return Either::A(err(InvalidUrl::MissingHost.into()));
|
||||
} else if uri.scheme_part().is_none() {
|
||||
return Either::A(err(InvalidUrl::MissingScheme.into()));
|
||||
} else if let Some(scheme) = uri.scheme_part() {
|
||||
match scheme.as_str() {
|
||||
"http" | "ws" | "https" | "wss" => (),
|
||||
_ => return Either::A(err(InvalidUrl::UnknownScheme.into())),
|
||||
}
|
||||
} else {
|
||||
return Either::A(err(InvalidUrl::UnknownScheme.into()));
|
||||
}
|
||||
|
||||
// set default headers
|
||||
let slf = if self.default_headers {
|
||||
// set request host header
|
||||
if let Some(host) = slf.head.uri.host() {
|
||||
if !slf.head.headers.contains_key(header::HOST) {
|
||||
if let Some(host) = self.head.uri.host() {
|
||||
if !self.head.headers.contains_key(header::HOST) {
|
||||
let mut wrt = BytesMut::with_capacity(host.len() + 5).writer();
|
||||
|
||||
let _ = match slf.head.uri.port_u16() {
|
||||
let _ = match self.head.uri.port_u16() {
|
||||
None | Some(80) | Some(443) => write!(wrt, "{}", host),
|
||||
Some(port) => write!(wrt, "{}:{}", host, port),
|
||||
};
|
||||
|
||||
match wrt.get_mut().take().freeze().try_into() {
|
||||
Ok(value) => {
|
||||
slf.head.headers.insert(header::HOST, value);
|
||||
self.head.headers.insert(header::HOST, value);
|
||||
}
|
||||
Err(e) => slf.err = Some(e.into()),
|
||||
Err(e) => return Either::A(err(HttpError::from(e).into())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// user agent
|
||||
slf.set_header_if_none(
|
||||
self.set_header_if_none(
|
||||
header::USER_AGENT,
|
||||
concat!("actix-http/", env!("CARGO_PKG_VERSION")),
|
||||
)
|
||||
@ -354,6 +376,32 @@ impl ClientRequest {
|
||||
self
|
||||
};
|
||||
|
||||
// enable br only for https
|
||||
let https = slf
|
||||
.head
|
||||
.uri
|
||||
.scheme_part()
|
||||
.map(|s| s == &uri::Scheme::HTTPS)
|
||||
.unwrap_or(true);
|
||||
|
||||
#[cfg(any(
|
||||
feature = "brotli",
|
||||
feature = "flate2-zlib",
|
||||
feature = "flate2-rust"
|
||||
))]
|
||||
let mut slf = {
|
||||
if https {
|
||||
slf.set_header_if_none(header::ACCEPT_ENCODING, HTTPS_ENCODING)
|
||||
} else {
|
||||
#[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))]
|
||||
{
|
||||
slf.set_header_if_none(header::ACCEPT_ENCODING, "gzip, deflate")
|
||||
}
|
||||
#[cfg(not(any(feature = "flate2-zlib", feature = "flate2-rust")))]
|
||||
slf
|
||||
}
|
||||
};
|
||||
|
||||
#[allow(unused_mut)]
|
||||
let mut head = slf.head;
|
||||
|
||||
@ -378,30 +426,32 @@ impl ClientRequest {
|
||||
}
|
||||
}
|
||||
|
||||
let uri = head.uri.clone();
|
||||
let response_decompress = slf.response_decompress;
|
||||
|
||||
// validate uri
|
||||
if uri.host().is_none() {
|
||||
Either::A(err(InvalidUrl::MissingHost.into()))
|
||||
} else if uri.scheme_part().is_none() {
|
||||
Either::A(err(InvalidUrl::MissingScheme.into()))
|
||||
} else if let Some(scheme) = uri.scheme_part() {
|
||||
match scheme.as_str() {
|
||||
"http" | "ws" | "https" | "wss" => {
|
||||
Either::B(slf.connector.borrow_mut().send_request(head, body.into()))
|
||||
}
|
||||
_ => Either::A(err(InvalidUrl::UnknownScheme.into())),
|
||||
}
|
||||
} else {
|
||||
Either::A(err(InvalidUrl::UnknownScheme.into()))
|
||||
}
|
||||
let fut = slf
|
||||
.connector
|
||||
.borrow_mut()
|
||||
.send_request(head, body.into())
|
||||
.map(move |res| {
|
||||
res.map_body(|head, payload| {
|
||||
if response_decompress {
|
||||
Payload::Stream(Decoder::from_headers(&head.headers, payload))
|
||||
} else {
|
||||
Payload::Stream(Decoder::new(payload, ContentEncoding::Identity))
|
||||
}
|
||||
})
|
||||
});
|
||||
Either::B(fut)
|
||||
}
|
||||
|
||||
/// Set a JSON body and generate `ClientRequest`
|
||||
pub fn send_json<T: Serialize>(
|
||||
self,
|
||||
value: T,
|
||||
) -> impl Future<Item = ClientResponse, Error = SendRequestError> {
|
||||
) -> impl Future<
|
||||
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
|
||||
Error = SendRequestError,
|
||||
> {
|
||||
let body = match serde_json::to_string(&value) {
|
||||
Ok(body) => body,
|
||||
Err(e) => return Either::A(err(Error::from(e).into())),
|
||||
@ -422,7 +472,10 @@ impl ClientRequest {
|
||||
pub fn send_form<T: Serialize>(
|
||||
self,
|
||||
value: T,
|
||||
) -> impl Future<Item = ClientResponse, Error = SendRequestError> {
|
||||
) -> impl Future<
|
||||
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
|
||||
Error = SendRequestError,
|
||||
> {
|
||||
let body = match serde_urlencoded::to_string(&value) {
|
||||
Ok(body) => body,
|
||||
Err(e) => return Either::A(err(Error::from(e).into())),
|
||||
@ -441,7 +494,10 @@ impl ClientRequest {
|
||||
pub fn send_stream<S, E>(
|
||||
self,
|
||||
stream: S,
|
||||
) -> impl Future<Item = ClientResponse, Error = SendRequestError>
|
||||
) -> impl Future<
|
||||
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
|
||||
Error = SendRequestError,
|
||||
>
|
||||
where
|
||||
S: Stream<Item = Bytes, Error = E> + 'static,
|
||||
E: Into<Error> + 'static,
|
||||
@ -450,7 +506,12 @@ impl ClientRequest {
|
||||
}
|
||||
|
||||
/// Set an empty body and generate `ClientRequest`.
|
||||
pub fn send(self) -> impl Future<Item = ClientResponse, Error = SendRequestError> {
|
||||
pub fn send(
|
||||
self,
|
||||
) -> impl Future<
|
||||
Item = ClientResponse<impl Stream<Item = Bytes, Error = PayloadError>>,
|
||||
Error = SendRequestError,
|
||||
> {
|
||||
self.send_body(Body::Empty)
|
||||
}
|
||||
}
|
||||
|
@ -1,21 +1,22 @@
|
||||
use std::cell::{Ref, RefMut};
|
||||
use std::fmt;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{Poll, Stream};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::{Future, Poll, Stream};
|
||||
|
||||
use actix_http::error::PayloadError;
|
||||
use actix_http::http::header::CONTENT_LENGTH;
|
||||
use actix_http::http::{HeaderMap, StatusCode, Version};
|
||||
use actix_http::{Extensions, Head, HttpMessage, Payload, PayloadStream, ResponseHead};
|
||||
|
||||
/// Client Response
|
||||
pub struct ClientResponse {
|
||||
pub struct ClientResponse<S = PayloadStream> {
|
||||
pub(crate) head: ResponseHead,
|
||||
pub(crate) payload: Payload,
|
||||
pub(crate) payload: Payload<S>,
|
||||
}
|
||||
|
||||
impl HttpMessage for ClientResponse {
|
||||
type Stream = PayloadStream;
|
||||
impl<S> HttpMessage for ClientResponse<S> {
|
||||
type Stream = S;
|
||||
|
||||
fn headers(&self) -> &HeaderMap {
|
||||
&self.head.headers
|
||||
@ -29,14 +30,14 @@ impl HttpMessage for ClientResponse {
|
||||
self.head.extensions_mut()
|
||||
}
|
||||
|
||||
fn take_payload(&mut self) -> Payload {
|
||||
fn take_payload(&mut self) -> Payload<S> {
|
||||
std::mem::replace(&mut self.payload, Payload::None)
|
||||
}
|
||||
}
|
||||
|
||||
impl ClientResponse {
|
||||
impl<S> ClientResponse<S> {
|
||||
/// Create new Request instance
|
||||
pub(crate) fn new(head: ResponseHead, payload: Payload) -> ClientResponse {
|
||||
pub(crate) fn new(head: ResponseHead, payload: Payload<S>) -> Self {
|
||||
ClientResponse { head, payload }
|
||||
}
|
||||
|
||||
@ -79,9 +80,35 @@ impl ClientResponse {
|
||||
pub fn keep_alive(&self) -> bool {
|
||||
self.head().keep_alive()
|
||||
}
|
||||
|
||||
/// Set a body and return previous body value
|
||||
pub fn map_body<F, U>(mut self, f: F) -> ClientResponse<U>
|
||||
where
|
||||
F: FnOnce(&mut ResponseHead, Payload<S>) -> Payload<U>,
|
||||
{
|
||||
let payload = f(&mut self.head, self.payload);
|
||||
|
||||
ClientResponse {
|
||||
payload,
|
||||
head: self.head,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for ClientResponse {
|
||||
impl<S> ClientResponse<S>
|
||||
where
|
||||
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
||||
{
|
||||
/// Load http response's body.
|
||||
pub fn body(self) -> MessageBody<S> {
|
||||
MessageBody::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Stream for ClientResponse<S>
|
||||
where
|
||||
S: Stream<Item = Bytes, Error = PayloadError>,
|
||||
{
|
||||
type Item = Bytes;
|
||||
type Error = PayloadError;
|
||||
|
||||
@ -90,7 +117,7 @@ impl Stream for ClientResponse {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for ClientResponse {
|
||||
impl<S> fmt::Debug for ClientResponse<S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
writeln!(f, "\nClientResponse {:?} {}", self.version(), self.status(),)?;
|
||||
writeln!(f, " headers:")?;
|
||||
@ -100,3 +127,100 @@ impl fmt::Debug for ClientResponse {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Future that resolves to a complete http message body.
|
||||
pub struct MessageBody<S> {
|
||||
limit: usize,
|
||||
length: Option<usize>,
|
||||
stream: Option<ClientResponse<S>>,
|
||||
err: Option<PayloadError>,
|
||||
fut: Option<Box<Future<Item = Bytes, Error = PayloadError>>>,
|
||||
}
|
||||
|
||||
impl<S> MessageBody<S>
|
||||
where
|
||||
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
||||
{
|
||||
/// Create `MessageBody` for request.
|
||||
pub fn new(res: ClientResponse<S>) -> MessageBody<S> {
|
||||
let mut len = None;
|
||||
if let Some(l) = res.headers().get(CONTENT_LENGTH) {
|
||||
if let Ok(s) = l.to_str() {
|
||||
if let Ok(l) = s.parse::<usize>() {
|
||||
len = Some(l)
|
||||
} else {
|
||||
return Self::err(PayloadError::UnknownLength);
|
||||
}
|
||||
} else {
|
||||
return Self::err(PayloadError::UnknownLength);
|
||||
}
|
||||
}
|
||||
|
||||
MessageBody {
|
||||
limit: 262_144,
|
||||
length: len,
|
||||
stream: Some(res),
|
||||
fut: None,
|
||||
err: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Change max size of payload. By default max size is 256Kb
|
||||
pub fn limit(mut self, limit: usize) -> Self {
|
||||
self.limit = limit;
|
||||
self
|
||||
}
|
||||
|
||||
fn err(e: PayloadError) -> Self {
|
||||
MessageBody {
|
||||
stream: None,
|
||||
limit: 262_144,
|
||||
fut: None,
|
||||
err: Some(e),
|
||||
length: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Future for MessageBody<S>
|
||||
where
|
||||
S: Stream<Item = Bytes, Error = PayloadError> + 'static,
|
||||
{
|
||||
type Item = Bytes;
|
||||
type Error = PayloadError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
return fut.poll();
|
||||
}
|
||||
|
||||
if let Some(err) = self.err.take() {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
if let Some(len) = self.length.take() {
|
||||
if len > self.limit {
|
||||
return Err(PayloadError::Overflow);
|
||||
}
|
||||
}
|
||||
|
||||
// future
|
||||
let limit = self.limit;
|
||||
self.fut = Some(Box::new(
|
||||
self.stream
|
||||
.take()
|
||||
.expect("Can not be used second time")
|
||||
.from_err()
|
||||
.fold(BytesMut::with_capacity(8192), move |mut body, chunk| {
|
||||
if (body.len() + chunk.len()) > limit {
|
||||
Err(PayloadError::Overflow)
|
||||
} else {
|
||||
body.extend_from_slice(&chunk);
|
||||
Ok(body)
|
||||
}
|
||||
})
|
||||
.map(|body| body.freeze()),
|
||||
));
|
||||
self.poll()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user