mirror of
https://github.com/actix/actix-extras.git
synced 2025-06-26 10:27:42 +02:00
replace reqwest with actix::client
This commit is contained in:
@ -1,3 +1,4 @@
|
||||
//! Http client
|
||||
mod connector;
|
||||
mod parser;
|
||||
mod request;
|
||||
@ -7,7 +8,7 @@ mod writer;
|
||||
|
||||
pub use self::pipeline::{SendRequest, SendRequestError};
|
||||
pub use self::request::{ClientRequest, ClientRequestBuilder};
|
||||
pub use self::response::{ClientResponse, JsonResponse, UrlEncoded};
|
||||
pub use self::response::{ClientResponse, ResponseBody, JsonResponse, UrlEncoded};
|
||||
pub use self::connector::{Connect, Connection, ClientConnector, ClientConnectorError};
|
||||
pub(crate) use self::writer::HttpClientWriter;
|
||||
pub(crate) use self::parser::{HttpResponseParser, HttpResponseParserError};
|
||||
|
@ -21,11 +21,13 @@ pub struct HttpResponseParser {
|
||||
decoder: Option<Decoder>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Fail)]
|
||||
pub enum HttpResponseParserError {
|
||||
/// Server disconnected
|
||||
#[fail(display="Server disconnected")]
|
||||
Disconnect,
|
||||
Payload,
|
||||
Error(ParseError),
|
||||
#[fail(display="{}", _0)]
|
||||
Error(#[cause] ParseError),
|
||||
}
|
||||
|
||||
impl HttpResponseParser {
|
||||
|
@ -11,10 +11,18 @@ use super::{Connect, Connection, ClientConnector, ClientConnectorError};
|
||||
use super::HttpClientWriter;
|
||||
use super::{HttpResponseParser, HttpResponseParserError};
|
||||
|
||||
/// A set of errors that can occur during sending request and reading response
|
||||
#[derive(Fail, Debug)]
|
||||
pub enum SendRequestError {
|
||||
Connector(ClientConnectorError),
|
||||
ParseError(HttpResponseParserError),
|
||||
Io(io::Error),
|
||||
/// Failed to connect to host
|
||||
#[fail(display="Failed to connect to host: {}", _0)]
|
||||
Connector(#[cause] ClientConnectorError),
|
||||
/// Error parsing response
|
||||
#[fail(display="{}", _0)]
|
||||
ParseError(#[cause] HttpResponseParserError),
|
||||
/// Error reading response payload
|
||||
#[fail(display="Error reading response payload: {}", _0)]
|
||||
Io(#[cause] io::Error),
|
||||
}
|
||||
|
||||
impl From<io::Error> for SendRequestError {
|
||||
@ -116,6 +124,7 @@ impl Pipeline {
|
||||
|
||||
#[inline]
|
||||
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
|
||||
self.poll_write()?;
|
||||
self.parser.parse_payload(&mut self.conn, &mut self.parser_buf)
|
||||
}
|
||||
|
||||
|
@ -463,6 +463,15 @@ impl ClientRequestBuilder {
|
||||
pub fn finish(&mut self) -> Result<ClientRequest, HttpError> {
|
||||
self.body(Body::Empty)
|
||||
}
|
||||
|
||||
/// This method construct new `ClientRequestBuilder`
|
||||
pub fn take(&mut self) -> ClientRequestBuilder {
|
||||
ClientRequestBuilder {
|
||||
request: self.request.take(),
|
||||
err: self.err.take(),
|
||||
cookies: self.cookies.take(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -158,6 +158,15 @@ impl ClientResponse {
|
||||
}
|
||||
}
|
||||
|
||||
/// Load request body.
|
||||
///
|
||||
/// By default only 256Kb payload reads to a memory, then connection get dropped
|
||||
/// and `PayloadError` get returned. Use `ResponseBody::limit()`
|
||||
/// method to change upper limit.
|
||||
pub fn body(self) -> ResponseBody {
|
||||
ResponseBody::new(self)
|
||||
}
|
||||
|
||||
// /// Return stream to http payload processes as multipart.
|
||||
// ///
|
||||
// /// Content-type: multipart/form-data;
|
||||
@ -221,6 +230,67 @@ impl Stream for ClientResponse {
|
||||
}
|
||||
}
|
||||
|
||||
/// Future that resolves to a complete response body.
|
||||
#[must_use = "ResponseBody does nothing unless polled"]
|
||||
pub struct ResponseBody {
|
||||
limit: usize,
|
||||
resp: Option<ClientResponse>,
|
||||
fut: Option<Box<Future<Item=Bytes, Error=PayloadError>>>,
|
||||
}
|
||||
|
||||
impl ResponseBody {
|
||||
|
||||
/// Create `ResponseBody` for request.
|
||||
pub fn new(resp: ClientResponse) -> Self {
|
||||
ResponseBody {
|
||||
limit: 262_144,
|
||||
resp: Some(resp),
|
||||
fut: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Change max size of payload. By default max size is 256Kb
|
||||
pub fn limit(mut self, limit: usize) -> Self {
|
||||
self.limit = limit;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for ResponseBody {
|
||||
type Item = Bytes;
|
||||
type Error = PayloadError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Bytes, PayloadError> {
|
||||
if let Some(resp) = self.resp.take() {
|
||||
if let Some(len) = resp.headers().get(header::CONTENT_LENGTH) {
|
||||
if let Ok(s) = len.to_str() {
|
||||
if let Ok(len) = s.parse::<usize>() {
|
||||
if len > self.limit {
|
||||
return Err(PayloadError::Overflow);
|
||||
}
|
||||
} else {
|
||||
return Err(PayloadError::Overflow);
|
||||
}
|
||||
}
|
||||
}
|
||||
let limit = self.limit;
|
||||
let fut = resp.from_err()
|
||||
.fold(BytesMut::new(), move |mut body, chunk| {
|
||||
if (body.len() + chunk.len()) > limit {
|
||||
Err(PayloadError::Overflow)
|
||||
} else {
|
||||
body.extend_from_slice(&chunk);
|
||||
Ok(body)
|
||||
}
|
||||
})
|
||||
.map(|bytes| bytes.freeze());
|
||||
self.fut = Some(Box::new(fut));
|
||||
}
|
||||
|
||||
self.fut.as_mut().expect("ResponseBody could not be used second time").poll()
|
||||
}
|
||||
}
|
||||
|
||||
/// Client response json parser that resolves to a deserialized `T` value.
|
||||
///
|
||||
/// Returns error:
|
||||
@ -237,7 +307,7 @@ pub struct JsonResponse<T: DeserializeOwned>{
|
||||
|
||||
impl<T: DeserializeOwned> JsonResponse<T> {
|
||||
|
||||
/// Create `JsonBody` for request.
|
||||
/// Create `JsonResponse` for request.
|
||||
pub fn from_response(resp: ClientResponse) -> Self {
|
||||
JsonResponse{
|
||||
limit: 262_144,
|
||||
|
@ -212,8 +212,10 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
|
||||
// TODO return error!
|
||||
let _ = enc.write(bytes.clone());
|
||||
let _ = enc.write_eof();
|
||||
|
||||
*bytes = Binary::from(tmp.take());
|
||||
|
||||
req.headers_mut().insert(
|
||||
CONTENT_ENCODING, HeaderValue::from_static(encoding.as_str()));
|
||||
encoding = ContentEncoding::Identity;
|
||||
}
|
||||
let mut b = BytesMut::new();
|
||||
@ -240,6 +242,11 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
|
||||
}
|
||||
};
|
||||
|
||||
if encoding.is_compression() {
|
||||
req.headers_mut().insert(
|
||||
CONTENT_ENCODING, HeaderValue::from_static(encoding.as_str()));
|
||||
}
|
||||
|
||||
req.replace_body(body);
|
||||
match encoding {
|
||||
ContentEncoding::Deflate => ContentEncoder::Deflate(
|
||||
|
Reference in New Issue
Block a user