diff --git a/Cargo.toml b/Cargo.toml index 74fe78b4..a886b5fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,7 @@ actix-utils = "0.3.4" actix-router = "0.1.3" actix-rt = "0.2.2" actix-web-codegen = "0.1.0-beta.1" -actix-http = { version = "0.1.1", features=["fail"] } +actix-http = { version = "0.1.2", features=["fail"] } actix-server = "0.4.3" actix-server-config = "0.1.1" actix-threadpool = "0.1.0" @@ -98,7 +98,7 @@ openssl = { version="0.10", optional = true } rustls = { version = "^0.15", optional = true } [dev-dependencies] -actix-http = { version = "0.1.1", features=["ssl", "brotli", "flate2-zlib"] } +actix-http = { version = "0.1.2", features=["ssl", "brotli", "flate2-zlib"] } actix-http-test = { version = "0.1.0", features=["ssl"] } actix-files = { version = "0.1.0-beta.1" } rand = "0.6" diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 2edcceeb..37d0eec6 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,5 +1,14 @@ # Changes +## [0.1.3] - 2019-04-23 + +### Fixed + +* Fix http client pool management + +* Fix http client wait queue management #794 + + ## [0.1.2] - 2019-04-23 ### Fixed diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 14df2eee..0241e847 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -114,7 +114,8 @@ where Request = TcpConnect, Response = TcpConnection, Error = actix_connect::ConnectError, - > + Clone, + > + Clone + + 'static, { /// Connection timeout, i.e. max time to connect to remote host including dns name resolution. /// Set to 1 second by default. @@ -284,7 +285,9 @@ mod connect_impl { pub(crate) struct InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { pub(crate) tcp_pool: ConnectionPool, } @@ -293,7 +296,8 @@ mod connect_impl { where Io: AsyncRead + AsyncWrite + 'static, T: Service - + Clone, + + Clone + + 'static, { fn clone(&self) -> Self { InnerConnector { @@ -305,7 +309,9 @@ mod connect_impl { impl Service for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { type Request = Connect; type Response = IoConnection; @@ -356,9 +362,11 @@ mod connect_impl { Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, T1: Service - + Clone, + + Clone + + 'static, T2: Service - + Clone, + + Clone + + 'static, { fn clone(&self) -> Self { InnerConnector { @@ -372,8 +380,12 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service, - T2: Service, + T1: Service + + Clone + + 'static, + T2: Service + + Clone + + 'static, { type Request = Connect; type Response = EitherConnection; @@ -409,7 +421,9 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseA where Io1: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { fut: as Service>::Future, _t: PhantomData, @@ -417,7 +431,9 @@ mod connect_impl { impl Future for InnerConnectorResponseA where - T: Service, + T: Service + + Clone + + 'static, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { @@ -435,7 +451,9 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseB where Io2: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { fut: as Service>::Future, _t: PhantomData, @@ -443,7 +461,9 @@ mod connect_impl { impl Future for InnerConnectorResponseB where - T: Service, + T: Service + + Clone + + 'static, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index becc0752..97ed3bbc 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -274,7 +274,7 @@ impl Stream for PlStream { Ok(Async::Ready(Some(chunk))) } else { let framed = self.framed.take().unwrap(); - let force_close = framed.get_codec().keepalive(); + let force_close = !framed.get_codec().keepalive(); release_connection(framed, force_close); Ok(Async::Ready(None)) } diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 1164205e..8dedf72f 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -49,7 +49,9 @@ pub(crate) struct ConnectionPool( impl ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { pub(crate) fn new( connector: T, @@ -69,7 +71,7 @@ where waiters: Slab::new(), waiters_queue: IndexSet::new(), available: HashMap::new(), - task: AtomicTask::new(), + task: None, })), ) } @@ -88,7 +90,9 @@ where impl Service for ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service + + Clone + + 'static, { type Request = Connect; type Response = IoConnection; @@ -131,7 +135,17 @@ where } // connection is not available, wait - let (rx, token) = self.1.as_ref().borrow_mut().wait_for(req); + let (rx, token, support) = self.1.as_ref().borrow_mut().wait_for(req); + + // start support future + if !support { + self.1.as_ref().borrow_mut().task = Some(AtomicTask::new()); + tokio_current_thread::spawn(ConnectorPoolSupport { + connector: self.0.clone(), + inner: self.1.clone(), + }) + } + Either::B(Either::A(WaitForConnection { rx, key, @@ -245,7 +259,7 @@ where Ok(Async::Ready(IoConnection::new( ConnectionType::H2(snd), Instant::now(), - Some(Acquired(self.key.clone(), self.inner.clone())), + Some(Acquired(self.key.clone(), self.inner.take())), ))) } Ok(Async::NotReady) => Ok(Async::NotReady), @@ -256,12 +270,11 @@ where match self.fut.poll() { Err(err) => Err(err), Ok(Async::Ready((io, proto))) => { - let _ = self.inner.take(); if proto == Protocol::Http1 { Ok(Async::Ready(IoConnection::new( ConnectionType::H1(io), Instant::now(), - Some(Acquired(self.key.clone(), self.inner.clone())), + Some(Acquired(self.key.clone(), self.inner.take())), ))) } else { self.h2 = Some(handshake(io)); @@ -279,7 +292,6 @@ enum Acquire { NotAvailable, } -// #[derive(Debug)] struct AvailableConnection { io: ConnectionType, used: Instant, @@ -298,7 +310,7 @@ pub(crate) struct Inner { oneshot::Sender, ConnectError>>, )>, waiters_queue: IndexSet<(Key, usize)>, - task: AtomicTask, + task: Option, } impl Inner { @@ -314,18 +326,6 @@ impl Inner { self.waiters.remove(token); self.waiters_queue.remove(&(key.clone(), token)); } - - fn release_conn(&mut self, key: &Key, io: ConnectionType, created: Instant) { - self.acquired -= 1; - self.available - .entry(key.clone()) - .or_insert_with(VecDeque::new) - .push_back(AvailableConnection { - io, - created, - used: Instant::now(), - }); - } } impl Inner @@ -339,6 +339,7 @@ where ) -> ( oneshot::Receiver, ConnectError>>, usize, + bool, ) { let (tx, rx) = oneshot::channel(); @@ -346,8 +347,9 @@ where let entry = self.waiters.vacant_entry(); let token = entry.key(); entry.insert((connect, tx)); - assert!(!self.waiters_queue.insert((key, token))); - (rx, token) + assert!(self.waiters_queue.insert((key, token))); + + (rx, token, self.task.is_some()) } fn acquire(&mut self, key: &Key) -> Acquire { @@ -400,6 +402,19 @@ where Acquire::Available } + fn release_conn(&mut self, key: &Key, io: ConnectionType, created: Instant) { + self.acquired -= 1; + self.available + .entry(key.clone()) + .or_insert_with(VecDeque::new) + .push_back(AvailableConnection { + io, + created, + used: Instant::now(), + }); + self.check_availibility(); + } + fn release_close(&mut self, io: ConnectionType) { self.acquired -= 1; if let Some(timeout) = self.disconnect_timeout { @@ -407,11 +422,12 @@ where tokio_current_thread::spawn(CloseConnection::new(io, timeout)) } } + self.check_availibility(); } fn check_availibility(&self) { if !self.waiters_queue.is_empty() && self.acquired < self.limit { - self.task.notify() + self.task.as_ref().map(|t| t.notify()); } } } @@ -451,6 +467,147 @@ where } } +struct ConnectorPoolSupport +where + Io: AsyncRead + AsyncWrite + 'static, +{ + connector: T, + inner: Rc>>, +} + +impl Future for ConnectorPoolSupport +where + Io: AsyncRead + AsyncWrite + 'static, + T: Service, + T::Future: 'static, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + let mut inner = self.inner.as_ref().borrow_mut(); + inner.task.as_ref().unwrap().register(); + + // check waiters + loop { + let (key, token) = { + if let Some((key, token)) = inner.waiters_queue.get_index(0) { + (key.clone(), *token) + } else { + break; + } + }; + match inner.acquire(&key) { + Acquire::NotAvailable => break, + Acquire::Acquired(io, created) => { + let (_, tx) = inner.waiters.remove(token); + if let Err(conn) = tx.send(Ok(IoConnection::new( + io, + created, + Some(Acquired(key.clone(), Some(self.inner.clone()))), + ))) { + let (io, created) = conn.unwrap().into_inner(); + inner.release_conn(&key, io, created); + } + } + Acquire::Available => { + let (connect, tx) = inner.waiters.remove(token); + OpenWaitingConnection::spawn( + key.clone(), + tx, + self.inner.clone(), + self.connector.call(connect), + ); + } + } + let _ = inner.waiters_queue.swap_remove_index(0); + } + + Ok(Async::NotReady) + } +} + +struct OpenWaitingConnection +where + Io: AsyncRead + AsyncWrite + 'static, +{ + fut: F, + key: Key, + h2: Option>, + rx: Option, ConnectError>>>, + inner: Option>>>, +} + +impl OpenWaitingConnection +where + F: Future + 'static, + Io: AsyncRead + AsyncWrite + 'static, +{ + fn spawn( + key: Key, + rx: oneshot::Sender, ConnectError>>, + inner: Rc>>, + fut: F, + ) { + tokio_current_thread::spawn(OpenWaitingConnection { + key, + fut, + h2: None, + rx: Some(rx), + inner: Some(inner), + }) + } +} + +impl Drop for OpenWaitingConnection +where + Io: AsyncRead + AsyncWrite + 'static, +{ + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + let mut inner = inner.as_ref().borrow_mut(); + inner.release(); + inner.check_availibility(); + } + } +} + +impl Future for OpenWaitingConnection +where + F: Future, + Io: AsyncRead + AsyncWrite, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.fut.poll() { + Err(err) => { + let _ = self.inner.take(); + if let Some(rx) = self.rx.take() { + let _ = rx.send(Err(err)); + } + Err(()) + } + Ok(Async::Ready((io, proto))) => { + if proto == Protocol::Http1 { + let rx = self.rx.take().unwrap(); + let _ = rx.send(Ok(IoConnection::new( + ConnectionType::H1(io), + Instant::now(), + Some(Acquired(self.key.clone(), self.inner.take())), + ))); + Ok(Async::Ready(())) + } else { + self.h2 = Some(handshake(io)); + self.poll() + } + } + Ok(Async::NotReady) => Ok(Async::NotReady), + } + } +} + pub(crate) struct Acquired(Key, Option>>>); impl Acquired diff --git a/awc/Cargo.toml b/awc/Cargo.toml index a254d69c..e6018f44 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -38,7 +38,7 @@ flate2-rust = ["actix-http/flate2-rust"] [dependencies] actix-codec = "0.1.2" actix-service = "0.3.6" -actix-http = "0.1.1" +actix-http = "0.1.2" base64 = "0.10.1" bytes = "0.4" derive_more = "0.14" @@ -55,8 +55,8 @@ openssl = { version="0.10", optional = true } [dev-dependencies] actix-rt = "0.2.2" -actix-web = { version = "1.0.0-alpha.6", features=["ssl"] } -actix-http = { version = "0.1.1", features=["ssl"] } +actix-web = { version = "1.0.0-beta.1", features=["ssl"] } +actix-http = { version = "0.1.2", features=["ssl"] } actix-http-test = { version = "0.1.0", features=["ssl"] } actix-utils = "0.3.4" actix-server = { version = "0.4.3", features=["ssl"] } diff --git a/awc/tests/test_client.rs b/awc/tests/test_client.rs index afccdff8..d1139fdc 100644 --- a/awc/tests/test_client.rs +++ b/awc/tests/test_client.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::Duration; use brotli2::write::BrotliEncoder; @@ -7,11 +9,12 @@ use bytes::Bytes; use flate2::read::GzDecoder; use flate2::write::GzEncoder; use flate2::Compression; -use futures::future::Future; +use futures::Future; use rand::Rng; use actix_http::HttpService; use actix_http_test::TestServer; +use actix_service::{fn_service, NewService}; use actix_web::http::Cookie; use actix_web::middleware::{BodyEncoding, Compress}; use actix_web::{http::header, web, App, Error, HttpMessage, HttpRequest, HttpResponse}; @@ -144,17 +147,195 @@ fn test_timeout_override() { } #[test] -fn test_connection_close() { - let mut srv = TestServer::new(|| { - HttpService::new( - App::new().service(web::resource("/").to(|| HttpResponse::Ok())), - ) +fn test_connection_reuse() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new( + App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))), + )) }); - let res = srv - .block_on(awc::Client::new().get(srv.url("/")).force_close().send()) - .unwrap(); - assert!(res.status().is_success()); + let client = awc::Client::default(); + + // req 1 + let request = client.get(srv.url("/")).send(); + let response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req = client.post(srv.url("/")); + let response = srv.block_on_fn(move || req.send()).unwrap(); + assert!(response.status().is_success()); + + // one connection + assert_eq!(num.load(Ordering::Relaxed), 1); +} + +#[test] +fn test_connection_force_close() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new( + App::new().service(web::resource("/").route(web::to(|| HttpResponse::Ok()))), + )) + }); + + let client = awc::Client::default(); + + // req 1 + let request = client.get(srv.url("/")).force_close().send(); + let response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req = client.post(srv.url("/")).force_close(); + let response = srv.block_on_fn(move || req.send()).unwrap(); + assert!(response.status().is_success()); + + // two connection + assert_eq!(num.load(Ordering::Relaxed), 2); +} + +#[test] +fn test_connection_server_close() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new( + App::new().service( + web::resource("/") + .route(web::to(|| HttpResponse::Ok().force_close().finish())), + ), + )) + }); + + let client = awc::Client::default(); + + // req 1 + let request = client.get(srv.url("/")).send(); + let response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req = client.post(srv.url("/")); + let response = srv.block_on_fn(move || req.send()).unwrap(); + assert!(response.status().is_success()); + + // two connection + assert_eq!(num.load(Ordering::Relaxed), 2); +} + +#[test] +fn test_connection_wait_queue() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new(App::new().service( + web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR))), + ))) + }); + + let client = awc::Client::build() + .connector(awc::Connector::new().limit(1).finish()) + .finish(); + + // req 1 + let request = client.get(srv.url("/")).send(); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req2 = client.post(srv.url("/")); + let req2_fut = srv.execute(move || { + let mut fut = req2.send(); + assert!(fut.poll().unwrap().is_not_ready()); + fut + }); + + // read response 1 + let bytes = srv.block_on(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + // req 2 + let response = srv.block_on(req2_fut).unwrap(); + assert!(response.status().is_success()); + + // two connection + assert_eq!(num.load(Ordering::Relaxed), 1); +} + +#[test] +fn test_connection_wait_queue_force_close() { + let num = Arc::new(AtomicUsize::new(0)); + let num2 = num.clone(); + + let mut srv = TestServer::new(move || { + let num2 = num2.clone(); + fn_service(move |io| { + num2.fetch_add(1, Ordering::Relaxed); + Ok(io) + }) + .and_then(HttpService::new( + App::new().service( + web::resource("/") + .route(web::to(|| HttpResponse::Ok().force_close().body(STR))), + ), + )) + }); + + let client = awc::Client::build() + .connector(awc::Connector::new().limit(1).finish()) + .finish(); + + // req 1 + let request = client.get(srv.url("/")).send(); + let mut response = srv.block_on(request).unwrap(); + assert!(response.status().is_success()); + + // req 2 + let req2 = client.post(srv.url("/")); + let req2_fut = srv.execute(move || { + let mut fut = req2.send(); + assert!(fut.poll().unwrap().is_not_ready()); + fut + }); + + // read response 1 + let bytes = srv.block_on(response.body()).unwrap(); + assert_eq!(bytes, Bytes::from_static(STR.as_ref())); + + // req 2 + let response = srv.block_on(req2_fut).unwrap(); + assert!(response.status().is_success()); + + // two connection + assert_eq!(num.load(Ordering::Relaxed), 2); } #[test] diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs index 37abe129..42d07549 100644 --- a/test-server/src/lib.rs +++ b/test-server/src/lib.rs @@ -124,6 +124,7 @@ impl TestServer { |e| log::error!("Can not set alpn protocol: {:?}", e), ); Connector::new() + .conn_lifetime(time::Duration::from_secs(0)) .timeout(time::Duration::from_millis(500)) .ssl(builder.build()) .finish() @@ -131,6 +132,7 @@ impl TestServer { #[cfg(not(feature = "ssl"))] { Connector::new() + .conn_lifetime(time::Duration::from_secs(0)) .timeout(time::Duration::from_millis(500)) .finish() } @@ -163,6 +165,15 @@ impl TestServerRuntime { self.rt.block_on(fut) } + /// Execute future on current core + pub fn block_on_fn(&mut self, f: F) -> Result + where + F: FnOnce() -> R, + R: Future, + { + self.rt.block_on(lazy(|| f())) + } + /// Execute function on current core pub fn execute(&mut self, fut: F) -> R where