diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index b2b816d5..8d44ea6b 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,9 +1,13 @@ # Changes ## Unreleased - 2020-xx-xx +### Fixed +* Memory leak of `client::pool::ConnectorPoolSupport`. [#1626] + +[#1626]: https://github.com/actix/actix-web/pull/1626 -## 2.0.0-beta.2 - 2020-07-21 +## [2.0.0-beta.2] - 2020-07-21 ### Fixed * Potential UB in h1 decoder using uninitialized memory. [#1614] diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index f2c5b041..013a7967 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -2,7 +2,7 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; -use std::rc::{Rc, Weak}; +use std::rc::Rc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; @@ -65,8 +65,8 @@ where // start support future actix_rt::spawn(ConnectorPoolSupport { - connector: connector_rc.clone(), - inner: Rc::downgrade(&inner_rc), + connector: Rc::clone(&connector_rc), + inner: Rc::clone(&inner_rc), }); ConnectionPool(connector_rc, inner_rc) @@ -82,6 +82,13 @@ where } } +impl Drop for ConnectionPool { + fn drop(&mut self) { + // wake up the ConnectorPoolSupport when dropping so it can exit properly. + self.1.borrow().waker.wake(); + } +} + impl Service for ConnectionPool where Io: AsyncRead + AsyncWrite + Unpin + 'static, @@ -421,7 +428,7 @@ where Io: AsyncRead + AsyncWrite + Unpin + 'static, { connector: T, - inner: Weak>>, + inner: Rc>>, } impl Future for ConnectorPoolSupport @@ -435,55 +442,57 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - if let Some(this_inner) = this.inner.upgrade() { - let mut inner = this_inner.as_ref().borrow_mut(); - inner.waker.register(cx.waker()); + if Rc::strong_count(this.inner) == 1 { + // If we are last copy of Inner it means the ConnectionPool is already gone + // and we are safe to exit. + return Poll::Ready(()); + } - // check waiters - loop { - let (key, token) = { - if let Some((key, token)) = inner.waiters_queue.get_index(0) { - (key.clone(), *token) - } else { - break; - } - }; - if inner.waiters.get(token).unwrap().is_none() { - continue; - } + let mut inner = this.inner.borrow_mut(); + inner.waker.register(cx.waker()); - match inner.acquire(&key, cx) { - Acquire::NotAvailable => break, - Acquire::Acquired(io, created) => { - let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1; - if let Err(conn) = tx.send(Ok(IoConnection::new( - io, - created, - Some(Acquired(key.clone(), Some(this_inner.clone()))), - ))) { - let (io, created) = conn.unwrap().into_inner(); - inner.release_conn(&key, io, created); - } - } - Acquire::Available => { - let (connect, tx) = - inner.waiters.get_mut(token).unwrap().take().unwrap(); - OpenWaitingConnection::spawn( - key.clone(), - tx, - this_inner.clone(), - this.connector.call(connect), - inner.config.clone(), - ); - } + // check waiters + loop { + let (key, token) = { + if let Some((key, token)) = inner.waiters_queue.get_index(0) { + (key.clone(), *token) + } else { + break; } - let _ = inner.waiters_queue.swap_remove_index(0); + }; + if inner.waiters.get(token).unwrap().is_none() { + continue; } - Poll::Pending - } else { - Poll::Ready(()) + match inner.acquire(&key, cx) { + Acquire::NotAvailable => break, + Acquire::Acquired(io, created) => { + let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1; + if let Err(conn) = tx.send(Ok(IoConnection::new( + io, + created, + Some(Acquired(key.clone(), Some(this.inner.clone()))), + ))) { + let (io, created) = conn.unwrap().into_inner(); + inner.release_conn(&key, io, created); + } + } + Acquire::Available => { + let (connect, tx) = + inner.waiters.get_mut(token).unwrap().take().unwrap(); + OpenWaitingConnection::spawn( + key.clone(), + tx, + this.inner.clone(), + this.connector.call(connect), + inner.config.clone(), + ); + } + } + let _ = inner.waiters_queue.swap_remove_index(0); } + + Poll::Pending } }