1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-06-26 02:19:22 +02:00

refactor TransferEncoding; allow to use client api with threaded tokio runtime

This commit is contained in:
Nikolay Kim
2018-05-29 16:32:39 -07:00
parent 844be8d9dd
commit a64205e502
18 changed files with 344 additions and 290 deletions

94
src/client/body.rs Normal file
View File

@ -0,0 +1,94 @@
use std::fmt;
use bytes::Bytes;
use futures::Stream;
use body::Binary;
use context::ActorHttpContext;
use error::Error;
/// Type represent streaming body
pub type ClientBodyStream = Box<Stream<Item = Bytes, Error = Error> + Send>;
/// Represents various types of http message body.
pub enum ClientBody {
/// Empty response. `Content-Length` header is set to `0`
Empty,
/// Specific response body.
Binary(Binary),
/// Unspecified streaming response. Developer is responsible for setting
/// right `Content-Length` or `Transfer-Encoding` headers.
Streaming(ClientBodyStream),
/// Special body type for actor response.
Actor(Box<ActorHttpContext + Send>),
}
impl ClientBody {
/// Does this body streaming.
#[inline]
pub fn is_streaming(&self) -> bool {
match *self {
ClientBody::Streaming(_) | ClientBody::Actor(_) => true,
_ => false,
}
}
/// Is this binary body.
#[inline]
pub fn is_binary(&self) -> bool {
match *self {
ClientBody::Binary(_) => true,
_ => false,
}
}
/// Is this binary empy.
#[inline]
pub fn is_empty(&self) -> bool {
match *self {
ClientBody::Empty => true,
_ => false,
}
}
/// Create body from slice (copy)
pub fn from_slice(s: &[u8]) -> ClientBody {
ClientBody::Binary(Binary::Bytes(Bytes::from(s)))
}
}
impl PartialEq for ClientBody {
fn eq(&self, other: &ClientBody) -> bool {
match *self {
ClientBody::Empty => match *other {
ClientBody::Empty => true,
_ => false,
},
ClientBody::Binary(ref b) => match *other {
ClientBody::Binary(ref b2) => b == b2,
_ => false,
},
ClientBody::Streaming(_) | ClientBody::Actor(_) => false,
}
}
}
impl fmt::Debug for ClientBody {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ClientBody::Empty => write!(f, "ClientBody::Empty"),
ClientBody::Binary(ref b) => write!(f, "ClientBody::Binary({:?})", b),
ClientBody::Streaming(_) => write!(f, "ClientBody::Streaming(_)"),
ClientBody::Actor(_) => write!(f, "ClientBody::Actor(_)"),
}
}
}
impl<T> From<T> for ClientBody
where
T: Into<Binary>,
{
fn from(b: T) -> ClientBody {
ClientBody::Binary(b.into())
}
}

View File

