diff --git a/CHANGES.md b/CHANGES.md index 16bba10fe..cfb0f3e04 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,7 +11,8 @@ * Add `ResponseError` impl for `SendRequestError`. This improves ergonomics of http client. -* Allow connection timeout to be set +* Allow client connection timeout to be set #108 + ## 0.4.4 (2018-03-04) diff --git a/src/client/connector.rs b/src/client/connector.rs index eb9dc3b0d..1717742c0 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -28,7 +28,7 @@ use server::IoStream; /// with connection request. pub struct Connect { pub uri: Uri, - pub connection_timeout: Duration + pub conn_timeout: Duration, } impl Connect { @@ -36,7 +36,7 @@ impl Connect { pub fn new(uri: U) -> Result where Uri: HttpTryFrom { Ok(Connect { uri: Uri::try_from(uri).map_err(|e| e.into())?, - connection_timeout: Duration::from_secs(1) + conn_timeout: Duration::from_secs(1) }) } } @@ -167,7 +167,7 @@ impl Handler for ClientConnector { fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { let uri = &msg.uri; - let connection_timeout = msg.connection_timeout; + let conn_timeout = msg.conn_timeout; // host name is required if uri.host().is_none() { @@ -193,7 +193,8 @@ impl Handler for ClientConnector { ActorResponse::async( Connector::from_registry() - .send(ResolveConnect::host_and_port(&host, port).timeout(connection_timeout)) + .send(ResolveConnect::host_and_port(&host, port) + .timeout(conn_timeout)) .into_actor(self) .map_err(|_, _, _| ClientConnectorError::Disconnected) .and_then(move |res, _act, _| { diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 47e38bc8a..f17420818 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -1,8 +1,10 @@ use std::{io, mem}; +use std::time::Duration; use bytes::{Bytes, BytesMut}; use http::header::CONTENT_ENCODING; use futures::{Async, Future, Poll}; use futures::unsync::oneshot; +use tokio_core::reactor::Timeout; use actix::prelude::*; @@ -23,6 +25,9 @@ use super::{HttpResponseParser, HttpResponseParserError}; /// A set of errors that can occur during sending request and reading response #[derive(Fail, Debug)] pub enum SendRequestError { + /// Response took too long + #[fail(display = "Timeout out while waiting for response")] + Timeout, /// Failed to connect to host #[fail(display="Failed to connect to host: {}", _0)] Connector(#[cause] ClientConnectorError), @@ -40,6 +45,15 @@ impl From for SendRequestError { } } +impl From for SendRequestError { + fn from(err: ClientConnectorError) -> SendRequestError { + match err { + ClientConnectorError::Timeout => SendRequestError::Timeout, + _ => SendRequestError::Connector(err), + } + } +} + enum State { New, Connect(actix::dev::Request), @@ -54,6 +68,8 @@ pub struct SendRequest { req: ClientRequest, state: State, conn: Addr, + conn_timeout: Duration, + timeout: Option, } impl SendRequest { @@ -64,15 +80,53 @@ impl SendRequest { pub(crate) fn with_connector(req: ClientRequest, conn: Addr) -> SendRequest { - SendRequest{req, conn, state: State::New} + SendRequest{req, conn, + state: State::New, + timeout: None, + conn_timeout: Duration::from_secs(1) + } } pub(crate) fn with_connection(req: ClientRequest, conn: Connection) -> SendRequest { - SendRequest{ - req, - state: State::Connection(conn), - conn: ClientConnector::from_registry()} + SendRequest{req, + state: State::Connection(conn), + conn: ClientConnector::from_registry(), + timeout: None, + conn_timeout: Duration::from_secs(1), + } + } + + /// Set request timeout + /// + /// Request timeout is a total time before response should be received. + /// Default value is 5 seconds. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(Timeout::new(timeout, Arbiter::handle()).unwrap()); + self + } + + /// Set connection timeout + /// + /// Connection timeout includes resolving hostname and actual connection to + /// the host. + /// Default value is 1 second. + pub fn conn_timeout(mut self, timeout: Duration) -> Self { + self.conn_timeout = timeout; + self + } + + fn poll_timeout(&mut self) -> Poll<(), SendRequestError> { + if self.timeout.is_none() { + self.timeout = Some(Timeout::new( + Duration::from_secs(5), Arbiter::handle()).unwrap()); + } + + match self.timeout.as_mut().unwrap().poll() { + Ok(Async::Ready(())) => Err(SendRequestError::Timeout), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => unreachable!() + } } } @@ -81,6 +135,8 @@ impl Future for SendRequest { type Error = SendRequestError; fn poll(&mut self) -> Poll { + self.poll_timeout()?; + loop { let state = mem::replace(&mut self.state, State::None); @@ -88,7 +144,7 @@ impl Future for SendRequest { State::New => self.state = State::Connect(self.conn.send(Connect { uri: self.req.uri().clone(), - connection_timeout: self.req.connection_timeout() + conn_timeout: self.conn_timeout, })), State::Connect(mut conn) => match conn.poll() { Ok(Async::NotReady) => { @@ -99,7 +155,7 @@ impl Future for SendRequest { Ok(stream) => { self.state = State::Connection(stream) }, - Err(err) => return Err(SendRequestError::Connector(err)), + Err(err) => return Err(err.into()), }, Err(_) => return Err(SendRequestError::Connector( ClientConnectorError::Disconnected)) diff --git a/src/client/request.rs b/src/client/request.rs index d7ab2d53f..38881318d 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -1,6 +1,5 @@ use std::{fmt, mem}; use std::io::Write; -use std::time::Duration; use actix::{Addr, Unsync}; use cookie::{Cookie, CookieJar}; @@ -29,8 +28,6 @@ pub struct ClientRequest { response_decompress: bool, buffer_capacity: Option<(usize, usize)>, conn: ConnectionType, - connection_timeout: Duration - } enum ConnectionType { @@ -54,7 +51,6 @@ impl Default for ClientRequest { response_decompress: true, buffer_capacity: None, conn: ConnectionType::Default, - connection_timeout: Duration::from_secs(1) } } } @@ -115,11 +111,6 @@ impl ClientRequest { &self.uri } - #[inline] - pub fn connection_timeout(&self) -> Duration { - self.connection_timeout - } - /// Set client request uri #[inline] pub fn set_uri(&mut self, uri: Uri) { @@ -406,15 +397,6 @@ impl ClientRequestBuilder { self } - /// Set connection timeout - #[inline] - pub fn connection_timeout(&mut self, connection_timeout: Duration) -> &mut Self { - if let Some(ref mut request) = self.request { - request.connection_timeout = connection_timeout; - } - self - } - /// Set request's content type #[inline] pub fn content_type(&mut self, value: V) -> &mut Self