1
0
mirror of https://github.com/actix/actix-extras.git synced 2025-01-24 07:43:49 +01:00
actix-extras/src/client/pipeline.rs

481 lines
15 KiB
Rust
Raw Normal View History

2018-02-19 03:11:11 -08:00
use bytes::{Bytes, BytesMut};
use futures::sync::oneshot;
2018-04-13 16:02:01 -07:00
use futures::{Async, Future, Poll};
use http::header::CONTENT_ENCODING;
2018-05-24 21:03:16 -07:00
use std::time::{Duration, Instant};
2018-04-13 16:02:01 -07:00
use std::{io, mem};
2018-05-24 21:03:16 -07:00
use tokio_timer::Delay;
2018-02-19 03:11:11 -08:00
2018-06-01 09:36:16 -07:00
use actix::{Addr, Request, SystemService};
2018-02-19 03:11:11 -08:00
use super::{
ClientBody, ClientBodyStream, ClientConnector, ClientConnectorError, ClientRequest,
ClientResponse, Connect, Connection, HttpClientWriter, HttpResponseParser,
HttpResponseParserError,
};
2018-04-13 16:02:01 -07:00
use error::Error;
use error::PayloadError;
2018-03-29 11:06:44 -07:00
use header::ContentEncoding;
use httpmessage::HttpMessage;
2018-02-25 11:43:00 +03:00
use server::encoding::PayloadStream;
2018-04-28 22:55:47 -07:00
use server::WriterState;
2018-02-19 03:11:11 -08:00
/// A set of errors that can occur during request sending and response reading
2018-02-19 13:18:18 -08:00
#[derive(Fail, Debug)]
2018-02-19 03:11:11 -08:00
pub enum SendRequestError {
2018-03-06 17:04:48 -08:00
/// Response took too long
#[fail(display = "Timeout while waiting for response")]
2018-03-06 17:04:48 -08:00
Timeout,
2018-02-19 13:18:18 -08:00
/// Failed to connect to host
2018-04-13 16:02:01 -07:00
#[fail(display = "Failed to connect to host: {}", _0)]
2018-02-19 13:18:18 -08:00
Connector(#[cause] ClientConnectorError),
/// Error parsing response
2018-04-13 16:02:01 -07:00
#[fail(display = "{}", _0)]
2018-02-19 13:18:18 -08:00
ParseError(#[cause] HttpResponseParserError),
/// Error reading response payload
2018-04-13 16:02:01 -07:00
#[fail(display = "Error reading response payload: {}", _0)]
2018-02-19 13:18:18 -08:00
Io(#[cause] io::Error),
2018-02-19 03:11:11 -08:00
}
/// Wraps any I/O error in the `SendRequestError::Io` variant.
impl From<io::Error> for SendRequestError {
    fn from(source: io::Error) -> Self {
        SendRequestError::Io(source)
    }
}
2018-03-06 17:04:48 -08:00
/// Converts a connector error into a `SendRequestError`.
///
/// A connector timeout becomes `SendRequestError::Timeout`; every other
/// connector error is carried inside `SendRequestError::Connector`.
impl From<ClientConnectorError> for SendRequestError {
    fn from(source: ClientConnectorError) -> Self {
        if let ClientConnectorError::Timeout = source {
            SendRequestError::Timeout
        } else {
            SendRequestError::Connector(source)
        }
    }
}
2018-02-19 03:11:11 -08:00
enum State {
New,
2018-06-01 09:36:16 -07:00
Connect(Request<ClientConnector, Connect>),
Connection(Connection),
2018-02-19 03:11:11 -08:00
Send(Box<Pipeline>),
None,
}
/// `SendRequest` is a `Future` which represents an asynchronous
/// request sending process.
2018-02-19 22:48:27 -08:00
#[must_use = "SendRequest does nothing unless polled"]
2018-02-19 03:11:11 -08:00
pub struct SendRequest {
req: ClientRequest,
state: State,
conn: Option<Addr<ClientConnector>>,
2018-03-06 17:04:48 -08:00
conn_timeout: Duration,
wait_timeout: Duration,
2018-05-24 21:03:16 -07:00
timeout: Option<Delay>,
2018-02-19 03:11:11 -08:00
}
impl SendRequest {
pub(crate) fn new(req: ClientRequest) -> SendRequest {
SendRequest {
req,
conn: None,
state: State::New,
timeout: None,
wait_timeout: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1),
}
}
2018-04-13 16:02:01 -07:00
pub(crate) fn with_connector(
req: ClientRequest, conn: Addr<ClientConnector>,
2018-04-13 16:02:01 -07:00
) -> SendRequest {
SendRequest {
req,
conn: Some(conn),
2018-04-13 16:02:01 -07:00
state: State::New,
timeout: None,
wait_timeout: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1),
2018-03-06 17:04:48 -08:00
}
2018-02-19 03:11:11 -08:00
}
2018-04-13 16:02:01 -07:00
pub(crate) fn with_connection(req: ClientRequest, conn: Connection) -> SendRequest {
SendRequest {
req,
state: State::Connection(conn),
conn: None,
2018-04-13 16:02:01 -07:00
timeout: None,
wait_timeout: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1),
2018-03-06 17:04:48 -08:00
}
}
/// Set request timeout
///
/// Request timeout is the total time before a response must be received.
2018-03-06 17:04:48 -08:00
/// Default value is 5 seconds.
pub fn timeout(mut self, timeout: Duration) -> Self {
2018-05-24 21:03:16 -07:00
self.timeout = Some(Delay::new(Instant::now() + timeout));
2018-03-06 17:04:48 -08:00
self
}
/// Set connection timeout
///
/// Connection timeout includes resolving hostname and actual connection to
/// the host.
/// Default value is 1 second.
pub fn conn_timeout(mut self, timeout: Duration) -> Self {
self.conn_timeout = timeout;
self
}
2018-04-05 18:33:58 -07:00
/// Set wait timeout
2018-04-05 18:33:58 -07:00
///
/// If connections pool limits are enabled, wait time indicates max time
/// to wait for available connection. Default value is 5 seconds.
pub fn wait_timeout(mut self, timeout: Duration) -> Self {
self.wait_timeout = timeout;
2018-04-05 18:33:58 -07:00
self
}
2018-02-19 03:11:11 -08:00
}
impl Future for SendRequest {
type Item = ClientResponse;
type Error = SendRequestError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let state = mem::replace(&mut self.state, State::None);
match state {
2018-04-13 16:02:01 -07:00
State::New => {
let conn = if let Some(conn) = self.conn.take() {
conn
} else {
ClientConnector::from_registry()
};
self.state = State::Connect(conn.send(Connect {
2018-03-06 15:26:09 -07:00
uri: self.req.uri().clone(),
wait_timeout: self.wait_timeout,
2018-03-06 17:04:48 -08:00
conn_timeout: self.conn_timeout,
2018-04-13 16:02:01 -07:00
}))
}
2018-02-19 03:11:11 -08:00
State::Connect(mut conn) => match conn.poll() {
Ok(Async::NotReady) => {
self.state = State::Connect(conn);
return Ok(Async::NotReady);
2018-04-13 16:02:01 -07:00
}
2018-02-19 03:11:11 -08:00
Ok(Async::Ready(result)) => match result {
2018-04-13 16:02:01 -07:00
Ok(stream) => self.state = State::Connection(stream),
2018-03-06 17:04:48 -08:00
Err(err) => return Err(err.into()),
2018-02-19 03:11:11 -08:00
},
2018-04-13 16:02:01 -07:00
Err(_) => {
return Err(SendRequestError::Connector(
ClientConnectorError::Disconnected,
));
2018-04-13 16:02:01 -07:00
}
},
2018-02-26 14:33:56 -08:00
State::Connection(conn) => {
let mut writer = HttpClientWriter::new();
writer.start(&mut self.req)?;
let body = match self.req.replace_body(ClientBody::Empty) {
ClientBody::Streaming(stream) => IoBody::Payload(stream),
ClientBody::Actor(_) => panic!("Client actor is not supported"),
_ => IoBody::Done,
};
2018-04-13 16:02:01 -07:00
let timeout = self.timeout.take().unwrap_or_else(|| {
2018-05-24 21:03:16 -07:00
Delay::new(Instant::now() + Duration::from_secs(5))
2018-04-13 16:02:01 -07:00
});
2018-04-04 20:15:47 -07:00
2018-03-02 14:31:23 -08:00
let pl = Box::new(Pipeline {
2018-04-13 16:02:01 -07:00
body,
writer,
2018-03-16 12:04:01 -07:00
conn: Some(conn),
2018-02-24 07:29:35 +03:00
parser: Some(HttpResponseParser::default()),
parser_buf: BytesMut::new(),
disconnected: false,
body_completed: false,
drain: None,
2018-02-24 07:29:35 +03:00
decompress: None,
should_decompress: self.req.response_decompress(),
write_state: RunningState::Running,
2018-04-04 20:15:47 -07:00
timeout: Some(timeout),
});
self.state = State::Send(pl);
2018-04-13 16:02:01 -07:00
}
2018-02-19 03:11:11 -08:00
State::Send(mut pl) => {
2018-05-10 09:37:38 -07:00
pl.poll_timeout()?;
2018-04-13 16:02:01 -07:00
pl.poll_write().map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("{}", e).as_str())
})?;
2018-02-19 22:48:27 -08:00
2018-02-19 03:11:11 -08:00
match pl.parse() {
Ok(Async::Ready(mut resp)) => {
resp.set_pipeline(pl);
2018-04-13 16:02:01 -07:00
return Ok(Async::Ready(resp));
}
2018-02-19 03:11:11 -08:00
Ok(Async::NotReady) => {
self.state = State::Send(pl);
2018-04-13 16:02:01 -07:00
return Ok(Async::NotReady);
}
Err(err) => {
return Err(SendRequestError::ParseError(err));
}
2018-02-19 03:11:11 -08:00
}
}
State::None => unreachable!(),
}
}
}
}
pub(crate) struct Pipeline {
2018-02-19 22:48:27 -08:00
body: IoBody,
body_completed: bool,
2018-03-16 12:04:01 -07:00
conn: Option<Connection>,
2018-02-19 03:11:11 -08:00
writer: HttpClientWriter,
2018-02-24 07:29:35 +03:00
parser: Option<HttpResponseParser>,
2018-02-19 03:11:11 -08:00
parser_buf: BytesMut,
2018-02-19 22:48:27 -08:00
disconnected: bool,
drain: Option<oneshot::Sender<()>>,
2018-02-24 07:29:35 +03:00
decompress: Option<PayloadStream>,
should_decompress: bool,
write_state: RunningState,
2018-05-24 21:03:16 -07:00
timeout: Option<Delay>,
2018-02-19 22:48:27 -08:00
}
enum IoBody {
Payload(ClientBodyStream),
2018-02-19 22:48:27 -08:00
Done,
}
2018-02-24 07:29:35 +03:00
/// State of the pipeline's write side.
///
/// `Done` is terminal: neither `pause` nor `resume` leaves it.
#[derive(Debug, PartialEq)]
enum RunningState {
    Running,
    Paused,
    Done,
}

