From 1e7096a63a5f07d41fdb10dacca7376d8833b03a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 28 Mar 2019 22:33:41 -0700 Subject: [PATCH] add request timeout --- actix-http/src/client/error.rs | 3 +++ awc/CHANGES.md | 8 +++--- awc/Cargo.toml | 1 + awc/src/builder.rs | 46 +++++++++++++++++++++++----------- awc/src/lib.rs | 28 ++++++++++----------- awc/src/request.rs | 29 +++++++++++++-------- awc/src/ws.rs | 28 +++++++++++++++------ awc/tests/test_client.rs | 26 +++++++++++++++++++ 8 files changed, 119 insertions(+), 50 deletions(-) diff --git a/actix-http/src/client/error.rs b/actix-http/src/client/error.rs index e67db5462..fc4b5b72b 100644 --- a/actix-http/src/client/error.rs +++ b/actix-http/src/client/error.rs @@ -105,6 +105,9 @@ pub enum SendRequestError { /// Http2 error #[display(fmt = "{}", _0)] H2(h2::Error), + /// Response took too long + #[display(fmt = "Timeout out while waiting for response")] + Timeout, /// Tunnels are not supported for http2 connection #[display(fmt = "Tunnels are not supported for http2 connection")] TunnelNotSupported, diff --git a/awc/CHANGES.md b/awc/CHANGES.md index b24ae50b2..9192e2db4 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -4,11 +4,13 @@ ### Added -* Re-export `actix_http::client::Connector` +* Request timeout. -* Session wide headers +* Re-export `actix_http::client::Connector`. -* Session wide basic and bearer auth +* Session wide headers. + +* Session wide basic and bearer auth. ## [0.1.0-alpha.1] - 2019-03-28 diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 69c9e9831..cd94057fb 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -52,6 +52,7 @@ rand = "0.6" serde = "1.0" serde_json = "1.0" serde_urlencoded = "0.5.3" +tokio-timer = "0.2.8" cookie = { version="0.11", features=["percent-encode"], optional = true } openssl = { version="0.10", optional = true } diff --git a/awc/src/builder.rs b/awc/src/builder.rs index d53d0d442..6ef145bf8 100644 --- a/awc/src/builder.rs +++ b/awc/src/builder.rs @@ -1,12 +1,13 @@ use std::cell::RefCell; use std::fmt; use std::rc::Rc; +use std::time::Duration; use actix_http::client::{ConnectError, Connection, Connector}; use actix_http::http::{header, HeaderMap, HeaderName, HttpTryFrom, Uri}; use actix_service::Service; -use crate::connect::{Connect, ConnectorWrapper}; +use crate::connect::ConnectorWrapper; use crate::{Client, ClientConfig}; /// An HTTP Client builder @@ -14,11 +15,10 @@ use crate::{Client, ClientConfig}; /// This type can be used to construct an instance of `Client` through a /// builder-like pattern. pub struct ClientBuilder { - connector: Rc>, + config: ClientConfig, default_headers: bool, allow_redirects: bool, max_redirects: usize, - headers: HeaderMap, } impl ClientBuilder { @@ -27,10 +27,13 @@ impl ClientBuilder { default_headers: true, allow_redirects: true, max_redirects: 10, - headers: HeaderMap::new(), - connector: Rc::new(RefCell::new(ConnectorWrapper( - Connector::new().service(), - ))), + config: ClientConfig { + headers: HeaderMap::new(), + timeout: Some(Duration::from_secs(5)), + connector: RefCell::new(Box::new(ConnectorWrapper( + Connector::new().service(), + ))), + }, } } @@ -42,7 +45,22 @@ impl ClientBuilder { ::Future: 'static, T::Future: 'static, { - self.connector = Rc::new(RefCell::new(ConnectorWrapper(connector))); + self.config.connector = RefCell::new(Box::new(ConnectorWrapper(connector))); + self + } + + /// Set request timeout + /// + /// Request timeout is the total time before a response must be received. + /// Default value is 5 seconds. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.config.timeout = Some(timeout); + self + } + + /// Disable request timeout. + pub fn disable_timeout(mut self) -> Self { + self.config.timeout = None; self } @@ -81,7 +99,7 @@ impl ClientBuilder { match HeaderName::try_from(key) { Ok(key) => match value.try_into() { Ok(value) => { - self.headers.append(key, value); + self.config.headers.append(key, value); } Err(e) => log::error!("Header value error: {:?}", e), }, @@ -115,12 +133,7 @@ impl ClientBuilder { /// Finish build process and create `Client` instance. pub fn finish(self) -> Client { - Client { - connector: self.connector, - config: Rc::new(ClientConfig { - headers: self.headers, - }), - } + Client(Rc::new(self.config)) } } @@ -135,6 +148,7 @@ mod tests { let client = ClientBuilder::new().basic_auth("username", Some("password")); assert_eq!( client + .config .headers .get(header::AUTHORIZATION) .unwrap() @@ -146,6 +160,7 @@ mod tests { let client = ClientBuilder::new().basic_auth("username", None); assert_eq!( client + .config .headers .get(header::AUTHORIZATION) .unwrap() @@ -162,6 +177,7 @@ mod tests { let client = ClientBuilder::new().bearer_auth("someS3cr3tAutht0k3n"); assert_eq!( client + .config .headers .get(header::AUTHORIZATION) .unwrap() diff --git a/awc/src/lib.rs b/awc/src/lib.rs index 3518bf8b4..9a8daeb43 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -22,6 +22,7 @@ //! ``` use std::cell::RefCell; use std::rc::Rc; +use std::time::Duration; pub use actix_http::{client::Connector, http}; @@ -66,25 +67,23 @@ use self::connect::{Connect, ConnectorWrapper}; /// } /// ``` #[derive(Clone)] -pub struct Client { - pub(crate) connector: Rc>, - pub(crate) config: Rc, -} +pub struct Client(Rc); pub(crate) struct ClientConfig { + pub(crate) connector: RefCell>, pub(crate) headers: HeaderMap, + pub(crate) timeout: Option, } impl Default for Client { fn default() -> Self { - Client { - connector: Rc::new(RefCell::new(ConnectorWrapper( + Client(Rc::new(ClientConfig { + connector: RefCell::new(Box::new(ConnectorWrapper( Connector::new().service(), ))), - config: Rc::new(ClientConfig { - headers: HeaderMap::new(), - }), - } + headers: HeaderMap::new(), + timeout: Some(Duration::from_secs(5)), + })) } } @@ -104,9 +103,9 @@ impl Client { where Uri: HttpTryFrom, { - let mut req = ClientRequest::new(method, url, self.connector.clone()); + let mut req = ClientRequest::new(method, url, self.0.clone()); - for (key, value) in &self.config.headers { + for (key, value) in &self.0.headers { req.head.headers.insert(key.clone(), value.clone()); } req @@ -180,9 +179,8 @@ impl Client { where Uri: HttpTryFrom, { - let mut req = WebsocketsRequest::new(url, self.connector.clone()); - - for (key, value) in &self.config.headers { + let mut req = WebsocketsRequest::new(url, self.0.clone()); + for (key, value) in &self.0.headers { req.head.headers.insert(key.clone(), value.clone()); } req diff --git a/awc/src/request.rs b/awc/src/request.rs index 2e778cfcf..170be75f7 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -1,4 +1,3 @@ -use std::cell::RefCell; use std::fmt; use std::io::Write; use std::rc::Rc; @@ -10,6 +9,7 @@ use futures::future::{err, Either}; use futures::{Future, Stream}; use serde::Serialize; use serde_json; +use tokio_timer::Timeout; use actix_http::body::{Body, BodyStream}; use actix_http::encoding::Decoder; @@ -20,9 +20,9 @@ use actix_http::http::{ }; use actix_http::{Error, Payload, RequestHead}; -use crate::connect::Connect; use crate::error::{InvalidUrl, PayloadError, SendRequestError}; use crate::response::ClientResponse; +use crate::ClientConfig; #[cfg(any(feature = "brotli", feature = "flate2-zlib", feature = "flate2-rust"))] const HTTPS_ENCODING: &str = "br, gzip, deflate"; @@ -62,16 +62,12 @@ pub struct ClientRequest { cookies: Option, default_headers: bool, response_decompress: bool, - connector: Rc>, + config: Rc, } impl ClientRequest { /// Create new client request builder. - pub(crate) fn new( - method: Method, - uri: U, - connector: Rc>, - ) -> Self + pub(crate) fn new(method: Method, uri: U, config: Rc) -> Self where Uri: HttpTryFrom, { @@ -87,7 +83,7 @@ impl ClientRequest { ClientRequest { head, err, - connector, + config, #[cfg(feature = "cookies")] cookies: None, default_headers: true, @@ -450,6 +446,7 @@ impl ClientRequest { let response_decompress = slf.response_decompress; let fut = slf + .config .connector .borrow_mut() .send_request(head, body.into()) @@ -462,7 +459,19 @@ impl ClientRequest { } }) }); - Either::B(fut) + + // set request timeout + if let Some(timeout) = slf.config.timeout { + Either::B(Either::A(Timeout::new(fut, timeout).map_err(|e| { + if let Some(e) = e.into_inner() { + e + } else { + SendRequestError::Timeout + } + }))) + } else { + Either::B(Either::B(fut)) + } } /// Set a JSON body and generate `ClientRequest` diff --git a/awc/src/ws.rs b/awc/src/ws.rs index ec7fc0da9..26594531d 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -1,5 +1,4 @@ //! Websockets client -use std::cell::RefCell; use std::io::Write; use std::rc::Rc; use std::{fmt, str}; @@ -10,9 +9,10 @@ use bytes::{BufMut, BytesMut}; #[cfg(feature = "cookies")] use cookie::{Cookie, CookieJar}; use futures::future::{err, Either, Future}; +use tokio_timer::Timeout; -use crate::connect::{BoxedSocket, Connect}; -use crate::error::{InvalidUrl, WsClientError}; +use crate::connect::BoxedSocket; +use crate::error::{InvalidUrl, SendRequestError, WsClientError}; use crate::http::header::{ self, HeaderName, HeaderValue, IntoHeaderValue, AUTHORIZATION, }; @@ -20,6 +20,7 @@ use crate::http::{ ConnectionType, Error as HttpError, HttpTryFrom, Method, StatusCode, Uri, Version, }; use crate::response::ClientResponse; +use crate::ClientConfig; /// `WebSocket` connection pub struct WebsocketsRequest { @@ -32,12 +33,12 @@ pub struct WebsocketsRequest { default_headers: bool, #[cfg(feature = "cookies")] cookies: Option, - connector: Rc>, + config: Rc, } impl WebsocketsRequest { /// Create new websocket connection - pub(crate) fn new(uri: U, connector: Rc>) -> Self + pub(crate) fn new(uri: U, config: Rc) -> Self where Uri: HttpTryFrom, { @@ -54,7 +55,7 @@ impl WebsocketsRequest { WebsocketsRequest { head, err, - connector, + config, origin: None, protocols: None, max_size: 65_536, @@ -322,6 +323,7 @@ impl WebsocketsRequest { let server_mode = slf.server_mode; let fut = slf + .config .connector .borrow_mut() .open_tunnel(head) @@ -393,6 +395,18 @@ impl WebsocketsRequest { }), )) }); - Either::B(fut) + + // set request timeout + if let Some(timeout) = slf.config.timeout { + Either::B(Either::A(Timeout::new(fut, timeout).map_err(|e| { + if let Some(e) = e.into_inner() { + e + } else { + SendRequestError::Timeout.into() + } + }))) + } else { + Either::B(Either::B(fut)) + } } } diff --git a/awc/tests/test_client.rs b/awc/tests/test_client.rs index 698b5ab7d..b2d6f8e90 100644 --- a/awc/tests/test_client.rs +++ b/awc/tests/test_client.rs @@ -1,14 +1,17 @@ use std::io::Write; +use std::time::Duration; use brotli2::write::BrotliEncoder; use bytes::Bytes; use flate2::write::GzEncoder; use flate2::Compression; +use futures::future::Future; use rand::Rng; use actix_http::HttpService; use actix_http_test::TestServer; use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse}; +use awc::error::SendRequestError; const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World \ @@ -57,6 +60,29 @@ fn test_simple() { assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } +#[test] +fn test_timeout() { + let mut srv = TestServer::new(|| { + HttpService::new(App::new().service(web::resource("/").route(web::to_async( + || { + tokio_timer::sleep(Duration::from_millis(200)) + .then(|_| Ok::<_, Error>(HttpResponse::Ok().body(STR))) + }, + )))) + }); + + let client = srv.execute(|| { + awc::Client::build() + .timeout(Duration::from_millis(50)) + .finish() + }); + let request = client.get(srv.url("/")).send(); + match srv.block_on(request) { + Err(SendRequestError::Timeout) => (), + _ => panic!(), + } +} + // #[test] // fn test_connection_close() { // let mut srv =