@ -1,11 +1,12 @@
//! Http client api
//!
//! ```rust,ignore
//! ```rust
//! # extern crate actix;
//! # extern crate actix_web;
//! # extern crate futures;
//! # extern crate tokio;
//! # use futures::Future;
//! # use std::process;
//! use actix_web::client;
//!
//! fn main() {
@ -17,11 +18,14 @@
//! .map_err(|_| ())
//! .and_then(|response| { // <- server http response
//! println!("Response: {:?}", response);
//! # process::exit(0);
//! Ok(())
//! })
//! });
//! }
//! ```
mod body;
mod connector;
mod parser;
mod pipeline;
@ -29,6 +33,7 @@ mod request;
mod response;
mod writer;
pub use self::body::{ClientBody, ClientBodyStream};
pub use self::connector::{
ClientConnector, ClientConnectorError, ClientConnectorStats, Connect, Connection,
Pause, Resume,
@ -56,11 +61,15 @@ impl ResponseError for SendRequestError {
/// Create request builder for `GET` requests
///
/// ```rust,ignore
///
/// ```rust
/// # extern crate actix;
/// # extern crate actix_web;
/// # extern crate futures;
/// # use futures::{future, Future};
/// # extern crate tokio;
/// # extern crate env_logger;
/// # use futures::Future;
/// # use std::process;
/// use actix_web::client;
///
/// fn main() {
@ -72,6 +81,7 @@ impl ResponseError for SendRequestError {
/// .map_err(|_| ())
/// .and_then(|response| { // <- server http response
/// println!("Response: {:?}", response);
/// # process::exit(0);
/// Ok(())
/// }));
/// }

View File

@ -1,5 +1,5 @@
use bytes::{Bytes, BytesMut};
use futures::unsync::oneshot;
use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use http::header::CONTENT_ENCODING;
use std::time::{Duration, Instant};
@ -8,18 +8,16 @@ use tokio_timer::Delay;
use actix::prelude::*;
use super::HttpClientWriter;
use super::{ClientConnector, ClientConnectorError, Connect, Connection};
use super::{ClientRequest, ClientResponse};
use super::{HttpResponseParser, HttpResponseParserError};
use body::{Body, BodyStream};
use context::{ActorHttpContext, Frame};
use super::{
ClientBody, ClientBodyStream, ClientConnector, ClientConnectorError, ClientRequest,
ClientResponse, Connect, Connection, HttpClientWriter, HttpResponseParser,
HttpResponseParserError,
};
use error::Error;
use error::PayloadError;
use header::ContentEncoding;
use httpmessage::HttpMessage;
use server::encoding::PayloadStream;
use server::shared::SharedBytes;
use server::WriterState;
/// A set of errors that can occur during request sending and response reading
@ -68,7 +66,7 @@ enum State {
pub struct SendRequest {
req: ClientRequest,
state: State,
conn: Addr<ClientConnector>,
conn: Option<Addr<ClientConnector>>,
conn_timeout: Duration,
wait_timeout: Duration,
timeout: Option<Delay>,
@ -76,7 +74,14 @@ pub struct SendRequest {
impl SendRequest {
pub(crate) fn new(req: ClientRequest) -> SendRequest {
SendRequest::with_connector(req, ClientConnector::from_registry())
SendRequest {
req,
conn: None,
state: State::New,
timeout: None,
wait_timeout: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1),
}
}
pub(crate) fn with_connector(
@ -84,7 +89,7 @@ impl SendRequest {
) -> SendRequest {
SendRequest {
req,
conn,
conn: Some(conn),
state: State::New,
timeout: None,
wait_timeout: Duration::from_secs(5),
@ -96,7 +101,7 @@ impl SendRequest {
SendRequest {
req,
state: State::Connection(conn),
conn: ClientConnector::from_registry(),
conn: None,
timeout: None,
wait_timeout: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1),
@ -142,7 +147,12 @@ impl Future for SendRequest {
match state {
State::New => {
self.state = State::Connect(self.conn.send(Connect {
let conn = if let Some(conn) = self.conn.take() {
conn
} else {
ClientConnector::from_registry()
};
self.state = State::Connect(conn.send(Connect {
uri: self.req.uri().clone(),
wait_timeout: self.wait_timeout,
conn_timeout: self.conn_timeout,
@ -160,16 +170,16 @@ impl Future for SendRequest {
Err(_) => {
return Err(SendRequestError::Connector(
ClientConnectorError::Disconnected,
))
));
}
},
State::Connection(conn) => {
let mut writer = HttpClientWriter::new(SharedBytes::default());
let mut writer = HttpClientWriter::new();
writer.start(&mut self.req)?;
let body = match self.req.replace_body(Body::Empty) {
Body::Streaming(stream) => IoBody::Payload(stream),
Body::Actor(ctx) => IoBody::Actor(ctx),
let body = match self.req.replace_body(ClientBody::Empty) {
ClientBody::Streaming(stream) => IoBody::Payload(stream),
ClientBody::Actor(_) => panic!("Client actor is not supported"),
_ => IoBody::Done,
};
@ -208,7 +218,9 @@ impl Future for SendRequest {
self.state = State::Send(pl);
return Ok(Async::NotReady);
}
Err(err) => return Err(SendRequestError::ParseError(err)),
Err(err) => {
return Err(SendRequestError::ParseError(err));
}
}
}
State::None => unreachable!(),
@ -233,8 +245,7 @@ pub(crate) struct Pipeline {
}
enum IoBody {
Payload(BodyStream),
Actor(Box<ActorHttpContext>),
Payload(ClientBodyStream),
Done,
}
@ -380,10 +391,7 @@ impl Pipeline {
match self.timeout.as_mut().unwrap().poll() {
Ok(Async::Ready(())) => return Err(SendRequestError::Timeout),
Ok(Async::NotReady) => (),
Err(e) => {
println!("err: {:?}", e);
return Err(SendRequestError::Timeout);
}
Err(_) => return Err(SendRequestError::Timeout),
}
}
Ok(())
@ -397,66 +405,24 @@ impl Pipeline {
let mut done = false;
if self.drain.is_none() && self.write_state != RunningState::Paused {
'outter: loop {
loop {
let result = match mem::replace(&mut self.body, IoBody::Done) {
IoBody::Payload(mut body) => match body.poll()? {
IoBody::Payload(mut stream) => match stream.poll()? {
Async::Ready(None) => {
self.writer.write_eof()?;
self.body_completed = true;
break;
}
Async::Ready(Some(chunk)) => {
self.body = IoBody::Payload(body);
self.writer.write(chunk.into())?
self.body = IoBody::Payload(stream);
self.writer.write(chunk.as_ref())?
}
Async::NotReady => {
done = true;
self.body = IoBody::Payload(body);
self.body = IoBody::Payload(stream);
break;
}
},
IoBody::Actor(mut ctx) => {
if self.disconnected {
ctx.disconnected();
}
match ctx.poll()? {
Async::Ready(Some(vec)) => {
if vec.is_empty() {
self.body = IoBody::Actor(ctx);
break;
}
let mut res = None;
for frame in vec {
match frame {
Frame::Chunk(None) => {
self.body_completed = true;
self.writer.write_eof()?;
break 'outter;
}
Frame::Chunk(Some(chunk)) => {
res = Some(self.writer.write(chunk)?)
}
Frame::Drain(fut) => self.drain = Some(fut),
}
}
self.body = IoBody::Actor(ctx);
if self.drain.is_some() {
self.write_state.resume();
break;
}
res.unwrap()
}
Async::Ready(None) => {
done = true;
break;
}
Async::NotReady => {
done = true;
self.body = IoBody::Actor(ctx);
break;
}
}
}
IoBody::Done => {
self.body_completed = true;
done = true;

View File

@ -12,9 +12,9 @@ use serde::Serialize;
use serde_json;
use url::Url;
use super::body::ClientBody;
use super::connector::{ClientConnector, Connection};
use super::pipeline::SendRequest;
use body::Body;
use error::Error;
use header::{ContentEncoding, Header, IntoHeaderValue};
use http::header::{self, HeaderName, HeaderValue};
@ -24,11 +24,13 @@ use httprequest::HttpRequest;
/// An HTTP Client Request
///
/// ```rust,ignore
/// ```rust
/// # extern crate actix;
/// # extern crate actix_web;
/// # extern crate futures;
/// # extern crate tokio;
/// # use futures::Future;
/// # use std::process;
/// use actix_web::client::ClientRequest;
///
/// fn main() {
@ -40,6 +42,7 @@ use httprequest::HttpRequest;
/// .map_err(|_| ())
/// .and_then(|response| { // <- server http response
/// println!("Response: {:?}", response);
/// # process::exit(0);
/// Ok(())
/// })
/// );
@ -50,7 +53,7 @@ pub struct ClientRequest {
method: Method,
version: Version,
headers: HeaderMap,
body: Body,
body: ClientBody,
chunked: bool,
upgrade: bool,
timeout: Option<Duration>,
@ -73,7 +76,7 @@ impl Default for ClientRequest {
method: Method::default(),
version: Version::HTTP_11,
headers: HeaderMap::with_capacity(16),
body: Body::Empty,
body: ClientBody::Empty,
chunked: false,
upgrade: false,
timeout: None,
@ -217,17 +220,17 @@ impl ClientRequest {
/// Get body of this response
#[inline]
pub fn body(&self) -> &Body {
pub fn body(&self) -> &ClientBody {
&self.body
}
/// Set a body
pub fn set_body<B: Into<Body>>(&mut self, body: B) {
pub fn set_body<B: Into<ClientBody>>(&mut self, body: B) {
self.body = body.into();
}
/// Extract body, replace it with `Empty`
pub(crate) fn replace_body(&mut self, body: Body) -> Body {
pub(crate) fn replace_body(&mut self, body: ClientBody) -> ClientBody {
mem::replace(&mut self.body, body)
}
@ -578,7 +581,9 @@ impl ClientRequestBuilder {
/// Set a body and generate `ClientRequest`.
///
/// `ClientRequestBuilder` can not be used after this call.
pub fn body<B: Into<Body>>(&mut self, body: B) -> Result<ClientRequest, Error> {
pub fn body<B: Into<ClientBody>>(
&mut self, body: B,
) -> Result<ClientRequest, Error> {
if let Some(e) = self.err.take() {
return Err(e.into());
}
@ -644,17 +649,19 @@ impl ClientRequestBuilder {
/// `ClientRequestBuilder` can not be used after this call.
pub fn streaming<S, E>(&mut self, stream: S) -> Result<ClientRequest, Error>
where
S: Stream<Item = Bytes, Error = E> + 'static,
S: Stream<Item = Bytes, Error = E> + Send + 'static,
E: Into<Error>,
{
self.body(Body::Streaming(Box::new(stream.map_err(|e| e.into()))))
self.body(ClientBody::Streaming(Box::new(
stream.map_err(|e| e.into()),
)))
}
/// Set an empty body and generate `ClientRequest`
///
/// `ClientRequestBuilder` can not be used after this call.
pub fn finish(&mut self) -> Result<ClientRequest, Error> {
self.body(Body::Empty)
self.body(ClientBody::Empty)
}
/// This method construct new `ClientRequestBuilder`

View File

@ -19,13 +19,12 @@ use http::{HttpTryFrom, Version};
use time::{self, Duration};
use tokio_io::AsyncWrite;
use body::{Binary, Body};
use body::Binary;
use header::ContentEncoding;
use server::encoding::{ContentEncoder, TransferEncoding};
use server::shared::SharedBytes;
use server::WriterState;
use client::ClientRequest;
use client::{ClientBody, ClientRequest};
const AVERAGE_HEADER_SIZE: usize = 30;
@ -42,20 +41,20 @@ pub(crate) struct HttpClientWriter {
flags: Flags,
written: u64,
headers_size: u32,
buffer: SharedBytes,
buffer: Box<BytesMut>,
buffer_capacity: usize,
encoder: ContentEncoder,
}
impl HttpClientWriter {
pub fn new(buffer: SharedBytes) -> HttpClientWriter {
let encoder = ContentEncoder::Identity(TransferEncoding::eof(buffer.clone()));
pub fn new() -> HttpClientWriter {
let encoder = ContentEncoder::Identity(TransferEncoding::eof());
HttpClientWriter {
flags: Flags::empty(),
written: 0,
headers_size: 0,
buffer_capacity: 0,
buffer,
buffer: Box::new(BytesMut::new()),
encoder,
}
}
@ -98,12 +97,23 @@ impl HttpClientWriter {
}
}
pub struct Writer<'a>(pub &'a mut BytesMut);
impl<'a> io::Write for Writer<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl HttpClientWriter {
pub fn start(&mut self, msg: &mut ClientRequest) -> io::Result<()> {
// prepare task
self.flags.insert(Flags::STARTED);
self.encoder = content_encoder(self.buffer.clone(), msg);
self.encoder = content_encoder(self.buffer.as_mut(), msg);
if msg.upgrade() {
self.flags.insert(Flags::UPGRADE);
}
@ -112,7 +122,7 @@ impl HttpClientWriter {
{
// status line
writeln!(
self.buffer,
Writer(&mut self.buffer),
"{} {} {:?}\r",
msg.method(),
msg.uri()
@ -120,40 +130,41 @@ impl HttpClientWriter {
.map(|u| u.as_str())
.unwrap_or("/"),
msg.version()
)?;
).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// write headers
let mut buffer = self.buffer.get_mut();
if let Body::Binary(ref bytes) = *msg.body() {
buffer.reserve(msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
if let ClientBody::Binary(ref bytes) = *msg.body() {
self.buffer
.reserve(msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else {
buffer.reserve(msg.headers().len() * AVERAGE_HEADER_SIZE);
self.buffer
.reserve(msg.headers().len() * AVERAGE_HEADER_SIZE);
}
for (key, value) in msg.headers() {
let v = value.as_ref();
let k = key.as_str().as_bytes();
buffer.reserve(k.len() + v.len() + 4);
buffer.put_slice(k);
buffer.put_slice(b": ");
buffer.put_slice(v);
buffer.put_slice(b"\r\n");
self.buffer.reserve(k.len() + v.len() + 4);
self.buffer.put_slice(k);
self.buffer.put_slice(b": ");
self.buffer.put_slice(v);
self.buffer.put_slice(b"\r\n");
}
// set date header
if !msg.headers().contains_key(DATE) {
buffer.extend_from_slice(b"date: ");
set_date(&mut buffer);
buffer.extend_from_slice(b"\r\n\r\n");
self.buffer.extend_from_slice(b"date: ");
set_date(&mut self.buffer);
self.buffer.extend_from_slice(b"\r\n\r\n");
} else {
buffer.extend_from_slice(b"\r\n");
self.buffer.extend_from_slice(b"\r\n");
}
self.headers_size = buffer.len() as u32;
self.headers_size = self.buffer.len() as u32;
if msg.body().is_binary() {
if let Body::Binary(bytes) = msg.replace_body(Body::Empty) {
if let ClientBody::Binary(bytes) = msg.replace_body(ClientBody::Empty) {
self.written += bytes.len() as u64;
self.encoder.write(bytes)?;
self.encoder.write(bytes.as_ref())?;
}
} else {
self.buffer_capacity = msg.write_buffer_capacity();
@ -162,7 +173,7 @@ impl HttpClientWriter {
Ok(())
}
pub fn write(&mut self, payload: Binary) -> io::Result<WriterState> {
pub fn write(&mut self, payload: &[u8]) -> io::Result<WriterState> {
self.written += payload.len() as u64;
if !self.flags.contains(Flags::DISCONNECTED) {
if self.flags.contains(Flags::UPGRADE) {
@ -210,20 +221,21 @@ impl HttpClientWriter {
}
}
fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder {
fn content_encoder(buf: &mut BytesMut, req: &mut ClientRequest) -> ContentEncoder {
let version = req.version();
let mut body = req.replace_body(Body::Empty);
let mut body = req.replace_body(ClientBody::Empty);
let mut encoding = req.content_encoding();
let transfer = match body {
Body::Empty => {
let mut transfer = match body {
ClientBody::Empty => {
req.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::length(0, buf)
TransferEncoding::length(0)
}
Body::Binary(ref mut bytes) => {
ClientBody::Binary(ref mut bytes) => {
if encoding.is_compression() {
let tmp = SharedBytes::default();
let transfer = TransferEncoding::eof(tmp.clone());
let mut tmp = BytesMut::new();
let mut transfer = TransferEncoding::eof();
transfer.set_buffer(&mut tmp);
let mut enc = match encoding {
#[cfg(feature = "flate2")]
ContentEncoding::Deflate => ContentEncoder::Deflate(
@ -242,7 +254,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
ContentEncoding::Auto => unreachable!(),
};
// TODO return error!
let _ = enc.write(bytes.clone());
let _ = enc.write(bytes.as_ref());
let _ = enc.write_eof();
*bytes = Binary::from(tmp.take());
@ -256,9 +268,9 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
let _ = write!(b, "{}", bytes.len());
req.headers_mut()
.insert(CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
TransferEncoding::eof(buf)
TransferEncoding::eof()
}
Body::Streaming(_) | Body::Actor(_) => {
ClientBody::Streaming(_) | ClientBody::Actor(_) => {
if req.upgrade() {
if version == Version::HTTP_2 {
error!("Connection upgrade is forbidden for HTTP/2");
@ -270,9 +282,9 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
encoding = ContentEncoding::Identity;
req.headers_mut().remove(CONTENT_ENCODING);
}
TransferEncoding::eof(buf)
TransferEncoding::eof()
} else {
streaming_encoding(buf, version, req)
streaming_encoding(version, req)
}
}
};
@ -283,6 +295,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
HeaderValue::from_static(encoding.as_str()),
);
}
transfer.set_buffer(buf);
req.replace_body(body);
match encoding {
@ -303,19 +316,17 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder
}
}
fn streaming_encoding(
buf: SharedBytes, version: Version, req: &mut ClientRequest,
) -> TransferEncoding {
fn streaming_encoding(version: Version, req: &mut ClientRequest) -> TransferEncoding {
if req.chunked() {
// Enable transfer encoding
req.headers_mut().remove(CONTENT_LENGTH);
if version == Version::HTTP_2 {
req.headers_mut().remove(TRANSFER_ENCODING);
TransferEncoding::eof(buf)
TransferEncoding::eof()
} else {
req.headers_mut()
.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked(buf)
TransferEncoding::chunked()
}
} else {
// if Content-Length is specified, then use it as length hint
@ -338,9 +349,9 @@ fn streaming_encoding(
if !chunked {
if let Some(len) = len {
TransferEncoding::length(len, buf)
TransferEncoding::length(len)
} else {
TransferEncoding::eof(buf)
TransferEncoding::eof()
}
} else {
// Enable transfer encoding
@ -348,11 +359,11 @@ fn streaming_encoding(
Version::HTTP_11 => {
req.headers_mut()
.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked(buf)
TransferEncoding::chunked()
}
_ => {
req.headers_mut().remove(TRANSFER_ENCODING);
TransferEncoding::eof(buf)
TransferEncoding::eof()
}
}
}