impl RunningState {
    /// Switches to `Paused` unless the state is already `Done`.
    #[inline]
    fn pause(&mut self) {
        match *self {
            RunningState::Done => {}
            _ => *self = RunningState::Paused,
        }
    }

    /// Switches to `Running` unless the state is already `Done`.
    #[inline]
    fn resume(&mut self) {
        match *self {
            RunningState::Done => {}
            _ => *self = RunningState::Running,
        }
    }
}
impl Pipeline {
2018-03-16 12:04:01 -07:00
/// Takes the connection out of the pipeline, if there is one, and
/// calls `release()` on it.
fn release_conn(&mut self) {
    match self.conn.take() {
        Some(conn) => conn.release(),
        None => (),
    }
}
2018-02-19 03:11:11 -08:00
#[inline]
2018-03-16 12:04:01 -07:00
fn parse(&mut self) -> Poll<ClientResponse, HttpResponseParserError> {
if let Some(ref mut conn) = self.conn {
2018-05-17 12:20:20 -07:00
match self
.parser
2018-04-29 09:09:08 -07:00
.as_mut()
.unwrap()
.parse(conn, &mut self.parser_buf)
{
2018-03-16 12:04:01 -07:00
Ok(Async::Ready(resp)) => {
// check content-encoding
if self.should_decompress {
if let Some(enc) = resp.headers().get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
match ContentEncoding::from(enc) {
2018-04-13 16:02:01 -07:00
ContentEncoding::Auto
| ContentEncoding::Identity => (),
enc => {
self.decompress = Some(PayloadStream::new(enc))
}
2018-03-16 12:04:01 -07:00
}
2018-02-24 07:29:35 +03:00
}
}
}
2018-03-16 12:04:01 -07:00
Ok(Async::Ready(resp))
}
val => val,
2018-02-24 07:29:35 +03:00
}
2018-03-16 12:04:01 -07:00
} else {
Ok(Async::NotReady)
2018-02-24 07:29:35 +03:00
}
2018-02-19 03:11:11 -08:00
}
#[inline]
pub fn poll(&mut self) -> Poll<Option<Bytes>, PayloadError> {
2018-03-16 12:04:01 -07:00
if self.conn.is_none() {
2018-04-13 16:02:01 -07:00
return Ok(Async::Ready(None));
2018-03-16 12:04:01 -07:00
}
2018-04-13 16:02:01 -07:00
let conn: &mut Connection =
2018-04-29 09:09:08 -07:00
unsafe { &mut *(self.conn.as_mut().unwrap() as *mut _) };
2018-03-16 12:04:01 -07:00
2018-02-24 07:29:35 +03:00
let mut need_run = false;
// need write?
2018-05-17 12:20:20 -07:00
match self
.poll_write()
2018-02-24 07:29:35 +03:00
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("{}", e)))?
{
2018-04-04 20:15:47 -07:00
Async::NotReady => need_run = true,
Async::Ready(_) => {
2018-05-10 09:37:38 -07:00
self.poll_timeout().map_err(|e| {
2018-04-13 16:02:01 -07:00
io::Error::new(io::ErrorKind::Other, format!("{}", e))
})?;
2018-04-04 20:15:47 -07:00
}
2018-02-24 07:29:35 +03:00
}
// need read?
if self.parser.is_some() {
loop {
2018-05-17 12:20:20 -07:00
match self
.parser
2018-04-13 16:02:01 -07:00
.as_mut()
.unwrap()
2018-03-16 12:04:01 -07:00
.parse_payload(conn, &mut self.parser_buf)?
2018-02-24 07:29:35 +03:00
{
Async::Ready(Some(b)) => {
if let Some(ref mut decompress) = self.decompress {
match decompress.feed_data(b) {
Ok(Some(b)) => return Ok(Async::Ready(Some(b))),
Ok(None) => return Ok(Async::NotReady),
2018-04-13 16:02:01 -07:00
Err(ref err)
if err.kind() == io::ErrorKind::WouldBlock =>
{
continue
}
2018-02-24 07:29:35 +03:00
Err(err) => return Err(err.into()),
}
} else {
2018-04-13 16:02:01 -07:00
return Ok(Async::Ready(Some(b)));
2018-02-24 07:29:35 +03:00
}
2018-04-13 16:02:01 -07:00
}
2018-02-24 07:29:35 +03:00
Async::Ready(None) => {
let _ = self.parser.take();
2018-04-13 16:02:01 -07:00
break;
2018-02-24 07:29:35 +03:00
}
Async::NotReady => return Ok(Async::NotReady),
}
}
}
// eof
if let Some(mut decompress) = self.decompress.take() {
let res = decompress.feed_eof();
if let Some(b) = res? {
2018-03-16 12:04:01 -07:00
self.release_conn();
2018-04-13 16:02:01 -07:00
return Ok(Async::Ready(Some(b)));
2018-02-24 07:29:35 +03:00
}
}
if need_run {
Ok(Async::NotReady)
} else {
2018-03-16 12:04:01 -07:00
self.release_conn();
2018-02-24 07:29:35 +03:00
Ok(Async::Ready(None))
}
2018-02-19 03:11:11 -08:00
}
2018-05-10 09:37:38 -07:00
fn poll_timeout(&mut self) -> Result<(), SendRequestError> {
2018-04-04 20:15:47 -07:00
if self.timeout.is_some() {
match self.timeout.as_mut().unwrap().poll() {
2018-05-10 09:37:38 -07:00
Ok(Async::Ready(())) => return Err(SendRequestError::Timeout),
Ok(Async::NotReady) => (),
Err(_) => return Err(SendRequestError::Timeout),
2018-04-04 20:15:47 -07:00
}
}
2018-05-10 09:37:38 -07:00
Ok(())
2018-04-04 20:15:47 -07:00
}
2018-02-19 03:11:11 -08:00
#[inline]
2018-03-16 12:04:01 -07:00
fn poll_write(&mut self) -> Poll<(), Error> {
if self.write_state == RunningState::Done || self.conn.is_none() {
2018-04-13 16:02:01 -07:00
return Ok(Async::Ready(()));
2018-02-19 22:48:27 -08:00
}
let mut done = false;
2018-02-24 07:29:35 +03:00
if self.drain.is_none() && self.write_state != RunningState::Paused {
loop {
2018-02-19 22:48:27 -08:00
let result = match mem::replace(&mut self.body, IoBody::Done) {
IoBody::Payload(mut stream) => match stream.poll()? {
2018-04-13 16:02:01 -07:00
Async::Ready(None) => {
self.writer.write_eof()?;
self.body_completed = true;
2018-04-13 16:02:01 -07:00
break;
}
Async::Ready(Some(chunk)) => {
self.body = IoBody::Payload(stream);
self.writer.write(chunk.as_ref())?
2018-04-13 16:02:01 -07:00
}
Async::NotReady => {
done = true;
self.body = IoBody::Payload(stream);
2018-04-13 16:02:01 -07:00
break;
2018-02-19 22:48:27 -08:00
}
},
IoBody::Done => {
self.body_completed = true;
2018-02-19 22:48:27 -08:00
done = true;
2018-04-13 16:02:01 -07:00
break;
2018-02-19 22:48:27 -08:00
}
};
match result {
WriterState::Pause => {
2018-02-24 07:29:35 +03:00
self.write_state.pause();
2018-04-13 16:02:01 -07:00
break;
2018-02-19 22:48:27 -08:00
}
2018-04-13 16:02:01 -07:00
WriterState::Done => self.write_state.resume(),
2018-02-19 22:48:27 -08:00
}
}
}
// flush io but only if we need to
2018-05-17 12:20:20 -07:00
match self
.writer
2018-04-29 09:09:08 -07:00
.poll_completed(self.conn.as_mut().unwrap(), false)
{
2018-02-19 22:48:27 -08:00
Ok(Async::Ready(_)) => {
if self.disconnected
|| (self.body_completed && self.writer.is_completed())
{
2018-02-24 07:29:35 +03:00
self.write_state = RunningState::Done;
} else {
self.write_state.resume();
}
2018-02-19 22:48:27 -08:00
// resolve drain futures
if let Some(tx) = self.drain.take() {
let _ = tx.send(());
}
// restart io processing
2018-02-24 07:29:35 +03:00
if !done || self.write_state == RunningState::Done {
2018-02-19 22:48:27 -08:00
self.poll_write()
} else {
Ok(Async::NotReady)
}
2018-04-13 16:02:01 -07:00
}
2018-02-19 22:48:27 -08:00
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err.into()),
}
2018-02-19 03:11:11 -08:00
}
}
2018-04-04 16:39:01 -07:00
/// When the pipeline is dropped while it still holds a connection,
/// `close()` is called on that connection.
impl Drop for Pipeline {
    fn drop(&mut self) {
        match self.conn.take() {
            Some(conn) => conn.close(),
            None => (),
        }
    }
}