diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 54f7357f..6ba111eb 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -4,7 +4,12 @@ ### Changed * Feature `cookies` is now optional and disabled by default. [#1981] +### Removed +* re-export of `futures_channel::oneshot::Canceled` is removed from `error` mod. [#1994] +* `ResponseError` impl for `futures_channel::oneshot::Canceled` is removed. [#1994] + [#1981]: https://github.com/actix/actix-web/pull/1981 +[#1994]: https://github.com/actix/actix-web/pull/1994 ## 3.0.0-beta.3 - 2021-02-10 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 69a344d4..78fb5507 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -59,13 +59,11 @@ cfg-if = "1" cookie = { version = "0.14.1", features = ["percent-encode"], optional = true } derive_more = "0.99.5" encoding_rs = "0.8" -futures-channel = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.7", default-features = false, features = ["alloc", "sink"] } h2 = "0.3.0" http = "0.2.2" httparse = "1.3" -indexmap = "1.3" itoa = "0.4" language-tags = "0.2" lazy_static = "1.4" @@ -79,9 +77,9 @@ serde = "1.0" serde_json = "1.0" serde_urlencoded = "0.7" sha-1 = "0.9" -slab = "0.4" smallvec = "1.6" time = { version = "0.2.23", default-features = false, features = ["std"] } +tokio = { version = "1.2", features = ["sync"] } # compression brotli2 = { version="0.3.2", optional = true } diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 4c6a6dcb..0b2c6e1d 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -103,7 +103,10 @@ pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static { #[doc(hidden)] /// HTTP client connection -pub struct IoConnection { +pub struct IoConnection +where + T: AsyncWrite + Unpin + 'static, +{ io: Option>, created: time::Instant, pool: Option>, @@ -111,7 +114,7 @@ pub struct IoConnection { impl fmt::Debug for IoConnection where - T: fmt::Debug, + T: AsyncWrite + Unpin + fmt::Debug + 'static, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.io { @@ -138,6 +141,11 @@ impl IoConnection { pub(crate) fn into_inner(self) -> (ConnectionType, time::Instant) { (self.io.unwrap(), self.created) } + + #[cfg(test)] + pub(crate) fn into_parts(self) -> (ConnectionType, time::Instant, Acquired) { + (self.io.unwrap(), self.created, self.pool.unwrap()) + } } impl Connection for IoConnection @@ -202,7 +210,11 @@ where } #[allow(dead_code)] -pub(crate) enum EitherConnection { +pub(crate) enum EitherConnection +where + A: AsyncRead + AsyncWrite + Unpin + 'static, + B: AsyncRead + AsyncWrite + Unpin + 'static, +{ A(IoConnection), B(IoConnection), } diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index 92c3c0e1..1ff5c701 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -165,7 +165,10 @@ where #[doc(hidden)] /// HTTP client connection -pub struct H1Connection { +pub struct H1Connection +where + T: AsyncWrite + Unpin + 'static, +{ /// T should be `Unpin` io: Option, created: time::Instant, diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 867ba5c0..b0b1613a 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -1,32 +1,27 @@ use std::cell::RefCell; use std::collections::VecDeque; use std::future::Future; +use std::ops::Deref; use std::pin::Pin; use std::rc::Rc; +use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; use actix_rt::time::{sleep, Sleep}; use actix_service::Service; -use actix_utils::task::LocalWaker; use ahash::AHashMap; -use bytes::Bytes; -use futures_channel::oneshot; use futures_core::future::LocalBoxFuture; -use futures_util::future::{poll_fn, FutureExt}; -use h2::client::{Connection, SendRequest}; use http::uri::Authority; -use indexmap::IndexSet; use pin_project::pin_project; -use slab::Slab; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use super::config::ConnectorConfig; -use super::connection::{ConnectionType, IoConnection}; +use super::connection::{ConnectionType, H2Connection, IoConnection}; use super::error::ConnectError; use super::h2proto::handshake; use super::Connect; -use crate::client::connection::H2Connection; #[derive(Clone, Copy, PartialEq)] /// Protocol version @@ -46,358 +41,279 @@ impl From for Key { } } -/// Connections pool -pub(crate) struct ConnectionPool(Rc, Rc>>); - -impl ConnectionPool +/// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key +pub(crate) struct ConnectionPool where - Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service + 'static, + Io: AsyncWrite + Unpin + 'static, { - pub(crate) fn new(connector: T, config: ConnectorConfig) -> Self { - let connector_rc = Rc::new(connector); - let inner_rc = Rc::new(RefCell::new(Inner { - config, - acquired: 0, - waiters: Slab::new(), - waiters_queue: IndexSet::new(), - available: AHashMap::default(), - waker: LocalWaker::new(), - })); + connector: Rc, + inner: ConnectionPoolInner, +} - // start support future - actix_rt::spawn(ConnectorPoolSupport { - connector: Rc::clone(&connector_rc), - inner: Rc::clone(&inner_rc), - }); +/// wrapper type for check the ref count of Rc. +struct ConnectionPoolInner(Rc>) +where + Io: AsyncWrite + Unpin + 'static; - ConnectionPool(connector_rc, inner_rc) +impl ConnectionPoolInner +where + Io: AsyncWrite + Unpin + 'static, +{ + /// spawn a async for graceful shutdown h1 Io type with a timeout. + fn close(&self, conn: ConnectionType) { + if let Some(timeout) = self.config.disconnect_timeout { + if let ConnectionType::H1(io) = conn { + actix_rt::spawn(CloseConnection::new(io, timeout)); + } + } } } -impl Clone for ConnectionPool +impl Clone for ConnectionPoolInner where - Io: 'static, + Io: AsyncWrite + Unpin + 'static, { fn clone(&self) -> Self { - ConnectionPool(self.0.clone(), self.1.clone()) + Self(Rc::clone(&self.0)) } } -impl Drop for ConnectionPool { - fn drop(&mut self) { - // wake up the ConnectorPoolSupport when dropping so it can exit properly. - self.1.borrow().waker.wake(); - } -} - -impl Service for ConnectionPool +impl Deref for ConnectionPoolInner where + Io: AsyncWrite + Unpin + 'static, +{ + type Target = ConnectionPoolInnerPriv; + + fn deref(&self) -> &Self::Target { + &*self.0 + } +} + +impl Drop for ConnectionPoolInner +where + Io: AsyncWrite + Unpin + 'static, +{ + fn drop(&mut self) { + // When strong count is one it means the pool is dropped + // remove and drop all Io types. + if Rc::strong_count(&self.0) == 1 { + self.permits.close(); + std::mem::take(&mut *self.available.borrow_mut()) + .into_iter() + .for_each(|(_, conns)| { + conns.into_iter().for_each(|pooled| self.close(pooled.conn)) + }); + } + } +} + +struct ConnectionPoolInnerPriv +where + Io: AsyncWrite + Unpin + 'static, +{ + config: ConnectorConfig, + available: RefCell>>>, + permits: Arc, +} + +impl ConnectionPool +where + Io: AsyncWrite + Unpin + 'static, +{ + /// construct a new connection pool. + /// + /// [`super::config::ConnectorConfig`]'s `limit` is used as the max permits allowed + /// for on flight connections. + /// + /// The pool can only have equal to `limit` amount of requests spawning/using Io type + /// concurrently. + /// + /// Any requests beyond limit would be wait in fifo order and get notified in async + /// manner by [`tokio::sync::Semaphore`] + pub(crate) fn new(connector: S, config: ConnectorConfig) -> Self { + let permits = Arc::new(Semaphore::new(config.limit)); + let available = RefCell::new(AHashMap::default()); + let connector = Rc::new(connector); + + let inner = ConnectionPoolInner(Rc::new(ConnectionPoolInnerPriv { + config, + available, + permits, + })); + + Self { connector, inner } + } +} + +impl Clone for ConnectionPool +where + Io: AsyncWrite + Unpin + 'static, +{ + fn clone(&self) -> Self { + Self { + connector: self.connector.clone(), + inner: self.inner.clone(), + } + } +} + +impl Service for ConnectionPool +where + S: Service + 'static, Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service + 'static, { type Response = IoConnection; type Error = ConnectError; type Future = LocalBoxFuture<'static, Result, ConnectError>>; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx) + self.connector.poll_ready(cx) } fn call(&self, req: Connect) -> Self::Future { - let connector = self.0.clone(); - let inner = self.1.clone(); + let connector = self.connector.clone(); + let inner = self.inner.clone(); - let fut = async move { + Box::pin(async move { let key = if let Some(authority) = req.uri.authority() { authority.clone().into() } else { return Err(ConnectError::Unresolved); }; - // acquire connection - match poll_fn(|cx| Poll::Ready(inner.borrow_mut().acquire(&key, cx))).await { - Acquire::Acquired(io, created) => { - // use existing connection - Ok(IoConnection::new( - io, - created, - Some(Acquired(key, Some(inner))), - )) + // acquire an owned permit and carry it with connection + let permit = inner + .permits + .clone() + .acquire_owned() + .await + // TODO: use specific error for semaphore acquire error + .map_err(|_| ConnectError::NoRecords)?; + + // check if there is idle connection for given key. + let mut map = inner.available.borrow_mut(); + + let mut conn = None; + if let Some(conns) = map.get_mut(&key) { + let now = Instant::now(); + while let Some(mut c) = conns.pop_front() { + // check the lifetime and drop connection that live for too long. + if (now - c.used) > inner.config.conn_keep_alive + || (now - c.created) > inner.config.conn_lifetime + { + inner.close(c.conn); + // check if the connection is still usable. + } else { + if let ConnectionType::H1(ref mut io) = c.conn { + let check = ConnectionCheckFuture { io }; + match check.await { + ConnectionState::Break => { + inner.close(c.conn); + continue; + } + ConnectionState::Skip => continue, + ConnectionState::Live => conn = Some(c), + } + } else { + conn = Some(c); + } + + break; + } } - Acquire::Available => { - // open tcp connection + }; + + // drop map early to end the borrow_mut of RefCell. + drop(map); + + // construct acquired. It's used to put Io type back to pool/ close the Io type. + // permit is carried with the whole lifecycle of Acquired. + let acquired = Some(Acquired { key, inner, permit }); + + // match the connection and spawn new one if did not get anything. + match conn { + Some(conn) => Ok(IoConnection::new(conn.conn, conn.created, acquired)), + None => { let (io, proto) = connector.call(req).await?; - let config = inner.borrow().config.clone(); - - let guard = OpenGuard::new(key, inner); - if proto == Protocol::Http1 { Ok(IoConnection::new( ConnectionType::H1(io), Instant::now(), - Some(guard.consume()), + acquired, )) } else { - let (sender, connection) = handshake(io, &config).await?; + let config = &acquired.as_ref().unwrap().inner.config; + let (sender, connection) = handshake(io, config).await?; Ok(IoConnection::new( ConnectionType::H2(H2Connection::new(sender, connection)), Instant::now(), - Some(guard.consume()), + acquired, )) } } - _ => { - // connection is not available, wait - let (rx, token) = inner.borrow_mut().wait_for(req); - - let guard = WaiterGuard::new(key, token, inner); - let res = match rx.await { - Err(_) => Err(ConnectError::Disconnected), - Ok(res) => res, - }; - guard.consume(); - res - } } + }) + } +} + +/// Type for check the connection and determine if it's usable. +struct ConnectionCheckFuture<'a, Io> { + io: &'a mut Io, +} + +enum ConnectionState { + Live, + Break, + Skip, +} + +impl Future for ConnectionCheckFuture<'_, Io> +where + Io: AsyncRead + Unpin, +{ + type Output = ConnectionState; + + // this future is only used to get access to Context. + // It should never return Poll::Pending. + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + let mut buf = [0; 2]; + let mut read_buf = ReadBuf::new(&mut buf); + + let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) { + // io is pending and new data would wake up it. + Poll::Pending => ConnectionState::Live, + // io have data inside. drop it. + Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => { + ConnectionState::Break + } + // otherwise skip to next. + _ => ConnectionState::Skip, }; - fut.boxed_local() + Poll::Ready(state) } } -struct WaiterGuard -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ - key: Key, - token: usize, - inner: Option>>>, -} - -impl WaiterGuard -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ - fn new(key: Key, token: usize, inner: Rc>>) -> Self { - Self { - key, - token, - inner: Some(inner), - } - } - - fn consume(mut self) { - let _ = self.inner.take(); - } -} - -impl Drop for WaiterGuard -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ - fn drop(&mut self) { - if let Some(i) = self.inner.take() { - let mut inner = i.as_ref().borrow_mut(); - inner.release_waiter(&self.key, self.token); - inner.check_availability(); - } - } -} - -struct OpenGuard -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ - key: Key, - inner: Option>>>, -} - -impl OpenGuard -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ - fn new(key: Key, inner: Rc>>) -> Self { - Self { - key, - inner: Some(inner), - } - } - - fn consume(mut self) -> Acquired { - Acquired(self.key.clone(), self.inner.take()) - } -} - -impl Drop for OpenGuard -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ - fn drop(&mut self) { - if let Some(i) = self.inner.take() { - let mut inner = i.as_ref().borrow_mut(); - inner.release(); - inner.check_availability(); - } - } -} - -enum Acquire { - Acquired(ConnectionType, Instant), - Available, - NotAvailable, -} - -struct AvailableConnection { - io: ConnectionType, +struct PooledConnection { + conn: ConnectionType, used: Instant, created: Instant, } -pub(crate) struct Inner { - config: ConnectorConfig, - acquired: usize, - available: AHashMap>>, - waiters: Slab< - Option<( - Connect, - oneshot::Sender, ConnectError>>, - )>, - >, - waiters_queue: IndexSet<(Key, usize)>, - waker: LocalWaker, -} - -impl Inner { - fn reserve(&mut self) { - self.acquired += 1; - } - - fn release(&mut self) { - self.acquired -= 1; - } - - fn release_waiter(&mut self, key: &Key, token: usize) { - self.waiters.remove(token); - let _ = self.waiters_queue.shift_remove(&(key.clone(), token)); - } -} - -impl Inner -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ - /// connection is not available, wait - fn wait_for( - &mut self, - connect: Connect, - ) -> ( - oneshot::Receiver, ConnectError>>, - usize, - ) { - let (tx, rx) = oneshot::channel(); - - let key: Key = connect.uri.authority().unwrap().clone().into(); - let entry = self.waiters.vacant_entry(); - let token = entry.key(); - entry.insert(Some((connect, tx))); - assert!(self.waiters_queue.insert((key, token))); - - (rx, token) - } - - fn acquire(&mut self, key: &Key, cx: &mut Context<'_>) -> Acquire { - // check limits - if self.config.limit > 0 && self.acquired >= self.config.limit { - return Acquire::NotAvailable; - } - - self.reserve(); - - // check if open connection is available - // cleanup stale connections at the same time - if let Some(ref mut connections) = self.available.get_mut(key) { - let now = Instant::now(); - while let Some(conn) = connections.pop_back() { - // check if it still usable - if (now - conn.used) > self.config.conn_keep_alive - || (now - conn.created) > self.config.conn_lifetime - { - if let Some(timeout) = self.config.disconnect_timeout { - if let ConnectionType::H1(io) = conn.io { - actix_rt::spawn(CloseConnection::new(io, timeout)); - } - } - } else { - let mut io = conn.io; - let mut buf = [0; 2]; - let mut read_buf = ReadBuf::new(&mut buf); - if let ConnectionType::H1(ref mut s) = io { - match Pin::new(s).poll_read(cx, &mut read_buf) { - Poll::Pending => {} - Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => { - if let Some(timeout) = self.config.disconnect_timeout { - if let ConnectionType::H1(io) = io { - actix_rt::spawn(CloseConnection::new( - io, timeout, - )); - } - } - continue; - } - _ => continue, - } - } - return Acquire::Acquired(io, conn.created); - } - } - } - Acquire::Available - } - - fn release_conn(&mut self, key: &Key, io: ConnectionType, created: Instant) { - self.acquired -= 1; - self.available - .entry(key.clone()) - .or_insert_with(VecDeque::new) - .push_back(AvailableConnection { - io, - created, - used: Instant::now(), - }); - self.check_availability(); - } - - fn release_close(&mut self, io: ConnectionType) { - self.acquired -= 1; - if let Some(timeout) = self.config.disconnect_timeout { - if let ConnectionType::H1(io) = io { - actix_rt::spawn(CloseConnection::new(io, timeout)); - } - } - self.check_availability(); - } - - fn check_availability(&self) { - if !self.waiters_queue.is_empty() && self.acquired < self.config.limit { - self.waker.wake(); - } - } -} - -#[pin_project::pin_project] -struct CloseConnection { - io: T, +#[pin_project] +struct CloseConnection { + io: Io, #[pin] timeout: Sleep, } -impl CloseConnection +impl CloseConnection where - T: AsyncWrite + Unpin, + Io: AsyncWrite + Unpin, { - fn new(io: T, timeout: Duration) -> Self { + fn new(io: Io, timeout: Duration) -> Self { CloseConnection { io, timeout: sleep(timeout), @@ -405,9 +321,9 @@ where } } -impl Future for CloseConnection +impl Future for CloseConnection where - T: AsyncWrite + Unpin, + Io: AsyncWrite + Unpin, { type Output = (); @@ -416,230 +332,340 @@ where match this.timeout.poll(cx) { Poll::Ready(_) => Poll::Ready(()), - Poll::Pending => match Pin::new(this.io).poll_shutdown(cx) { - Poll::Ready(_) => Poll::Ready(()), - Poll::Pending => Poll::Pending, - }, + Poll::Pending => Pin::new(this.io).poll_shutdown(cx).map(|_| ()), } } } -#[pin_project] -struct ConnectorPoolSupport +pub(crate) struct Acquired where - Io: AsyncRead + AsyncWrite + Unpin + 'static, + Io: AsyncWrite + Unpin + 'static, { - connector: Rc, - inner: Rc>>, -} - -impl Future for ConnectorPoolSupport -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service, - T::Future: 'static, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if Rc::strong_count(this.inner) == 1 { - // If we are last copy of Inner it means the ConnectionPool is already gone - // and we are safe to exit. - return Poll::Ready(()); - } - - let mut inner = this.inner.borrow_mut(); - inner.waker.register(cx.waker()); - - // check waiters - loop { - let (key, token) = { - if let Some((key, token)) = inner.waiters_queue.get_index(0) { - (key.clone(), *token) - } else { - break; - } - }; - if inner.waiters.get(token).unwrap().is_none() { - continue; - } - - match inner.acquire(&key, cx) { - Acquire::NotAvailable => break, - Acquire::Acquired(io, created) => { - let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1; - if let Err(conn) = tx.send(Ok(IoConnection::new( - io, - created, - Some(Acquired(key.clone(), Some(this.inner.clone()))), - ))) { - let (io, created) = conn.unwrap().into_inner(); - inner.release_conn(&key, io, created); - } - } - Acquire::Available => { - let (connect, tx) = - inner.waiters.get_mut(token).unwrap().take().unwrap(); - OpenWaitingConnection::spawn( - key.clone(), - tx, - this.inner.clone(), - this.connector.call(connect), - inner.config.clone(), - ); - } - } - let _ = inner.waiters_queue.swap_remove_index(0); - } - - Poll::Pending - } -} - -#[pin_project::pin_project(PinnedDrop)] -struct OpenWaitingConnection -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ - #[pin] - fut: F, key: Key, - h2: Option< - LocalBoxFuture< - 'static, - Result<(SendRequest, Connection), h2::Error>, - >, - >, - rx: Option, ConnectError>>>, - inner: Option>>>, - config: ConnectorConfig, + inner: ConnectionPoolInner, + permit: OwnedSemaphorePermit, } -impl OpenWaitingConnection +impl Acquired where - F: Future> + 'static, Io: AsyncRead + AsyncWrite + Unpin + 'static, { - fn spawn( - key: Key, - rx: oneshot::Sender, ConnectError>>, - inner: Rc>>, - fut: F, - config: ConnectorConfig, - ) { - actix_rt::spawn(OpenWaitingConnection { - key, - fut, - h2: None, - rx: Some(rx), - inner: Some(inner), - config, + // close the Io type. + pub(crate) fn close(&mut self, conn: IoConnection) { + let (conn, _) = conn.into_inner(); + self.inner.close(conn); + } + + // put the Io type back to pool. + pub(crate) fn release(&mut self, conn: IoConnection) { + let (io, created) = conn.into_inner(); + let Acquired { key, inner, .. } = self; + inner + .available + .borrow_mut() + .entry(key.clone()) + .or_insert_with(VecDeque::new) + .push_back(PooledConnection { + conn: io, + created, + used: Instant::now(), + }); + + // a no op bind. used to stop clippy warning without adding allow attribute. + let _permit = &mut self.permit; + } +} + +#[cfg(test)] +mod test { + use super::*; + + use std::cell::Cell; + use std::io; + + use http::Uri; + + use crate::client::connection::IoConnection; + + // A stream type always return pending on async read. + // mock a usable tcp stream that ready to be used as client + struct TestStream(Rc>); + + impl Drop for TestStream { + fn drop(&mut self) { + self.0.set(self.0.get() - 1); + } + } + + impl AsyncRead for TestStream { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &mut ReadBuf<'_>, + ) -> Poll> { + Poll::Pending + } + } + + impl AsyncWrite for TestStream { + fn poll_write( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &[u8], + ) -> Poll> { + unimplemented!() + } + + fn poll_flush( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + } + + struct TestPoolConnector { + generated: Rc>, + } + + impl Service for TestPoolConnector { + type Response = (TestStream, Protocol); + type Error = ConnectError; + type Future = LocalBoxFuture<'static, Result>; + + fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { + unimplemented!("poll_ready is not used in test") + } + + fn call(&self, _: Connect) -> Self::Future { + self.generated.set(self.generated.get() + 1); + let generated = self.generated.clone(); + Box::pin(async { Ok((TestStream(generated), Protocol::Http1)) }) + } + } + + fn release(conn: IoConnection) + where + T: AsyncRead + AsyncWrite + Unpin + 'static, + { + let (conn, created, mut acquired) = conn.into_parts(); + acquired.release(IoConnection::new(conn, created, None)); + } + + #[actix_rt::test] + async fn test_pool_limit() { + let connector = TestPoolConnector { + generated: Rc::new(Cell::new(0)), + }; + + let config = ConnectorConfig { + limit: 1, + ..Default::default() + }; + + let pool = super::ConnectionPool::new(connector, config); + + let req = Connect { + uri: Uri::from_static("http://localhost"), + addr: None, + }; + + let conn = pool.call(req.clone()).await.unwrap(); + + let waiting = Rc::new(Cell::new(true)); + + let waiting_clone = waiting.clone(); + actix_rt::spawn(async move { + actix_rt::time::sleep(Duration::from_millis(100)).await; + waiting_clone.set(false); + drop(conn); }); - } -} - -#[pin_project::pinned_drop] -impl PinnedDrop for OpenWaitingConnection -where - Io: AsyncRead + AsyncWrite + Unpin + 'static, -{ - fn drop(self: Pin<&mut Self>) { - if let Some(inner) = self.project().inner.take() { - let mut inner = inner.as_ref().borrow_mut(); - inner.release(); - inner.check_availability(); - } - } -} - -impl Future for OpenWaitingConnection -where - F: Future>, - Io: AsyncRead + AsyncWrite + Unpin, -{ - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - if let Some(ref mut h2) = this.h2 { - return match Pin::new(h2).poll(cx) { - Poll::Ready(Ok((sender, connection))) => { - let rx = this.rx.take().unwrap(); - let _ = rx.send(Ok(IoConnection::new( - ConnectionType::H2(H2Connection::new(sender, connection)), - Instant::now(), - Some(Acquired(this.key.clone(), this.inner.take())), - ))); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => { - let _ = this.inner.take(); - if let Some(rx) = this.rx.take() { - let _ = rx.send(Err(ConnectError::H2(err))); - } - Poll::Ready(()) - } - }; - } - - match this.fut.poll(cx) { - Poll::Ready(Err(err)) => { - let _ = this.inner.take(); - if let Some(rx) = this.rx.take() { - let _ = rx.send(Err(err)); - } - Poll::Ready(()) - } - Poll::Ready(Ok((io, proto))) => { - if proto == Protocol::Http1 { - let rx = this.rx.take().unwrap(); - let _ = rx.send(Ok(IoConnection::new( - ConnectionType::H1(io), - Instant::now(), - Some(Acquired(this.key.clone(), this.inner.take())), - ))); - Poll::Ready(()) - } else { - *this.h2 = Some(handshake(io, this.config).boxed_local()); - self.poll(cx) - } - } - Poll::Pending => Poll::Pending, - } - } -} - -pub(crate) struct Acquired(Key, Option>>>); - -impl Acquired -where - T: AsyncRead + AsyncWrite + Unpin + 'static, -{ - pub(crate) fn close(&mut self, conn: IoConnection) { - if let Some(inner) = self.1.take() { - let (io, _) = conn.into_inner(); - inner.as_ref().borrow_mut().release_close(io); - } - } - pub(crate) fn release(&mut self, conn: IoConnection) { - if let Some(inner) = self.1.take() { - let (io, created) = conn.into_inner(); - inner - .as_ref() - .borrow_mut() - .release_conn(&self.0, io, created); - } - } -} - -impl Drop for Acquired { - fn drop(&mut self) { - if let Some(inner) = self.1.take() { - inner.as_ref().borrow_mut().release(); - } + + assert!(waiting.get()); + + let now = Instant::now(); + let conn = pool.call(req).await.unwrap(); + + release(conn); + assert!(!waiting.get()); + assert!(now.elapsed() >= Duration::from_millis(100)); + } + + #[actix_rt::test] + async fn test_pool_keep_alive() { + let generated = Rc::new(Cell::new(0)); + let generated_clone = generated.clone(); + + let connector = TestPoolConnector { generated }; + + let config = ConnectorConfig { + conn_keep_alive: Duration::from_secs(1), + ..Default::default() + }; + + let pool = super::ConnectionPool::new(connector, config); + + let req = Connect { + uri: Uri::from_static("http://localhost"), + addr: None, + }; + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + release(conn); + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + release(conn); + + actix_rt::time::sleep(Duration::from_millis(1500)).await; + actix_rt::task::yield_now().await; + + let conn = pool.call(req).await.unwrap(); + // Note: spawned recycle connection is not ran yet. + // This is tokio current thread runtime specific behavior. + assert_eq!(2, generated_clone.get()); + + // yield task so the old connection is properly dropped. + actix_rt::task::yield_now().await; + assert_eq!(1, generated_clone.get()); + + release(conn); + } + + #[actix_rt::test] + async fn test_pool_lifetime() { + let generated = Rc::new(Cell::new(0)); + let generated_clone = generated.clone(); + + let connector = TestPoolConnector { generated }; + + let config = ConnectorConfig { + conn_lifetime: Duration::from_secs(1), + ..Default::default() + }; + + let pool = super::ConnectionPool::new(connector, config); + + let req = Connect { + uri: Uri::from_static("http://localhost"), + addr: None, + }; + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + release(conn); + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + release(conn); + + actix_rt::time::sleep(Duration::from_millis(1500)).await; + actix_rt::task::yield_now().await; + + let conn = pool.call(req).await.unwrap(); + // Note: spawned recycle connection is not ran yet. + // This is tokio current thread runtime specific behavior. + assert_eq!(2, generated_clone.get()); + + // yield task so the old connection is properly dropped. + actix_rt::task::yield_now().await; + assert_eq!(1, generated_clone.get()); + + release(conn); + } + + #[actix_rt::test] + async fn test_pool_authority_key() { + let generated = Rc::new(Cell::new(0)); + let generated_clone = generated.clone(); + + let connector = TestPoolConnector { generated }; + + let config = ConnectorConfig::default(); + + let pool = super::ConnectionPool::new(connector, config); + + let req = Connect { + uri: Uri::from_static("https://crates.io"), + addr: None, + }; + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + release(conn); + + let conn = pool.call(req).await.unwrap(); + assert_eq!(1, generated_clone.get()); + release(conn); + + let req = Connect { + uri: Uri::from_static("https://google.com"), + addr: None, + }; + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(2, generated_clone.get()); + release(conn); + let conn = pool.call(req).await.unwrap(); + assert_eq!(2, generated_clone.get()); + release(conn); + } + + #[actix_rt::test] + async fn test_pool_drop() { + let generated = Rc::new(Cell::new(0)); + let generated_clone = generated.clone(); + + let connector = TestPoolConnector { generated }; + + let config = ConnectorConfig::default(); + + let pool = Rc::new(super::ConnectionPool::new(connector, config)); + + let req = Connect { + uri: Uri::from_static("https://crates.io"), + addr: None, + }; + + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(1, generated_clone.get()); + release(conn); + + let req = Connect { + uri: Uri::from_static("https://google.com"), + addr: None, + }; + let conn = pool.call(req.clone()).await.unwrap(); + assert_eq!(2, generated_clone.get()); + release(conn); + + let clone1 = pool.clone(); + let clone2 = clone1.clone(); + + drop(clone2); + for _ in 0..2 { + actix_rt::task::yield_now().await; + } + assert_eq!(2, generated_clone.get()); + + drop(clone1); + for _ in 0..2 { + actix_rt::task::yield_now().await; + } + assert_eq!(2, generated_clone.get()); + + drop(pool); + for _ in 0..2 { + actix_rt::task::yield_now().await; + } + assert_eq!(0, generated_clone.get()); } } diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 6a9de2d3..97f2b3ef 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -11,7 +11,6 @@ use actix_utils::dispatcher::DispatcherError as FramedDispatcherError; use actix_utils::timeout::TimeoutError; use bytes::BytesMut; use derive_more::{Display, From}; -pub use futures_channel::oneshot::Canceled; use http::uri::InvalidUri; use http::{header, Error as HttpError, StatusCode}; use serde::de::value::Error as DeError; @@ -186,9 +185,6 @@ impl ResponseError for DeError { } } -/// Returns [`StatusCode::INTERNAL_SERVER_ERROR`] for [`Canceled`]. -impl ResponseError for Canceled {} - /// Returns [`StatusCode::BAD_REQUEST`] for [`Utf8Error`]. impl ResponseError for Utf8Error { fn status_code(&self) -> StatusCode {