From 43ce25cda19b0803e880017f118f62ae2311813c Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 27 Dec 2020 05:27:59 +0800 Subject: [PATCH] Remove unused mods in actix-utils (#229) --- actix-utils/CHANGES.md | 5 +- actix-utils/Cargo.toml | 17 +- actix-utils/src/condition.rs | 129 -------------- actix-utils/src/counter.rs | 5 +- actix-utils/src/dispatcher.rs | 68 ++++---- actix-utils/src/either.rs | 153 ---------------- actix-utils/src/inflight.rs | 169 ------------------ actix-utils/src/keepalive.rs | 125 -------------- actix-utils/src/lib.rs | 8 - actix-utils/src/mpsc.rs | 13 +- actix-utils/src/oneshot.rs | 316 ---------------------------------- actix-utils/src/order.rs | 283 ------------------------------ actix-utils/src/stream.rs | 76 -------- actix-utils/src/task.rs | 11 +- actix-utils/src/time.rs | 225 ------------------------ actix-utils/src/timeout.rs | 72 +++++--- 16 files changed, 111 insertions(+), 1564 deletions(-) delete mode 100644 actix-utils/src/condition.rs delete mode 100644 actix-utils/src/either.rs delete mode 100644 actix-utils/src/inflight.rs delete mode 100644 actix-utils/src/keepalive.rs delete mode 100644 actix-utils/src/oneshot.rs delete mode 100644 actix-utils/src/order.rs delete mode 100644 actix-utils/src/stream.rs delete mode 100644 actix-utils/src/time.rs diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 22b7c7bf..b4d59ed0 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,7 +1,10 @@ # Changes ## Unreleased - 2020-xx-xx -* Upgrade `pin-project` to `1.0`. +* Use `pin-project-lite` to replace `pin-project`. [#229] +* Remove `condition`,`either`,`inflight`,`keepalive`,`oneshot`,`order`,`stream` and `time` mods. [#229] + +[#229]: https://github.com/actix/actix-net/pull/229 ## 2.0.0 - 2020-08-23 * No changes from beta 1. diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 7f47f359..f5bd5793 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "2.0.0" +version = "3.0.0" authors = ["Nikolay Kim "] description = "Various network related services and utilities for the Actix ecosystem." keywords = ["network", "framework", "async", "futures"] @@ -19,12 +19,11 @@ path = "src/lib.rs" actix-codec = "0.3.0" actix-rt = "1.1.1" actix-service = "1.0.6" -bitflags = "1.2.1" -bytes = "0.5.3" -either = "1.5.3" -futures-channel = { version = "0.3.4", default-features = false } -futures-sink = { version = "0.3.4", default-features = false } -futures-util = { version = "0.3.4", default-features = false } + +futures-core = { version = "0.3.7", default-features = false } +futures-sink = { version = "0.3.7", default-features = false } log = "0.4" -pin-project = "1.0.0" -slab = "0.4" +pin-project-lite = "0.2.0" + +[dev-dependencies] +futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-utils/src/condition.rs b/actix-utils/src/condition.rs deleted file mode 100644 index 9c7c977c..00000000 --- a/actix-utils/src/condition.rs +++ /dev/null @@ -1,129 +0,0 @@ -use std::cell::RefCell; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; - -use slab::Slab; - -use crate::task::LocalWaker; - -/// Condition allows to notify multiple receivers at the same time -pub struct Condition(Rc>); - -struct Inner { - data: Slab>, -} - -impl Default for Condition { - fn default() -> Self { - Self::new() - } -} - -impl Condition { - pub fn new() -> Condition { - Condition(Rc::new(RefCell::new(Inner { data: Slab::new() }))) - } - - /// Get condition waiter - pub fn wait(&mut self) -> Waiter { - let token = self.0.borrow_mut().data.insert(None); - Waiter { - token, - inner: self.0.clone(), - } - } - - /// Notify all waiters - pub fn notify(&self) { - let inner = self.0.borrow(); - for item in inner.data.iter() { - if let Some(waker) = item.1 { - waker.wake(); - } - } - } -} - -impl Drop for Condition { - fn drop(&mut self) { - self.notify() - } -} - -#[must_use = "Waiter do nothing unless polled"] -pub struct Waiter { - token: usize, - inner: Rc>, -} - -impl Clone for Waiter { - fn clone(&self) -> Self { - let token = self.inner.borrow_mut().data.insert(None); - Waiter { - token, - inner: self.inner.clone(), - } - } -} - -impl Future for Waiter { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - let mut inner = this.inner.borrow_mut(); - let inner = unsafe { inner.data.get_unchecked_mut(this.token) }; - if inner.is_none() { - let waker = LocalWaker::default(); - waker.register(cx.waker()); - *inner = Some(waker); - Poll::Pending - } else if inner.as_mut().unwrap().register(cx.waker()) { - Poll::Pending - } else { - Poll::Ready(()) - } - } -} - -impl Drop for Waiter { - fn drop(&mut self) { - self.inner.borrow_mut().data.remove(self.token); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::lazy; - - #[actix_rt::test] - async fn test_condition() { - let mut cond = Condition::new(); - let mut waiter = cond.wait(); - assert_eq!( - lazy(|cx| Pin::new(&mut waiter).poll(cx)).await, - Poll::Pending - ); - cond.notify(); - waiter.await; - - let mut waiter = cond.wait(); - assert_eq!( - lazy(|cx| Pin::new(&mut waiter).poll(cx)).await, - Poll::Pending - ); - let mut waiter2 = waiter.clone(); - assert_eq!( - lazy(|cx| Pin::new(&mut waiter2).poll(cx)).await, - Poll::Pending - ); - - drop(cond); - waiter.await; - waiter2.await; - } -} diff --git a/actix-utils/src/counter.rs b/actix-utils/src/counter.rs index 4fe9dd0a..0b5984d2 100644 --- a/actix-utils/src/counter.rs +++ b/actix-utils/src/counter.rs @@ -1,6 +1,7 @@ -use std::cell::Cell; +use core::cell::Cell; +use core::task; + use std::rc::Rc; -use std::task; use crate::task::LocalWaker; diff --git a/actix-utils/src/dispatcher.rs b/actix-utils/src/dispatcher.rs index 1ee72564..c3cb4f16 100644 --- a/actix-utils/src/dispatcher.rs +++ b/actix-utils/src/dispatcher.rs @@ -2,13 +2,14 @@ #![allow(type_alias_bounds)] -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{fmt, mem}; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::{fmt, mem}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, Service}; -use futures_util::{future::Future, stream::Stream, FutureExt}; +use futures_core::stream::Stream; use log::debug; use crate::mpsc; @@ -61,25 +62,28 @@ pub enum Message { Close, } -/// Dispatcher is a future that reads frames from Framed object -/// and passes them to the service. -#[pin_project::pin_project] -pub struct Dispatcher -where - S: Service::Item, Response = I>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, - I: 'static, - >::Error: std::fmt::Debug, -{ - service: S, - state: State, - #[pin] - framed: Framed, - rx: mpsc::Receiver, S::Error>>, - tx: mpsc::Sender, S::Error>>, +pin_project_lite::pin_project! { + /// Dispatcher is a future that reads frames from Framed object + /// and passes them to the service. + pub struct Dispatcher + where + S: Service::Item, Response = I>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead, + T: AsyncWrite, + U: Encoder, + U: Decoder, + I: 'static, + >::Error: fmt::Debug, + { + service: S, + state: State, + #[pin] + framed: Framed, + rx: mpsc::Receiver, S::Error>>, + tx: mpsc::Sender, S::Error>>, + } } enum State + Decoder, I> { @@ -114,8 +118,8 @@ where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, I: 'static, - ::Error: std::fmt::Debug, - >::Error: std::fmt::Debug, + ::Error: fmt::Debug, + >::Error: fmt::Debug, { pub fn new>(framed: Framed, service: F) -> Self { let (tx, rx) = mpsc::channel(); @@ -178,7 +182,7 @@ where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, I: 'static, - >::Error: std::fmt::Debug, + >::Error: fmt::Debug, { loop { let this = self.as_mut().project(); @@ -198,9 +202,11 @@ where }; let tx = this.tx.clone(); - actix_rt::spawn(this.service.call(item).map(move |item| { + let fut = this.service.call(item); + actix_rt::spawn(async move { + let item = fut.await; let _ = tx.send(item.map(Message::Item)); - })); + }); } Poll::Pending => return false, Poll::Ready(Err(err)) => { @@ -220,7 +226,7 @@ where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, I: 'static, - >::Error: std::fmt::Debug, + >::Error: fmt::Debug, { loop { let mut this = self.as_mut().project(); @@ -271,8 +277,8 @@ where T: AsyncRead + AsyncWrite, U: Decoder + Encoder, I: 'static, - >::Error: std::fmt::Debug, - ::Error: std::fmt::Debug, + >::Error: fmt::Debug, + ::Error: fmt::Debug, { type Output = Result<(), DispatcherError>; diff --git a/actix-utils/src/either.rs b/actix-utils/src/either.rs deleted file mode 100644 index fdf15ffe..00000000 --- a/actix-utils/src/either.rs +++ /dev/null @@ -1,153 +0,0 @@ -//! Contains `Either` service and related types and functions. -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_service::{Service, ServiceFactory}; -use futures_util::{future, future::Future, ready}; - -/// Combine two different service types into a single type. -/// -/// Both services must be of the same request, response, and error types. -/// `EitherService` is useful for handling conditional branching in service -/// middleware to different inner service types. -pub struct EitherService { - left: A, - right: B, -} - -impl Clone for EitherService { - fn clone(&self) -> Self { - EitherService { - left: self.left.clone(), - right: self.right.clone(), - } - } -} - -impl Service for EitherService -where - A: Service, - B: Service, -{ - type Request = either::Either; - type Response = A::Response; - type Error = A::Error; - type Future = future::Either; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let left = self.left.poll_ready(cx)?; - let right = self.right.poll_ready(cx)?; - - if left.is_ready() && right.is_ready() { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn call(&mut self, req: either::Either) -> Self::Future { - match req { - either::Either::Left(req) => future::Either::Left(self.left.call(req)), - either::Either::Right(req) => future::Either::Right(self.right.call(req)), - } - } -} - -/// Combine two different new service types into a single service. -pub struct Either { - left: A, - right: B, -} - -impl Either { - pub fn new(left: A, right: B) -> Either - where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - Config = A::Config, - Response = A::Response, - Error = A::Error, - InitError = A::InitError, - >, - { - Either { left, right } - } -} - -impl ServiceFactory for Either -where - A: ServiceFactory, - A::Config: Clone, - B: ServiceFactory< - Config = A::Config, - Response = A::Response, - Error = A::Error, - InitError = A::InitError, - >, -{ - type Request = either::Either; - type Response = A::Response; - type Error = A::Error; - type InitError = A::InitError; - type Config = A::Config; - type Service = EitherService; - type Future = EitherNewService; - - fn new_service(&self, cfg: A::Config) -> Self::Future { - EitherNewService { - left: None, - right: None, - left_fut: self.left.new_service(cfg.clone()), - right_fut: self.right.new_service(cfg), - } - } -} - -impl Clone for Either { - fn clone(&self) -> Self { - Self { - left: self.left.clone(), - right: self.right.clone(), - } - } -} - -#[doc(hidden)] -#[pin_project::pin_project] -pub struct EitherNewService { - left: Option, - right: Option, - #[pin] - left_fut: A::Future, - #[pin] - right_fut: B::Future, -} - -impl Future for EitherNewService -where - A: ServiceFactory, - B: ServiceFactory, -{ - type Output = Result, A::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if this.left.is_none() { - *this.left = Some(ready!(this.left_fut.poll(cx))?); - } - if this.right.is_none() { - *this.right = Some(ready!(this.right_fut.poll(cx))?); - } - - if this.left.is_some() && this.right.is_some() { - Poll::Ready(Ok(EitherService { - left: this.left.take().unwrap(), - right: this.right.take().unwrap(), - })) - } else { - Poll::Pending - } - } -} diff --git a/actix-utils/src/inflight.rs b/actix-utils/src/inflight.rs deleted file mode 100644 index 8975a2d2..00000000 --- a/actix-utils/src/inflight.rs +++ /dev/null @@ -1,169 +0,0 @@ -use std::convert::Infallible; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_service::{IntoService, Service, Transform}; -use futures_util::future::{ok, Ready}; - -use super::counter::{Counter, CounterGuard}; - -/// InFlight - new service for service that can limit number of in-flight -/// async requests. -/// -/// Default number of in-flight requests is 15 -pub struct InFlight { - max_inflight: usize, -} - -impl InFlight { - pub fn new(max: usize) -> Self { - Self { max_inflight: max } - } -} - -impl Default for InFlight { - fn default() -> Self { - Self::new(15) - } -} - -impl Transform for InFlight -where - S: Service, -{ - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type InitError = Infallible; - type Transform = InFlightService; - type Future = Ready>; - - fn new_transform(&self, service: S) -> Self::Future { - ok(InFlightService::new(self.max_inflight, service)) - } -} - -pub struct InFlightService { - count: Counter, - service: S, -} - -impl InFlightService -where - S: Service, -{ - pub fn new(max: usize, service: U) -> Self - where - U: IntoService, - { - Self { - count: Counter::new(max), - service: service.into_service(), - } - } -} - -impl Service for InFlightService -where - T: Service, -{ - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Future = InFlightServiceResponse; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.service.poll_ready(cx)?.is_pending() { - Poll::Pending - } else if !self.count.available(cx) { - log::trace!("InFlight limit exceeded"); - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, req: T::Request) -> Self::Future { - InFlightServiceResponse { - fut: self.service.call(req), - _guard: self.count.get(), - } - } -} - -#[doc(hidden)] -#[pin_project::pin_project] -pub struct InFlightServiceResponse { - #[pin] - fut: T::Future, - _guard: CounterGuard, -} - -impl Future for InFlightServiceResponse { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx) - } -} - -#[cfg(test)] -mod tests { - - use std::task::{Context, Poll}; - use std::time::Duration; - - use super::*; - use actix_service::{apply, fn_factory, Service, ServiceFactory}; - use futures_util::future::{lazy, ok, FutureExt, LocalBoxFuture}; - - struct SleepService(Duration); - - impl Service for SleepService { - type Request = (); - type Response = (); - type Error = (); - type Future = LocalBoxFuture<'static, Result<(), ()>>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: ()) -> Self::Future { - actix_rt::time::delay_for(self.0) - .then(|_| ok::<_, ()>(())) - .boxed_local() - } - } - - #[actix_rt::test] - async fn test_transform() { - let wait_time = Duration::from_millis(50); - - let mut srv = InFlightService::new(1, SleepService(wait_time)); - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - - let res = srv.call(()); - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - - let _ = res.await; - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - } - - #[actix_rt::test] - async fn test_new_transform() { - let wait_time = Duration::from_millis(50); - - let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time)))); - - let mut srv = srv.new_service(&()).await.unwrap(); - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - - let res = srv.call(()); - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - - let _ = res.await; - assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - } -} diff --git a/actix-utils/src/keepalive.rs b/actix-utils/src/keepalive.rs deleted file mode 100644 index 4413dcd5..00000000 --- a/actix-utils/src/keepalive.rs +++ /dev/null @@ -1,125 +0,0 @@ -use std::convert::Infallible; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; - -use actix_rt::time::{delay_until, Delay, Instant}; -use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ok, Ready}; - -use super::time::{LowResTime, LowResTimeService}; - -pub struct KeepAlive { - f: F, - ka: Duration, - time: LowResTime, - _t: PhantomData<(R, E)>, -} - -impl KeepAlive -where - F: Fn() -> E + Clone, -{ - pub fn new(ka: Duration, time: LowResTime, f: F) -> Self { - KeepAlive { - f, - ka, - time, - _t: PhantomData, - } - } -} - -impl Clone for KeepAlive -where - F: Clone, -{ - fn clone(&self) -> Self { - KeepAlive { - f: self.f.clone(), - ka: self.ka, - time: self.time.clone(), - _t: PhantomData, - } - } -} - -impl ServiceFactory for KeepAlive -where - F: Fn() -> E + Clone, -{ - type Request = R; - type Response = R; - type Error = E; - type InitError = Infallible; - type Config = (); - type Service = KeepAliveService; - type Future = Ready>; - - fn new_service(&self, _: ()) -> Self::Future { - ok(KeepAliveService::new( - self.ka, - self.time.timer(), - self.f.clone(), - )) - } -} - -pub struct KeepAliveService { - f: F, - ka: Duration, - time: LowResTimeService, - delay: Delay, - expire: Instant, - _t: PhantomData<(R, E)>, -} - -impl KeepAliveService -where - F: Fn() -> E, -{ - pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self { - let expire = Instant::from_std(time.now() + ka); - KeepAliveService { - f, - ka, - time, - expire, - delay: delay_until(expire), - _t: PhantomData, - } - } -} - -impl Service for KeepAliveService -where - F: Fn() -> E, -{ - type Request = R; - type Response = R; - type Error = E; - type Future = Ready>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.delay).poll(cx) { - Poll::Ready(_) => { - let now = Instant::from_std(self.time.now()); - if self.expire <= now { - Poll::Ready(Err((self.f)())) - } else { - self.delay.reset(self.expire); - let _ = Pin::new(&mut self.delay).poll(cx); - Poll::Ready(Ok(())) - } - } - Poll::Pending => Poll::Ready(Ok(())), - } - } - - fn call(&mut self, req: R) -> Self::Future { - self.expire = Instant::from_std(self.time.now() + self.ka); - ok(req) - } -} diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 19df225b..4c4f019c 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -5,16 +5,8 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] -pub mod condition; pub mod counter; pub mod dispatcher; -pub mod either; -pub mod inflight; -pub mod keepalive; pub mod mpsc; -pub mod oneshot; -pub mod order; -pub mod stream; pub mod task; -pub mod time; pub mod timeout; diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs index 5905e123..2299dedb 100644 --- a/actix-utils/src/mpsc.rs +++ b/actix-utils/src/mpsc.rs @@ -1,15 +1,16 @@ //! A multi-producer, single-consumer, futures-aware, FIFO queue. -use std::any::Any; -use std::cell::RefCell; +use core::any::Any; +use core::cell::RefCell; +use core::fmt; +use core::pin::Pin; +use core::task::{Context, Poll}; + use std::collections::VecDeque; use std::error::Error; -use std::fmt; -use std::pin::Pin; use std::rc::Rc; -use std::task::{Context, Poll}; +use futures_core::stream::Stream; use futures_sink::Sink; -use futures_util::stream::Stream; use crate::task::LocalWaker; diff --git a/actix-utils/src/oneshot.rs b/actix-utils/src/oneshot.rs deleted file mode 100644 index e75fad60..00000000 --- a/actix-utils/src/oneshot.rs +++ /dev/null @@ -1,316 +0,0 @@ -//! A one-shot, futures-aware channel. -use std::cell::RefCell; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; - -pub use futures_channel::oneshot::Canceled; -use slab::Slab; - -use crate::task::LocalWaker; - -/// Creates a new futures-aware, one-shot channel. -pub fn channel() -> (Sender, Receiver) { - let inner = Rc::new(RefCell::new(Inner { - value: None, - rx_task: LocalWaker::new(), - })); - let tx = Sender { - inner: inner.clone(), - }; - let rx = Receiver { inner }; - (tx, rx) -} - -/// Creates a new futures-aware, pool of one-shot's. -pub fn pool() -> Pool { - Pool(Rc::new(RefCell::new(Slab::new()))) -} - -/// Represents the completion half of a oneshot through which the result of a -/// computation is signaled. -#[derive(Debug)] -pub struct Sender { - inner: Rc>>, -} - -/// A future representing the completion of a computation happening elsewhere in -/// memory. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct Receiver { - inner: Rc>>, -} - -// The channels do not ever project Pin to the inner T -impl Unpin for Receiver {} -impl Unpin for Sender {} - -#[derive(Debug)] -struct Inner { - value: Option, - rx_task: LocalWaker, -} - -impl Sender { - /// Completes this oneshot with a successful result. - /// - /// This function will consume `self` and indicate to the other end, the - /// `Receiver`, that the error provided is the result of the computation this - /// represents. - /// - /// If the value is successfully enqueued for the remote end to receive, - /// then `Ok(())` is returned. If the receiving end was dropped before - /// this function was called, however, then `Err` is returned with the value - /// provided. - pub fn send(self, val: T) -> Result<(), T> { - if Rc::strong_count(&self.inner) == 2 { - let mut inner = self.inner.borrow_mut(); - inner.value = Some(val); - inner.rx_task.wake(); - Ok(()) - } else { - Err(val) - } - } - - /// Tests to see whether this `Sender`'s corresponding `Receiver` - /// has gone away. - pub fn is_canceled(&self) -> bool { - Rc::strong_count(&self.inner) == 1 - } -} - -impl Drop for Sender { - fn drop(&mut self) { - self.inner.borrow().rx_task.wake(); - } -} - -impl Future for Receiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - // If we've got a value, then skip the logic below as we're done. - if let Some(val) = this.inner.borrow_mut().value.take() { - return Poll::Ready(Ok(val)); - } - - // Check if sender is dropped and return error if it is. - if Rc::strong_count(&this.inner) == 1 { - Poll::Ready(Err(Canceled)) - } else { - this.inner.borrow().rx_task.register(cx.waker()); - Poll::Pending - } - } -} - -/// Futures-aware, pool of one-shot's. -pub struct Pool(Rc>>>); - -bitflags::bitflags! { - pub struct Flags: u8 { - const SENDER = 0b0000_0001; - const RECEIVER = 0b0000_0010; - } -} - -#[derive(Debug)] -struct PoolInner { - flags: Flags, - value: Option, - waker: LocalWaker, -} - -impl Pool { - pub fn channel(&mut self) -> (PSender, PReceiver) { - let token = self.0.borrow_mut().insert(PoolInner { - flags: Flags::all(), - value: None, - waker: LocalWaker::default(), - }); - - ( - PSender { - token, - inner: self.0.clone(), - }, - PReceiver { - token, - inner: self.0.clone(), - }, - ) - } -} - -impl Clone for Pool { - fn clone(&self) -> Self { - Pool(self.0.clone()) - } -} - -/// Represents the completion half of a oneshot through which the result of a -/// computation is signaled. -#[derive(Debug)] -pub struct PSender { - token: usize, - inner: Rc>>>, -} - -/// A future representing the completion of a computation happening elsewhere in -/// memory. -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct PReceiver { - token: usize, - inner: Rc>>>, -} - -// The one-shots do not ever project Pin to the inner T -impl Unpin for PReceiver {} -impl Unpin for PSender {} - -impl PSender { - /// Completes this oneshot with a successful result. - /// - /// This function will consume `self` and indicate to the other end, the - /// `Receiver`, that the error provided is the result of the computation this - /// represents. - /// - /// If the value is successfully enqueued for the remote end to receive, - /// then `Ok(())` is returned. If the receiving end was dropped before - /// this function was called, however, then `Err` is returned with the value - /// provided. - pub fn send(self, val: T) -> Result<(), T> { - let mut inner = self.inner.borrow_mut(); - let inner = unsafe { inner.get_unchecked_mut(self.token) }; - - if inner.flags.contains(Flags::RECEIVER) { - inner.value = Some(val); - inner.waker.wake(); - Ok(()) - } else { - Err(val) - } - } - - /// Tests to see whether this `Sender`'s corresponding `Receiver` - /// has gone away. - pub fn is_canceled(&self) -> bool { - !unsafe { self.inner.borrow().get_unchecked(self.token) } - .flags - .contains(Flags::RECEIVER) - } -} - -impl Drop for PSender { - fn drop(&mut self) { - let mut inner = self.inner.borrow_mut(); - let inner_token = unsafe { inner.get_unchecked_mut(self.token) }; - if inner_token.flags.contains(Flags::RECEIVER) { - inner_token.waker.wake(); - inner_token.flags.remove(Flags::SENDER); - } else { - inner.remove(self.token); - } - } -} - -impl Drop for PReceiver { - fn drop(&mut self) { - let mut inner = self.inner.borrow_mut(); - let inner_token = unsafe { inner.get_unchecked_mut(self.token) }; - if inner_token.flags.contains(Flags::SENDER) { - inner_token.flags.remove(Flags::RECEIVER); - } else { - inner.remove(self.token); - } - } -} - -impl Future for PReceiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - let mut inner = this.inner.borrow_mut(); - let inner = unsafe { inner.get_unchecked_mut(this.token) }; - - // If we've got a value, then skip the logic below as we're done. - if let Some(val) = inner.value.take() { - return Poll::Ready(Ok(val)); - } - - // Check if sender is dropped and return error if it is. - if !inner.flags.contains(Flags::SENDER) { - Poll::Ready(Err(Canceled)) - } else { - inner.waker.register(cx.waker()); - Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::lazy; - - #[actix_rt::test] - async fn test_oneshot() { - let (tx, rx) = channel(); - tx.send("test").unwrap(); - assert_eq!(rx.await.unwrap(), "test"); - - let (tx, rx) = channel(); - assert!(!tx.is_canceled()); - drop(rx); - assert!(tx.is_canceled()); - assert!(tx.send("test").is_err()); - - let (tx, rx) = channel::<&'static str>(); - drop(tx); - assert!(rx.await.is_err()); - - let (tx, mut rx) = channel::<&'static str>(); - assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); - tx.send("test").unwrap(); - assert_eq!(rx.await.unwrap(), "test"); - - let (tx, mut rx) = channel::<&'static str>(); - assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); - drop(tx); - assert!(rx.await.is_err()); - } - - #[actix_rt::test] - async fn test_pool() { - let (tx, rx) = pool().channel(); - tx.send("test").unwrap(); - assert_eq!(rx.await.unwrap(), "test"); - - let (tx, rx) = pool().channel(); - assert!(!tx.is_canceled()); - drop(rx); - assert!(tx.is_canceled()); - assert!(tx.send("test").is_err()); - - let (tx, rx) = pool::<&'static str>().channel(); - drop(tx); - assert!(rx.await.is_err()); - - let (tx, mut rx) = pool::<&'static str>().channel(); - assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); - tx.send("test").unwrap(); - assert_eq!(rx.await.unwrap(), "test"); - - let (tx, mut rx) = pool::<&'static str>().channel(); - assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); - drop(tx); - assert!(rx.await.is_err()); - } -} diff --git a/actix-utils/src/order.rs b/actix-utils/src/order.rs deleted file mode 100644 index 2d11b491..00000000 --- a/actix-utils/src/order.rs +++ /dev/null @@ -1,283 +0,0 @@ -use std::collections::VecDeque; -use std::convert::Infallible; -use std::fmt; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; - -use actix_service::{IntoService, Service, Transform}; -use futures_util::future::{ok, Ready}; - -use crate::oneshot; -use crate::task::LocalWaker; - -struct Record { - rx: oneshot::Receiver>, - tx: oneshot::Sender>, -} - -/// Timeout error -pub enum InOrderError { - /// Service error - Service(E), - /// Service call dropped - Disconnected, -} - -impl From for InOrderError { - fn from(err: E) -> Self { - InOrderError::Service(err) - } -} - -impl fmt::Debug for InOrderError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - InOrderError::Service(e) => write!(f, "InOrderError::Service({:?})", e), - InOrderError::Disconnected => write!(f, "InOrderError::Disconnected"), - } - } -} - -impl fmt::Display for InOrderError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - InOrderError::Service(e) => e.fmt(f), - InOrderError::Disconnected => write!(f, "InOrder service disconnected"), - } - } -} - -/// InOrder - The service will yield responses as they become available, -/// in the order that their originating requests were submitted to the service. -pub struct InOrder { - _t: PhantomData, -} - -impl InOrder -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - pub fn new() -> Self { - Self { _t: PhantomData } - } - - pub fn service(service: S) -> InOrderService { - InOrderService::new(service) - } -} - -impl Default for InOrder -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - fn default() -> Self { - Self::new() - } -} - -impl Transform for InOrder -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - type Request = S::Request; - type Response = S::Response; - type Error = InOrderError; - type InitError = Infallible; - type Transform = InOrderService; - type Future = Ready>; - - fn new_transform(&self, service: S) -> Self::Future { - ok(InOrderService::new(service)) - } -} - -pub struct InOrderService { - service: S, - waker: Rc, - acks: VecDeque>, -} - -impl InOrderService -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - pub fn new(service: U) -> Self - where - U: IntoService, - { - Self { - service: service.into_service(), - acks: VecDeque::new(), - waker: Rc::new(LocalWaker::new()), - } - } -} - -impl Service for InOrderService -where - S: Service, - S::Response: 'static, - S::Future: 'static, - S::Error: 'static, -{ - type Request = S::Request; - type Response = S::Response; - type Error = InOrderError; - type Future = InOrderServiceResponse; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - // poll_ready could be called from different task - self.waker.register(cx.waker()); - - // check acks - while !self.acks.is_empty() { - let rec = self.acks.front_mut().unwrap(); - match Pin::new(&mut rec.rx).poll(cx) { - Poll::Ready(Ok(res)) => { - let rec = self.acks.pop_front().unwrap(); - let _ = rec.tx.send(res); - } - Poll::Pending => break, - Poll::Ready(Err(oneshot::Canceled)) => { - return Poll::Ready(Err(InOrderError::Disconnected)) - } - } - } - - // check nested service - if self - .service - .poll_ready(cx) - .map_err(InOrderError::Service)? - .is_pending() - { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, request: S::Request) -> Self::Future { - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - self.acks.push_back(Record { rx: rx1, tx: tx2 }); - - let waker = self.waker.clone(); - let fut = self.service.call(request); - actix_rt::spawn(async move { - let res = fut.await; - waker.wake(); - let _ = tx1.send(res); - }); - - InOrderServiceResponse { rx: rx2 } - } -} - -#[doc(hidden)] -pub struct InOrderServiceResponse { - rx: oneshot::Receiver>, -} - -impl Future for InOrderServiceResponse { - type Output = Result>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.rx).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(Ok(res))) => Poll::Ready(Ok(res)), - Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e.into())), - Poll::Ready(Err(_)) => Poll::Ready(Err(InOrderError::Disconnected)), - } - } -} - -#[cfg(test)] -mod tests { - - use std::task::{Context, Poll}; - use std::time::Duration; - - use super::*; - use actix_service::Service; - use futures_channel::oneshot; - use futures_util::future::{lazy, poll_fn, FutureExt, LocalBoxFuture}; - - struct Srv; - - impl Service for Srv { - type Request = oneshot::Receiver; - type Response = usize; - type Error = (); - type Future = LocalBoxFuture<'static, Result>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: oneshot::Receiver) -> Self::Future { - req.map(|res| res.map_err(|_| ())).boxed_local() - } - } - - #[actix_rt::test] - async fn test_in_order() { - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - let (tx3, rx3) = oneshot::channel(); - let (tx_stop, rx_stop) = oneshot::channel(); - - let h = std::thread::spawn(move || { - let rx1 = rx1; - let rx2 = rx2; - let rx3 = rx3; - let tx_stop = tx_stop; - actix_rt::System::new("test").block_on(async { - let mut srv = InOrderService::new(Srv); - - let _ = lazy(|cx| srv.poll_ready(cx)).await; - let res1 = srv.call(rx1); - let res2 = srv.call(rx2); - let res3 = srv.call(rx3); - - actix_rt::spawn(async move { - poll_fn(|cx| { - let _ = srv.poll_ready(cx); - Poll::<()>::Pending - }) - .await; - }); - - assert_eq!(res1.await.unwrap(), 1); - assert_eq!(res2.await.unwrap(), 2); - assert_eq!(res3.await.unwrap(), 3); - - let _ = tx_stop.send(()); - actix_rt::System::current().stop(); - }); - }); - - let _ = tx3.send(3); - std::thread::sleep(Duration::from_millis(50)); - let _ = tx2.send(2); - let _ = tx1.send(1); - - let _ = rx_stop.await; - let _ = h.join(); - } -} diff --git a/actix-utils/src/stream.rs b/actix-utils/src/stream.rs deleted file mode 100644 index 72e9e019..00000000 --- a/actix-utils/src/stream.rs +++ /dev/null @@ -1,76 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_service::{IntoService, Service}; -use futures_util::{stream::Stream, FutureExt}; - -use crate::mpsc; - -#[pin_project::pin_project] -pub struct Dispatcher -where - S: Stream, - T: Service + 'static, -{ - #[pin] - stream: S, - service: T, - err_rx: mpsc::Receiver, - err_tx: mpsc::Sender, -} - -impl Dispatcher -where - S: Stream, - T: Service + 'static, -{ - pub fn new(stream: S, service: F) -> Self - where - F: IntoService, - { - let (err_tx, err_rx) = mpsc::channel(); - Dispatcher { - err_rx, - err_tx, - stream, - service: service.into_service(), - } - } -} - -impl Future for Dispatcher -where - S: Stream, - T: Service + 'static, -{ - type Output = Result<(), T::Error>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.as_mut().project(); - - if let Poll::Ready(Some(e)) = Pin::new(&mut this.err_rx).poll_next(cx) { - return Poll::Ready(Err(e)); - } - - loop { - return match this.service.poll_ready(cx)? { - Poll::Ready(_) => match this.stream.poll_next(cx) { - Poll::Ready(Some(item)) => { - let stop = this.err_tx.clone(); - actix_rt::spawn(this.service.call(item).map(move |res| { - if let Err(e) = res { - let _ = stop.send(e); - } - })); - this = self.as_mut().project(); - continue; - } - Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(Ok(())), - }, - Poll::Pending => Poll::Pending, - }; - } - } -} diff --git a/actix-utils/src/task.rs b/actix-utils/src/task.rs index cb32eb8d..8f85f5e4 100644 --- a/actix-utils/src/task.rs +++ b/actix-utils/src/task.rs @@ -1,7 +1,7 @@ -use std::cell::UnsafeCell; -use std::marker::PhantomData; -use std::task::Waker; -use std::{fmt, rc}; +use core::cell::UnsafeCell; +use core::fmt; +use core::marker::PhantomData; +use core::task::Waker; /// A synchronization primitive for task wakeup. /// @@ -23,7 +23,8 @@ use std::{fmt, rc}; #[derive(Default)] pub struct LocalWaker { pub(crate) waker: UnsafeCell>, - _t: PhantomData>, + // mark LocalWaker as a !Send type. + _t: PhantomData<*const ()>, } impl LocalWaker { diff --git a/actix-utils/src/time.rs b/actix-utils/src/time.rs deleted file mode 100644 index 2ce65bc3..00000000 --- a/actix-utils/src/time.rs +++ /dev/null @@ -1,225 +0,0 @@ -use std::cell::RefCell; -use std::convert::Infallible; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::time::{self, Duration, Instant}; - -use actix_rt::time::delay_for; -use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ok, ready, FutureExt, Ready}; - -#[derive(Clone, Debug)] -pub struct LowResTime(Rc>); - -#[derive(Debug)] -struct Inner { - resolution: Duration, - current: Option, -} - -impl Inner { - fn new(resolution: Duration) -> Self { - Inner { - resolution, - current: None, - } - } -} - -impl LowResTime { - pub fn with(resolution: Duration) -> LowResTime { - LowResTime(Rc::new(RefCell::new(Inner::new(resolution)))) - } - - pub fn timer(&self) -> LowResTimeService { - LowResTimeService(self.0.clone()) - } -} - -impl Default for LowResTime { - fn default() -> Self { - LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1))))) - } -} - -impl ServiceFactory for LowResTime { - type Request = (); - type Response = Instant; - type Error = Infallible; - type InitError = Infallible; - type Config = (); - type Service = LowResTimeService; - type Future = Ready>; - - fn new_service(&self, _: ()) -> Self::Future { - ok(self.timer()) - } -} - -#[derive(Clone, Debug)] -pub struct LowResTimeService(Rc>); - -impl LowResTimeService { - pub fn with(resolution: Duration) -> LowResTimeService { - LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution)))) - } - - /// Get current time. This function has to be called from - /// future's poll method, otherwise it panics. - pub fn now(&self) -> Instant { - let cur = self.0.borrow().current; - if let Some(cur) = cur { - cur - } else { - let now = Instant::now(); - let inner = self.0.clone(); - let interval = { - let mut b = inner.borrow_mut(); - b.current = Some(now); - b.resolution - }; - - actix_rt::spawn(delay_for(interval).then(move |_| { - inner.borrow_mut().current.take(); - ready(()) - })); - now - } - } -} - -impl Service for LowResTimeService { - type Request = (); - type Response = Instant; - type Error = Infallible; - type Future = Ready>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: ()) -> Self::Future { - ok(self.now()) - } -} - -#[derive(Clone, Debug)] -pub struct SystemTime(Rc>); - -#[derive(Debug)] -struct SystemTimeInner { - resolution: Duration, - current: Option, -} - -impl SystemTimeInner { - fn new(resolution: Duration) -> Self { - SystemTimeInner { - resolution, - current: None, - } - } -} - -#[derive(Clone, Debug)] -pub struct SystemTimeService(Rc>); - -impl SystemTimeService { - pub fn with(resolution: Duration) -> SystemTimeService { - SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution)))) - } - - /// Get current time. This function has to be called from - /// future's poll method, otherwise it panics. - pub fn now(&self) -> time::SystemTime { - let cur = self.0.borrow().current; - if let Some(cur) = cur { - cur - } else { - let now = time::SystemTime::now(); - let inner = self.0.clone(); - let interval = { - let mut b = inner.borrow_mut(); - b.current = Some(now); - b.resolution - }; - - actix_rt::spawn(delay_for(interval).then(move |_| { - inner.borrow_mut().current.take(); - ready(()) - })); - now - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::{Duration, SystemTime}; - - /// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`. - /// - /// Expected Behavior: Two back-to-back calls of `SystemTimeService::now()` return the same value. - #[actix_rt::test] - async fn system_time_service_time_does_not_immediately_change() { - let resolution = Duration::from_millis(50); - - let time_service = SystemTimeService::with(resolution); - assert_eq!(time_service.now(), time_service.now()); - } - - /// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`. - /// - /// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value. - #[actix_rt::test] - async fn low_res_time_service_time_does_not_immediately_change() { - let resolution = Duration::from_millis(50); - let time_service = LowResTimeService::with(resolution); - assert_eq!(time_service.now(), time_service.now()); - } - - /// State Under Test: `SystemTimeService::now()` updates returned value every resolution period. - /// - /// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values - /// and second value is greater than the first one at least by a resolution interval. - #[actix_rt::test] - async fn system_time_service_time_updates_after_resolution_interval() { - let resolution = Duration::from_millis(100); - let wait_time = Duration::from_millis(300); - - let time_service = SystemTimeService::with(resolution); - - let first_time = time_service - .now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - - delay_for(wait_time).await; - - let second_time = time_service - .now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - - assert!(second_time - first_time >= wait_time); - } - - /// State Under Test: `LowResTimeService::now()` updates returned value every resolution period. - /// - /// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values - /// and second value is greater than the first one at least by a resolution interval. - #[actix_rt::test] - async fn low_res_time_service_time_updates_after_resolution_interval() { - let resolution = Duration::from_millis(100); - let wait_time = Duration::from_millis(300); - let time_service = LowResTimeService::with(resolution); - - let first_time = time_service.now(); - - delay_for(wait_time).await; - - let second_time = time_service.now(); - assert!(second_time - first_time >= wait_time); - } -} diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index f1d30f19..f3489b85 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -2,15 +2,14 @@ //! //! If the response does not complete within the specified timeout, the response //! will be aborted. -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{fmt, time}; +use core::future::Future; +use core::marker::PhantomData; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::{fmt, time}; use actix_rt::time::{delay_for, Delay}; use actix_service::{IntoService, Service, Transform}; -use futures_util::future::{ok, Ready}; /// Applies a timeout to requests. #[derive(Debug)] @@ -85,15 +84,35 @@ where type Request = S::Request; type Response = S::Response; type Error = TimeoutError; - type InitError = E; type Transform = TimeoutService; - type Future = Ready>; + type InitError = E; + type Future = TimeoutFuture; fn new_transform(&self, service: S) -> Self::Future { - ok(TimeoutService { + let service = TimeoutService { service, timeout: self.timeout, - }) + }; + + TimeoutFuture { + service: Some(service), + _err: PhantomData, + } + } +} + +pub struct TimeoutFuture { + service: Option, + _err: PhantomData, +} + +impl Unpin for TimeoutFuture {} + +impl Future for TimeoutFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Ready(Ok(self.get_mut().service.take().unwrap())) } } @@ -140,13 +159,14 @@ where } } -/// `TimeoutService` response future -#[pin_project::pin_project] -#[derive(Debug)] -pub struct TimeoutServiceResponse { - #[pin] - fut: T::Future, - sleep: Delay, +pin_project_lite::pin_project! { + /// `TimeoutService` response future + #[derive(Debug)] + pub struct TimeoutServiceResponse { + #[pin] + fut: T::Future, + sleep: Delay, + } } impl Future for TimeoutServiceResponse @@ -156,20 +176,20 @@ where type Output = Result>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let this = self.project(); // First, try polling the future - match this.fut.poll(cx) { - Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), - Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))), - Poll::Pending => {} + if let Poll::Ready(res) = this.fut.poll(cx) { + return match res { + Ok(v) => Poll::Ready(Ok(v)), + Err(e) => Poll::Ready(Err(TimeoutError::Service(e))), + }; } // Now check the sleep - match Pin::new(&mut this.sleep).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)), - } + Pin::new(this.sleep) + .poll(cx) + .map(|_| Err(TimeoutError::Timeout)) } }