1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-12-01 02:44:37 +01:00

added Pause/Resume for client connector

This commit is contained in:
Nikolay Kim 2018-04-06 11:08:41 -07:00
parent 084104d058
commit 8d5fa6ee71
3 changed files with 99 additions and 23 deletions

View File

@ -37,7 +37,7 @@ use server::IoStream;
/// with connection request. /// with connection request.
pub struct Connect { pub struct Connect {
pub(crate) uri: Uri, pub(crate) uri: Uri,
pub(crate) wait_time: Duration, pub(crate) wait_timeout: Duration,
pub(crate) conn_timeout: Duration, pub(crate) conn_timeout: Duration,
} }
@ -46,7 +46,7 @@ impl Connect {
pub fn new<U>(uri: U) -> Result<Connect, HttpError> where Uri: HttpTryFrom<U> { pub fn new<U>(uri: U) -> Result<Connect, HttpError> where Uri: HttpTryFrom<U> {
Ok(Connect { Ok(Connect {
uri: Uri::try_from(uri).map_err(|e| e.into())?, uri: Uri::try_from(uri).map_err(|e| e.into())?,
wait_time: Duration::from_secs(5), wait_timeout: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1), conn_timeout: Duration::from_secs(1),
}) })
} }
@ -60,9 +60,9 @@ impl Connect {
/// If connection pool limits are enabled, wait time indicates /// If connection pool limits are enabled, wait time indicates
/// max time to wait for available connection. /// max time to wait for available connection.
/// By default connect timeout is 5 secconds. /// By default wait timeout is 5 secconds.
pub fn wait_time(mut self, timeout: Duration) -> Self { pub fn wait_timeout(mut self, timeout: Duration) -> Self {
self.wait_time = timeout; self.wait_timeout = timeout;
self self
} }
} }
@ -71,6 +71,21 @@ impl Message for Connect {
type Result = Result<Connection, ClientConnectorError>; type Result = Result<Connection, ClientConnectorError>;
} }
/// Pause connection process for `ClientConnector`
///
/// All connect requests enter wait state during connector pause.
pub struct Pause {
time: Option<Duration>,
}
impl Message for Pause {
type Result = ();
}
/// Resume connection process for `ClientConnector`
#[derive(Message)]
pub struct Resume;
/// A set of errors that can occur during connecting to a http host /// A set of errors that can occur during connecting to a http host
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
pub enum ClientConnectorError { pub enum ClientConnectorError {
@ -145,6 +160,7 @@ pub struct ClientConnector {
to_close: Vec<Connection>, to_close: Vec<Connection>,
waiters: HashMap<Key, VecDeque<Waiter>>, waiters: HashMap<Key, VecDeque<Waiter>>,
wait_timeout: Option<(Instant, Timeout)>, wait_timeout: Option<(Instant, Timeout)>,
paused: Option<Option<(Instant, Timeout)>>,
} }
impl Actor for ClientConnector { impl Actor for ClientConnector {
@ -186,6 +202,7 @@ impl Default for ClientConnector {
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: HashMap::new(),
wait_timeout: None, wait_timeout: None,
paused: None,
} }
} }
@ -202,6 +219,7 @@ impl Default for ClientConnector {
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: HashMap::new(),
wait_timeout: None, wait_timeout: None,
paused: None,
} }
} }
} }
@ -267,6 +285,7 @@ impl ClientConnector {
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: HashMap::new(),
wait_timeout: None, wait_timeout: None,
paused: None,
} }
} }
@ -494,6 +513,47 @@ impl ClientConnector {
let _ = timeout.poll(); let _ = timeout.poll();
self.wait_timeout = Some((time, timeout)); self.wait_timeout = Some((time, timeout));
} }
fn wait_for(&mut self, key: Key,
wait: Duration, conn_timeout: Duration)
-> oneshot::Receiver<Result<Connection, ClientConnectorError>>
{
// connection is not available, wait
let (tx, rx) = oneshot::channel();
let wait = Instant::now() + wait;
self.install_wait_timeout(wait);
let waiter = Waiter{ tx, wait, conn_timeout };
self.waiters.entry(key.clone()).or_insert_with(VecDeque::new)
.push_back(waiter);
rx
}
}
impl Handler<Pause> for ClientConnector {
type Result = ();
fn handle(&mut self, msg: Pause, _: &mut Self::Context) {
if let Some(time) = msg.time {
let when = Instant::now() + time;
let mut timeout = Timeout::new(time, Arbiter::handle()).unwrap();
let _ = timeout.poll();
self.paused = Some(Some((when, timeout)));
} else {
if self.paused.is_none() {
self.paused = Some(None);
}
}
}
}
impl Handler<Resume> for ClientConnector {
type Result = ();
fn handle(&mut self, _: Resume, _: &mut Self::Context) {
self.paused.take();
}
} }
impl Handler<Connect> for ClientConnector { impl Handler<Connect> for ClientConnector {
@ -505,7 +565,7 @@ impl Handler<Connect> for ClientConnector {
} }
let uri = &msg.uri; let uri = &msg.uri;
let wait_time = msg.wait_time; let wait_timeout = msg.wait_timeout;
let conn_timeout = msg.conn_timeout; let conn_timeout = msg.conn_timeout;
// host name is required // host name is required
@ -536,6 +596,19 @@ impl Handler<Connect> for ClientConnector {
let port = uri.port().unwrap_or_else(|| proto.port()); let port = uri.port().unwrap_or_else(|| proto.port());
let key = Key {host, port, ssl: proto.is_secure()}; let key = Key {host, port, ssl: proto.is_secure()};
// check pause state
if self.paused.is_some() {
let rx = self.wait_for(key, wait_timeout, conn_timeout);
return ActorResponse::async(
rx.map_err(|_| ClientConnectorError::Disconnected)
.into_actor(self)
.and_then(|res, _, _| match res {
Ok(conn) => fut::ok(conn),
Err(err) => fut::err(err),
}));
}
// acquire connection // acquire connection
let pool = if proto.is_http() { let pool = if proto.is_http() {
match self.acquire(&key) { match self.acquire(&key) {
@ -546,14 +619,7 @@ impl Handler<Connect> for ClientConnector {
}, },
Acquire::NotAvailable => { Acquire::NotAvailable => {
// connection is not available, wait // connection is not available, wait
let (tx, rx) = oneshot::channel(); let rx = self.wait_for(key, wait_timeout, conn_timeout);
let wait = Instant::now() + wait_time;
self.install_wait_timeout(wait);
let waiter = Waiter{ tx, wait, conn_timeout };
self.waiters.entry(key.clone()).or_insert_with(VecDeque::new)
.push_back(waiter);
return ActorResponse::async( return ActorResponse::async(
rx.map_err(|_| ClientConnectorError::Disconnected) rx.map_err(|_| ClientConnectorError::Disconnected)
.into_actor(self) .into_actor(self)
@ -645,6 +711,14 @@ impl fut::ActorFuture for Maintenance
fn poll(&mut self, act: &mut ClientConnector, ctx: &mut Context<ClientConnector>) fn poll(&mut self, act: &mut ClientConnector, ctx: &mut Context<ClientConnector>)
-> Poll<Self::Item, Self::Error> -> Poll<Self::Item, Self::Error>
{ {
// check pause duration
let done = if let Some(Some(ref pause)) = act.paused {
if pause.0 <= Instant::now() {true} else {false}
} else { false };
if done {
act.paused.take();
}
// collect connections // collect connections
if act.pool_modified.get() { if act.pool_modified.get() {
act.collect(false); act.collect(false);

View File

@ -9,7 +9,9 @@ mod writer;
pub use self::pipeline::{SendRequest, SendRequestError}; pub use self::pipeline::{SendRequest, SendRequestError};
pub use self::request::{ClientRequest, ClientRequestBuilder}; pub use self::request::{ClientRequest, ClientRequestBuilder};
pub use self::response::ClientResponse; pub use self::response::ClientResponse;
pub use self::connector::{Connect, Connection, ClientConnector, ClientConnectorError}; pub use self::connector::{
Connect, Pause, Resume,
Connection, ClientConnector, ClientConnectorError};
pub(crate) use self::writer::HttpClientWriter; pub(crate) use self::writer::HttpClientWriter;
pub(crate) use self::parser::{HttpResponseParser, HttpResponseParserError}; pub(crate) use self::parser::{HttpResponseParser, HttpResponseParserError};

View File

@ -62,14 +62,14 @@ enum State {
None, None,
} }
/// `SendRequest` is a `Future` which represents asynchronous request sending process. /// `SendRequest` is a `Future` which represents asynchronous sending process.
#[must_use = "SendRequest does nothing unless polled"] #[must_use = "SendRequest does nothing unless polled"]
pub struct SendRequest { pub struct SendRequest {
req: ClientRequest, req: ClientRequest,
state: State, state: State,
conn: Addr<Unsync, ClientConnector>, conn: Addr<Unsync, ClientConnector>,
conn_timeout: Duration, conn_timeout: Duration,
wait_time: Duration, wait_timeout: Duration,
timeout: Option<Timeout>, timeout: Option<Timeout>,
} }
@ -84,7 +84,7 @@ impl SendRequest {
SendRequest{req, conn, SendRequest{req, conn,
state: State::New, state: State::New,
timeout: None, timeout: None,
wait_time: Duration::from_secs(5), wait_timeout: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1), conn_timeout: Duration::from_secs(1),
} }
} }
@ -95,7 +95,7 @@ impl SendRequest {
state: State::Connection(conn), state: State::Connection(conn),
conn: ClientConnector::from_registry(), conn: ClientConnector::from_registry(),
timeout: None, timeout: None,
wait_time: Duration::from_secs(5), wait_timeout: Duration::from_secs(5),
conn_timeout: Duration::from_secs(1), conn_timeout: Duration::from_secs(1),
} }
} }
@ -119,12 +119,12 @@ impl SendRequest {
self self
} }
/// Set wait time /// Set wait timeout
/// ///
/// If connections pool limits are enabled, wait time indicates max time /// If connections pool limits are enabled, wait time indicates max time
/// to wait for available connection. Default value is 5 seconds. /// to wait for available connection. Default value is 5 seconds.
pub fn wait_time(mut self, timeout: Duration) -> Self { pub fn wait_timeout(mut self, timeout: Duration) -> Self {
self.wait_time = timeout; self.wait_timeout = timeout;
self self
} }
} }
@ -141,7 +141,7 @@ impl Future for SendRequest {
State::New => State::New =>
self.state = State::Connect(self.conn.send(Connect { self.state = State::Connect(self.conn.send(Connect {
uri: self.req.uri().clone(), uri: self.req.uri().clone(),
wait_time: self.wait_time, wait_timeout: self.wait_timeout,
conn_timeout: self.conn_timeout, conn_timeout: self.conn_timeout,
})), })),
State::Connect(mut conn) => match conn.poll() { State::Connect(mut conn) => match conn.poll() {