1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-24 16:02:59 +01:00

refactor client connector waiters maintenance

This commit is contained in:
Nikolay Kim 2018-06-23 12:40:21 +06:00
parent e3dc6f0ca8
commit cf38183dcb

View File

@ -207,7 +207,7 @@ pub struct ClientConnector {
acquired_per_host: HashMap<Key, usize>, acquired_per_host: HashMap<Key, usize>,
available: HashMap<Key, VecDeque<Conn>>, available: HashMap<Key, VecDeque<Conn>>,
to_close: Vec<Connection>, to_close: Vec<Connection>,
waiters: HashMap<Key, VecDeque<Waiter>>, waiters: Option<HashMap<Key, VecDeque<Waiter>>>,
wait_timeout: Option<(Instant, Delay)>, wait_timeout: Option<(Instant, Delay)>,
paused: Paused, paused: Paused,
} }
@ -255,7 +255,7 @@ impl Default for ClientConnector {
acquired_per_host: HashMap::new(), acquired_per_host: HashMap::new(),
available: HashMap::new(), available: HashMap::new(),
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: Some(HashMap::new()),
wait_timeout: None, wait_timeout: None,
paused: Paused::No, paused: Paused::No,
} }
@ -278,7 +278,7 @@ impl Default for ClientConnector {
acquired_per_host: HashMap::new(), acquired_per_host: HashMap::new(),
available: HashMap::new(), available: HashMap::new(),
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: Some(HashMap::new()),
wait_timeout: None, wait_timeout: None,
paused: Paused::No, paused: Paused::No,
} }
@ -344,7 +344,7 @@ impl ClientConnector {
acquired_per_host: HashMap::new(), acquired_per_host: HashMap::new(),
available: HashMap::new(), available: HashMap::new(),
to_close: Vec::new(), to_close: Vec::new(),
waiters: HashMap::new(), waiters: Some(HashMap::new()),
wait_timeout: None, wait_timeout: None,
paused: Paused::No, paused: Paused::No,
} }
@ -504,7 +504,7 @@ impl ClientConnector {
let now = Instant::now(); let now = Instant::now();
let mut next = None; let mut next = None;
for waiters in self.waiters.values_mut() { for waiters in self.waiters.as_mut().unwrap().values_mut() {
let mut idx = 0; let mut idx = 0;
while idx < waiters.len() { while idx < waiters.len() {
if waiters[idx].wait <= now { if waiters[idx].wait <= now {
@ -556,6 +556,8 @@ impl ClientConnector {
conn_timeout, conn_timeout,
}; };
self.waiters self.waiters
.as_mut()
.unwrap()
.entry(key) .entry(key)
.or_insert_with(VecDeque::new) .or_insert_with(VecDeque::new)
.push_back(waiter); .push_back(waiter);
@ -835,9 +837,9 @@ impl fut::ActorFuture for Maintenance {
act.collect_waiters(); act.collect_waiters();
// check waiters // check waiters
let tmp: &mut ClientConnector = unsafe { &mut *(act as *mut _) }; let mut waiters = act.waiters.take().unwrap();
for (key, waiters) in &mut tmp.waiters { for (key, waiters) in &mut waiters {
while let Some(waiter) = waiters.pop_front() { while let Some(waiter) = waiters.pop_front() {
if waiter.tx.is_canceled() { if waiter.tx.is_canceled() {
continue; continue;
@ -865,118 +867,118 @@ impl fut::ActorFuture for Maintenance {
), ),
).map_err(|_, _, _| ()) ).map_err(|_, _, _| ())
.and_then(move |res, act, _| { .and_then(move |res, act, _| {
#[cfg_attr(rustfmt, rustfmt_skip)] #[cfg(feature = "alpn")]
#[cfg(feature = "alpn")] match res {
match res { Err(err) => {
Err(err) => { act.stats.errors += 1;
act.stats.errors += 1; let _ = waiter.tx.send(Err(err.into()));
let _ = waiter.tx.send(Err(err.into())); fut::Either::B(fut::err(()))
fut::Either::B(fut::err(())) }
} Ok(stream) => {
Ok(stream) => { act.stats.opened += 1;
act.stats.opened += 1; if conn.0.ssl {
if conn.0.ssl { fut::Either::A(
fut::Either::A( act.connector
act.connector .connect_async(&key.host, stream)
.connect_async(&key.host, stream) .then(move |res| {
.then(move |res| { match res {
match res { Err(e) => {
Err(e) => { let _ = waiter.tx.send(
let _ = waiter.tx.send( Err(ClientConnectorError::SslError(e)));
Err(ClientConnectorError::SslError(e))); }
} Ok(stream) => {
Ok(stream) => { let _ = waiter.tx.send(
let _ = waiter.tx.send( Ok(Connection::new(
Ok(Connection::new( conn.0.clone(),
conn.0.clone(), Some(conn),
Some(conn), Box::new(stream),
Box::new(stream), )),
)), );
); }
} }
} Ok(())
Ok(()) })
}) .actfuture(),
.actfuture(), )
) } else {
} else { let _ = waiter.tx.send(Ok(Connection::new(
let _ = waiter.tx.send(Ok(Connection::new( conn.0.clone(),
conn.0.clone(), Some(conn),
Some(conn), Box::new(stream),
Box::new(stream), )));
))); fut::Either::B(fut::ok(()))
fut::Either::B(fut::ok(())) }
} }
} }
}
#[cfg_attr(rustfmt, rustfmt_skip)] #[cfg(all(feature = "tls", not(feature = "alpn")))]
#[cfg(all(feature = "tls", not(feature = "alpn")))] match res {
match res { Err(err) => {
Err(err) => { act.stats.errors += 1;
act.stats.errors += 1; let _ = waiter.tx.send(Err(err.into()));
let _ = waiter.tx.send(Err(err.into())); fut::Either::B(fut::err(()))
fut::Either::B(fut::err(())) }
} Ok(stream) => {
Ok(stream) => { act.stats.opened += 1;
act.stats.opened += 1; if conn.0.ssl {
if conn.0.ssl { fut::Either::A(
fut::Either::A( act.connector
act.connector .connect_async(&conn.0.host, stream)
.connect_async(&conn.0.host, stream) .then(|res| {
.then(|res| { match res {
match res { Err(e) => {
Err(e) => { let _ = waiter.tx.send(Err(
let _ = waiter.tx.send(Err( ClientConnectorError::SslError(e),
ClientConnectorError::SslError(e), ));
)); }
} Ok(stream) => {
Ok(stream) => { let _ = waiter.tx.send(
let _ = waiter.tx.send( Ok(Connection::new(
Ok(Connection::new( conn.0.clone(),
conn.0.clone(), Some(conn), Some(conn),
Box::new(stream), Box::new(stream),
)), )),
); );
} }
} }
Ok(()) Ok(())
}) })
.into_actor(act), .into_actor(act),
) )
} else { } else {
let _ = waiter.tx.send(Ok(Connection::new( let _ = waiter.tx.send(Ok(Connection::new(
conn.0.clone(), conn.0.clone(),
Some(conn), Some(conn),
Box::new(stream), Box::new(stream),
))); )));
fut::Either::B(fut::ok(())) fut::Either::B(fut::ok(()))
} }
} }
} }
#[cfg_attr(rustfmt, rustfmt_skip)] #[cfg(not(any(feature = "alpn", feature = "tls")))]
#[cfg(not(any(feature = "alpn", feature = "tls")))] match res {
match res { Err(err) => {
Err(err) => { act.stats.errors += 1;
act.stats.errors += 1; let _ = waiter.tx.send(Err(err.into()));
let _ = waiter.tx.send(Err(err.into())); fut::err(())
fut::err(()) }
} Ok(stream) => {
Ok(stream) => { act.stats.opened += 1;
act.stats.opened += 1; if conn.0.ssl {
if conn.0.ssl { let _ = waiter.tx.send(Err(
let _ = waiter.tx.send(Err(ClientConnectorError::SslIsNotSupported)); ClientConnectorError::SslIsNotSupported,
} else { ));
let _ = waiter.tx.send(Ok(Connection::new( } else {
conn.0.clone(), let _ = waiter.tx.send(Ok(Connection::new(
Some(conn), conn.0.clone(),
Box::new(stream), Some(conn),
))); Box::new(stream),
}; )));
fut::ok(()) };
} fut::ok(())
} }
}
}) })
.spawn(ctx); .spawn(ctx);
} }
@ -984,6 +986,7 @@ impl fut::ActorFuture for Maintenance {
} }
} }
act.waiters = Some(waiters);
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }