1
0
mirror of https://github.com/fafhrd91/actix-web synced 2024-11-27 17:52:56 +01:00

Fix leaks with actix_http's client (#1580)

This commit is contained in:
Patrick Tescher 2020-07-10 14:35:22 -07:00 committed by GitHub
parent a2662b928b
commit e10eb648d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 72 deletions

View File

@ -7,6 +7,7 @@
* Migrate cookie handling to `cookie` crate. * Migrate cookie handling to `cookie` crate.
* Update `sha-1` to 0.9 * Update `sha-1` to 0.9
* MSRV is now 1.41.1 * MSRV is now 1.41.1
* Fix client leak [#1580]
## [2.0.0-alpha.4] - 2020-05-21 ## [2.0.0-alpha.4] - 2020-05-21

View File

@ -2,7 +2,7 @@ use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc; use std::rc::{Rc, Weak};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -53,16 +53,25 @@ where
+ 'static, + 'static,
{ {
pub(crate) fn new(connector: T, config: ConnectorConfig) -> Self { pub(crate) fn new(connector: T, config: ConnectorConfig) -> Self {
let connector_rc = Rc::new(RefCell::new(connector));
let inner_rc = Rc::new(RefCell::new(Inner {
config,
acquired: 0,
waiters: Slab::new(),
waiters_queue: IndexSet::new(),
available: FxHashMap::default(),
waker: LocalWaker::new(),
}));
// start support future
actix_rt::spawn(ConnectorPoolSupport {
connector: connector_rc.clone(),
inner: Rc::downgrade(&inner_rc),
});
ConnectionPool( ConnectionPool(
Rc::new(RefCell::new(connector)), connector_rc,
Rc::new(RefCell::new(Inner { inner_rc,
config,
acquired: 0,
waiters: Slab::new(),
waiters_queue: IndexSet::new(),
available: FxHashMap::default(),
waker: LocalWaker::new(),
})),
) )
} }
} }
@ -92,12 +101,6 @@ where
} }
fn call(&mut self, req: Connect) -> Self::Future { fn call(&mut self, req: Connect) -> Self::Future {
// start support future
actix_rt::spawn(ConnectorPoolSupport {
connector: self.0.clone(),
inner: self.1.clone(),
});
let mut connector = self.0.clone(); let mut connector = self.0.clone();
let inner = self.1.clone(); let inner = self.1.clone();
@ -421,7 +424,7 @@ where
Io: AsyncRead + AsyncWrite + Unpin + 'static, Io: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
connector: T, connector: T,
inner: Rc<RefCell<Inner<Io>>>, inner: Weak<RefCell<Inner<Io>>>,
} }
impl<T, Io> Future for ConnectorPoolSupport<T, Io> impl<T, Io> Future for ConnectorPoolSupport<T, Io>
@ -435,51 +438,55 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let this = self.project();
let mut inner = this.inner.as_ref().borrow_mut(); if let Some(this_inner) = this.inner.upgrade() {
inner.waker.register(cx.waker()); let mut inner = this_inner.as_ref().borrow_mut();
inner.waker.register(cx.waker());
// check waiters // check waiters
loop { loop {
let (key, token) = { let (key, token) = {
if let Some((key, token)) = inner.waiters_queue.get_index(0) { if let Some((key, token)) = inner.waiters_queue.get_index(0) {
(key.clone(), *token) (key.clone(), *token)
} else { } else {
break; break;
}
};
if inner.waiters.get(token).unwrap().is_none() {
continue;
} }
};
if inner.waiters.get(token).unwrap().is_none() {
continue;
}
match inner.acquire(&key, cx) { match inner.acquire(&key, cx) {
Acquire::NotAvailable => break, Acquire::NotAvailable => break,
Acquire::Acquired(io, created) => { Acquire::Acquired(io, created) => {
let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1; let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1;
if let Err(conn) = tx.send(Ok(IoConnection::new( if let Err(conn) = tx.send(Ok(IoConnection::new(
io, io,
created, created,
Some(Acquired(key.clone(), Some(this.inner.clone()))), Some(Acquired(key.clone(), Some(this_inner.clone()))),
))) { ))) {
let (io, created) = conn.unwrap().into_inner(); let (io, created) = conn.unwrap().into_inner();
inner.release_conn(&key, io, created); inner.release_conn(&key, io, created);
}
}
Acquire::Available => {
let (connect, tx) =
inner.waiters.get_mut(token).unwrap().take().unwrap();
OpenWaitingConnection::spawn(
key.clone(),
tx,
this_inner.clone(),
this.connector.call(connect),
inner.config.clone(),
);
} }
} }
Acquire::Available => { let _ = inner.waiters_queue.swap_remove_index(0);
let (connect, tx) =
inner.waiters.get_mut(token).unwrap().take().unwrap();
OpenWaitingConnection::spawn(
key.clone(),
tx,
this.inner.clone(),
this.connector.call(connect),
inner.config.clone(),
);
}
} }
let _ = inner.waiters_queue.swap_remove_index(0);
}
Poll::Pending Poll::Pending
} else {
Poll::Ready(())
}
} }
} }

