1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 07:53:00 +01:00

add request timeout

This commit is contained in:
Nikolay Kim 2019-03-28 22:33:41 -07:00
parent ea4d98d669
commit 1e7096a63a
8 changed files with 119 additions and 50 deletions

View File

@ -105,6 +105,9 @@ pub enum SendRequestError {
/// Http2 error /// Http2 error
#[display(fmt = "{}", _0)] #[display(fmt = "{}", _0)]
H2(h2::Error), H2(h2::Error),
/// Response took too long
#[display(fmt = "Timeout out while waiting for response")]
Timeout,
/// Tunnels are not supported for http2 connection /// Tunnels are not supported for http2 connection
#[display(fmt = "Tunnels are not supported for http2 connection")] #[display(fmt = "Tunnels are not supported for http2 connection")]
TunnelNotSupported, TunnelNotSupported,

View File

@ -4,11 +4,13 @@
### Added ### Added
* Re-export `actix_http::client::Connector` * Request timeout.
* Session wide headers * Re-export `actix_http::client::Connector`.
* Session wide basic and bearer auth * Session wide headers.
* Session wide basic and bearer auth.
## [0.1.0-alpha.1] - 2019-03-28 ## [0.1.0-alpha.1] - 2019-03-28

View File

@ -52,6 +52,7 @@ rand = "0.6"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_urlencoded = "0.5.3" serde_urlencoded = "0.5.3"
tokio-timer = "0.2.8"
cookie = { version="0.11", features=["percent-encode"], optional = true } cookie = { version="0.11", features=["percent-encode"], optional = true }
openssl = { version="0.10", optional = true } openssl = { version="0.10", optional = true }

View File

@ -1,12 +1,13 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::fmt; use std::fmt;
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration;
use actix_http::client::{ConnectError, Connection, Connector}; use actix_http::client::{ConnectError, Connection, Connector};
use actix_http::http::{header, HeaderMap, HeaderName, HttpTryFrom, Uri}; use actix_http::http::{header, HeaderMap, HeaderName, HttpTryFrom, Uri};
use actix_service::Service; use actix_service::Service;
use crate::connect::{Connect, ConnectorWrapper}; use crate::connect::ConnectorWrapper;
use crate::{Client, ClientConfig}; use crate::{Client, ClientConfig};
/// An HTTP Client builder /// An HTTP Client builder
@ -14,11 +15,10 @@ use crate::{Client, ClientConfig};
/// This type can be used to construct an instance of `Client` through a /// This type can be used to construct an instance of `Client` through a
/// builder-like pattern. /// builder-like pattern.
pub struct ClientBuilder { pub struct ClientBuilder {
connector: Rc<RefCell<dyn Connect>>, config: ClientConfig,
default_headers: bool, default_headers: bool,
allow_redirects: bool, allow_redirects: bool,
max_redirects: usize, max_redirects: usize,
headers: HeaderMap,
} }
impl ClientBuilder { impl ClientBuilder {
@ -27,10 +27,13 @@ impl ClientBuilder {
default_headers: true, default_headers: true,
allow_redirects: true, allow_redirects: true,
max_redirects: 10, max_redirects: 10,
headers: HeaderMap::new(), config: ClientConfig {
connector: Rc::new(RefCell::new(ConnectorWrapper( headers: HeaderMap::new(),
Connector::new().service(), timeout: Some(Duration::from_secs(5)),
))), connector: RefCell::new(Box::new(ConnectorWrapper(
Connector::new().service(),
))),
},
} }
} }
@ -42,7 +45,22 @@ impl ClientBuilder {
<T::Response as Connection>::Future: 'static, <T::Response as Connection>::Future: 'static,
T::Future: 'static, T::Future: 'static,
{ {
self.connector = Rc::new(RefCell::new(ConnectorWrapper(connector))); self.config.connector = RefCell::new(Box::new(ConnectorWrapper(connector)));
self
}
/// Set request timeout
///
/// Request timeout is the total time before a response must be received.
/// Default value is 5 seconds.
pub fn timeout(mut self, timeout: Duration) -> Self {
self.config.timeout = Some(timeout);
self
}
/// Disable request timeout.
pub fn disable_timeout(mut self) -> Self {
self.config.timeout = None;
self self
} }
@ -81,7 +99,7 @@ impl ClientBuilder {
match HeaderName::try_from(key) { match HeaderName::try_from(key) {
Ok(key) => match value.try_into() { Ok(key) => match value.try_into() {
Ok(value) => { Ok(value) => {
self.headers.append(key, value); self.config.headers.append(key, value);
} }
Err(e) => log::error!("Header value error: {:?}", e), Err(e) => log::error!("Header value error: {:?}", e),
}, },
@ -115,12 +133,7 @@ impl ClientBuilder {
/// Finish build process and create `Client` instance. /// Finish build process and create `Client` instance.
pub fn finish(self) -> Client { pub fn finish(self) -> Client {
Client { Client(Rc::new(self.config))
connector: self.connector,
config: Rc::new(ClientConfig {
headers: self.headers,
}),
}
} }
} }
@ -135,6 +148,7 @@ mod tests {
let client = ClientBuilder::new().basic_auth("username", Some("password")); let client = ClientBuilder::new().basic_auth("username", Some("password"));
assert_eq!( assert_eq!(
client client
.config
.headers .headers
.get(header::AUTHORIZATION) .get(header::AUTHORIZATION)
.unwrap() .unwrap()
@ -146,6 +160,7 @@ mod tests {
let client = ClientBuilder::new().basic_auth("username", None); let client = ClientBuilder::new().basic_auth("username", None);
assert_eq!( assert_eq!(
client client
.config
.headers .headers
.get(header::AUTHORIZATION) .get(header::AUTHORIZATION)
.unwrap() .unwrap()
@ -162,6 +177,7 @@ mod tests {
let client = ClientBuilder::new().bearer_auth("someS3cr3tAutht0k3n"); let client = ClientBuilder::new().bearer_auth("someS3cr3tAutht0k3n");
assert_eq!( assert_eq!(
client client
.config
.headers .headers
.get(header::AUTHORIZATION) .get(header::AUTHORIZATION)
.unwrap() .unwrap()

View File

@ -22,6 +22,7 @@
//! ``` //! ```
use std::cell::RefCell; use std::cell::RefCell;
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration;
pub use actix_http::{client::Connector, http}; pub use actix_http::{client::Connector, http};
@ -66,25 +67,23 @@ use self::connect::{Connect, ConnectorWrapper};
/// } /// }
/// ``` /// ```
#[derive(Clone)] #[derive(Clone)]
pub struct Client { pub struct Client(Rc<ClientConfig>);
pub(crate) connector: Rc<RefCell<dyn Connect>>,
pub(crate) config: Rc<ClientConfig>,
}
pub(crate) struct ClientConfig { pub(crate) struct ClientConfig {
pub(crate) connector: RefCell<Box<dyn Connect>>,
pub(crate) headers: HeaderMap, pub(crate) headers: HeaderMap,
pub(crate) timeout: Option<Duration>,
} }
impl Default for Client { impl Default for Client {
fn default() -> Self { fn default() -> Self {
Client { Client(Rc::new(ClientConfig {
connector: Rc::new(RefCell::new(ConnectorWrapper( connector: RefCell::new(Box::new(ConnectorWrapper(
Connector::new().service(), Connector::new().service(),
))), ))),
config: Rc::new(ClientConfig { headers: HeaderMap::new(),
headers: HeaderMap::new(), timeout: Some(Duration::from_secs(5)),
}), }))
}
} }
} }
@ -104,9 +103,9 @@ impl Client {
where where
Uri: HttpTryFrom<U>, Uri: HttpTryFrom<U>,
{ {
let mut req = ClientRequest::new(method, url, self.connector.clone()); let mut req = ClientRequest::new(method, url, self.0.clone());
for (key, value) in &self.config.headers { for (key, value) in &self.0.headers {
req.head.headers.insert(key.clone(), value.clone()); req.head.headers.insert(key.clone(), value.clone());
} }
req req
@ -180,9 +179,8 @@ impl Client {
where where
Uri: HttpTryFrom<U>, Uri: HttpTryFrom<U>,
{ {
let mut req = WebsocketsRequest::new(url, self.connector.clone()); let mut req = WebsocketsRequest::new(url, self.0.clone());
for (key, value) in &self.0.headers {
for (key, value) in &self.config.headers {
req.head.headers.insert(key.clone(), value.clone()); req.head.headers.insert(key.clone(), value.clone());
} }
req req

View File

@ -1,4 +1,3 @@
use std::cell::RefCell;
use std::fmt; use std::fmt;
use std::io::Write; use std::io::Write;
use std::rc::Rc; use std::rc::Rc;
@ -10,6 +9,7 @@ use futures::future::{err, Either};
use futures::{Future, Stream}; use futures::{Future, Stream};
use serde::Serialize; use serde::Serialize;
use serde_json; use serde_json;
use tokio_timer::Timeout;
use actix_http::body::{Body, BodyStream}; use actix_http::body::{Body, BodyStream};
use actix_http::encoding::Decoder; use actix_http::encoding::Decoder;
@ -20,9 +20,9 @@ use actix_http::http::{
}; };
use actix_http::{Error, Payload, RequestHead}; use actix_http::{Error, Payload, RequestHead};
use crate::connect::Connect;
use crate::error::{InvalidUrl, PayloadError, SendRequestError}; use crate::error::{InvalidUrl, PayloadError, SendRequestError};
use crate::response::ClientResponse; use crate::response::ClientResponse;
use crate::ClientConfig;
#[cfg(any(feature = "brotli", feature = "flate2-zlib", feature = "flate2-rust"))] #[cfg(any(feature = "brotli", feature = "flate2-zlib", feature = "flate2-rust"))]
const HTTPS_ENCODING: &str = "br, gzip, deflate"; const HTTPS_ENCODING: &str = "br, gzip, deflate";
@ -62,16 +62,12 @@ pub struct ClientRequest {
cookies: Option<CookieJar>, cookies: Option<CookieJar>,
default_headers: bool, default_headers: bool,
response_decompress: bool, response_decompress: bool,
connector: Rc<RefCell<dyn Connect>>, config: Rc<ClientConfig>,
} }
impl ClientRequest { impl ClientRequest {
/// Create new client request builder. /// Create new client request builder.
pub(crate) fn new<U>( pub(crate) fn new<U>(method: Method, uri: U, config: Rc<ClientConfig>) -> Self
method: Method,
uri: U,
connector: Rc<RefCell<dyn Connect>>,
) -> Self
where where
Uri: HttpTryFrom<U>, Uri: HttpTryFrom<U>,
{ {
@ -87,7 +83,7 @@ impl ClientRequest {
ClientRequest { ClientRequest {
head, head,
err, err,
connector, config,
#[cfg(feature = "cookies")] #[cfg(feature = "cookies")]
cookies: None, cookies: None,
default_headers: true, default_headers: true,
@ -450,6 +446,7 @@ impl ClientRequest {
let response_decompress = slf.response_decompress; let response_decompress = slf.response_decompress;
let fut = slf let fut = slf
.config
.connector .connector
.borrow_mut() .borrow_mut()
.send_request(head, body.into()) .send_request(head, body.into())
@ -462,7 +459,19 @@ impl ClientRequest {
} }
}) })
}); });
Either::B(fut)
// set request timeout
if let Some(timeout) = slf.config.timeout {
Either::B(Either::A(Timeout::new(fut, timeout).map_err(|e| {
if let Some(e) = e.into_inner() {
e
} else {
SendRequestError::Timeout
}
})))
} else {
Either::B(Either::B(fut))
}
} }
/// Set a JSON body and generate `ClientRequest` /// Set a JSON body and generate `ClientRequest`

View File

@ -1,5 +1,4 @@
//! Websockets client //! Websockets client
use std::cell::RefCell;
use std::io::Write; use std::io::Write;
use std::rc::Rc; use std::rc::Rc;
use std::{fmt, str}; use std::{fmt, str};
@ -10,9 +9,10 @@ use bytes::{BufMut, BytesMut};
#[cfg(feature = "cookies")] #[cfg(feature = "cookies")]
use cookie::{Cookie, CookieJar}; use cookie::{Cookie, CookieJar};
use futures::future::{err, Either, Future}; use futures::future::{err, Either, Future};
use tokio_timer::Timeout;
use crate::connect::{BoxedSocket, Connect}; use crate::connect::BoxedSocket;
use crate::error::{InvalidUrl, WsClientError}; use crate::error::{InvalidUrl, SendRequestError, WsClientError};
use crate::http::header::{ use crate::http::header::{
self, HeaderName, HeaderValue, IntoHeaderValue, AUTHORIZATION, self, HeaderName, HeaderValue, IntoHeaderValue, AUTHORIZATION,
}; };
@ -20,6 +20,7 @@ use crate::http::{
ConnectionType, Error as HttpError, HttpTryFrom, Method, StatusCode, Uri, Version, ConnectionType, Error as HttpError, HttpTryFrom, Method, StatusCode, Uri, Version,
}; };
use crate::response::ClientResponse; use crate::response::ClientResponse;
use crate::ClientConfig;
/// `WebSocket` connection /// `WebSocket` connection
pub struct WebsocketsRequest { pub struct WebsocketsRequest {
@ -32,12 +33,12 @@ pub struct WebsocketsRequest {
default_headers: bool, default_headers: bool,
#[cfg(feature = "cookies")] #[cfg(feature = "cookies")]
cookies: Option<CookieJar>, cookies: Option<CookieJar>,
connector: Rc<RefCell<dyn Connect>>, config: Rc<ClientConfig>,
} }
impl WebsocketsRequest { impl WebsocketsRequest {
/// Create new websocket connection /// Create new websocket connection
pub(crate) fn new<U>(uri: U, connector: Rc<RefCell<dyn Connect>>) -> Self pub(crate) fn new<U>(uri: U, config: Rc<ClientConfig>) -> Self
where where
Uri: HttpTryFrom<U>, Uri: HttpTryFrom<U>,
{ {
@ -54,7 +55,7 @@ impl WebsocketsRequest {
WebsocketsRequest { WebsocketsRequest {
head, head,
err, err,
connector, config,
origin: None, origin: None,
protocols: None, protocols: None,
max_size: 65_536, max_size: 65_536,
@ -322,6 +323,7 @@ impl WebsocketsRequest {
let server_mode = slf.server_mode; let server_mode = slf.server_mode;
let fut = slf let fut = slf
.config
.connector .connector
.borrow_mut() .borrow_mut()
.open_tunnel(head) .open_tunnel(head)
@ -393,6 +395,18 @@ impl WebsocketsRequest {
}), }),
)) ))
}); });
Either::B(fut)
// set request timeout
if let Some(timeout) = slf.config.timeout {
Either::B(Either::A(Timeout::new(fut, timeout).map_err(|e| {
if let Some(e) = e.into_inner() {
e
} else {
SendRequestError::Timeout.into()
}
})))
} else {
Either::B(Either::B(fut))
}
} }
} }

View File

@ -1,14 +1,17 @@
use std::io::Write; use std::io::Write;
use std::time::Duration;
use brotli2::write::BrotliEncoder; use brotli2::write::BrotliEncoder;
use bytes::Bytes; use bytes::Bytes;
use flate2::write::GzEncoder; use flate2::write::GzEncoder;
use flate2::Compression; use flate2::Compression;
use futures::future::Future;
use rand::Rng; use rand::Rng;
use actix_http::HttpService; use actix_http::HttpService;
use actix_http_test::TestServer; use actix_http_test::TestServer;
use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse}; use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse};
use awc::error::SendRequestError;
const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World \
@ -57,6 +60,29 @@ fn test_simple() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref())); assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
} }
#[test]
fn test_timeout() {
let mut srv = TestServer::new(|| {
HttpService::new(App::new().service(web::resource("/").route(web::to_async(
|| {
tokio_timer::sleep(Duration::from_millis(200))
.then(|_| Ok::<_, Error>(HttpResponse::Ok().body(STR)))
},
))))
});
let client = srv.execute(|| {
awc::Client::build()
.timeout(Duration::from_millis(50))
.finish()
});
let request = client.get(srv.url("/")).send();
match srv.block_on(request) {
Err(SendRequestError::Timeout) => (),
_ => panic!(),
}
}
// #[test] // #[test]
// fn test_connection_close() { // fn test_connection_close() {
// let mut srv = // let mut srv =