View File

@ -586,16 +586,16 @@ mod tests {
use super::*; use super::*;
use crate::Client; use crate::Client;
#[test] #[actix_rt::test]
fn test_debug() { async fn test_debug() {
let request = Client::new().get("/").header("x-test", "111"); let request = Client::new().get("/").header("x-test", "111");
let repr = format!("{:?}", request); let repr = format!("{:?}", request);
assert!(repr.contains("ClientRequest")); assert!(repr.contains("ClientRequest"));
assert!(repr.contains("x-test")); assert!(repr.contains("x-test"));
} }
#[test] #[actix_rt::test]
fn test_basics() { async fn test_basics() {
let mut req = Client::new() let mut req = Client::new()
.put("/") .put("/")
.version(Version::HTTP_2) .version(Version::HTTP_2)
@ -621,8 +621,8 @@ mod tests {
let _ = req.send_body(""); let _ = req.send_body("");
} }
#[test] #[actix_rt::test]
fn test_client_header() { async fn test_client_header() {
let req = Client::build() let req = Client::build()
.header(header::CONTENT_TYPE, "111") .header(header::CONTENT_TYPE, "111")
.finish() .finish()
@ -639,8 +639,8 @@ mod tests {
); );
} }
#[test] #[actix_rt::test]
fn test_client_header_override() { async fn test_client_header_override() {
let req = Client::build() let req = Client::build()
.header(header::CONTENT_TYPE, "111") .header(header::CONTENT_TYPE, "111")
.finish() .finish()
@ -658,8 +658,8 @@ mod tests {
); );
} }
#[test] #[actix_rt::test]
fn client_basic_auth() { async fn client_basic_auth() {
let req = Client::new() let req = Client::new()
.get("/") .get("/")
.basic_auth("username", Some("password")); .basic_auth("username", Some("password"));
@ -685,8 +685,8 @@ mod tests {
); );
} }
#[test] #[actix_rt::test]
fn client_bearer_auth() { async fn client_bearer_auth() {
let req = Client::new().get("/").bearer_auth("someS3cr3tAutht0k3n"); let req = Client::new().get("/").bearer_auth("someS3cr3tAutht0k3n");
assert_eq!( assert_eq!(
req.head req.head
@ -699,8 +699,8 @@ mod tests {
); );
} }
#[test] #[actix_rt::test]
fn client_query() { async fn client_query() {
let req = Client::new() let req = Client::new()
.get("/") .get("/")
.query(&[("key1", "val1"), ("key2", "val2")]) .query(&[("key1", "val1"), ("key2", "val2")])

View File

@ -27,15 +27,16 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
// benchmark sending all requests at the same time // benchmark sending all requests at the same time
fn bench_async_burst(c: &mut Criterion) { fn bench_async_burst(c: &mut Criterion) {
// We are using System here, since Runtime requires preinitialized tokio
// Maybe add to actix_rt docs
let mut rt = actix_rt::System::new("test");
let srv = test::start(|| { let srv = test::start(|| {
App::new() App::new()
.service(web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR)))) .service(web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))))
}); });
// We are using System here, since Runtime requires preinitialized tokio
// Maybe add to actix_rt docs
let url = srv.url("/"); let url = srv.url("/");
let mut rt = actix_rt::System::new("test");
c.bench_function("get_body_async_burst", move |b| { c.bench_function("get_body_async_burst", move |b| {
b.iter_custom(|iters| { b.iter_custom(|iters| {