1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 20:01:48 +01:00

Remove unused mods in actix-utils (#229)

This commit is contained in:
fakeshadow 2020-12-27 05:27:59 +08:00 committed by GitHub
parent 4e4122b702
commit 43ce25cda1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 111 additions and 1564 deletions

View File

@ -1,7 +1,10 @@
# Changes
## Unreleased - 2020-xx-xx
* Upgrade `pin-project` to `1.0`.
* Use `pin-project-lite` to replace `pin-project`. [#229]
* Remove `condition`,`either`,`inflight`,`keepalive`,`oneshot`,`order`,`stream` and `time` mods. [#229]
[#229]: https://github.com/actix/actix-net/pull/229
## 2.0.0 - 2020-08-23
* No changes from beta 1.

View File

@ -1,6 +1,6 @@
[package]
name = "actix-utils"
version = "2.0.0"
version = "3.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Various network related services and utilities for the Actix ecosystem."
keywords = ["network", "framework", "async", "futures"]
@ -19,12 +19,11 @@ path = "src/lib.rs"
actix-codec = "0.3.0"
actix-rt = "1.1.1"
actix-service = "1.0.6"
bitflags = "1.2.1"
bytes = "0.5.3"
either = "1.5.3"
futures-channel = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false }
futures-util = { version = "0.3.4", default-features = false }
futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false }
log = "0.4"
pin-project = "1.0.0"
slab = "0.4"
pin-project-lite = "0.2.0"
[dev-dependencies]
futures-util = { version = "0.3.7", default-features = false }

View File

@ -1,129 +0,0 @@
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use slab::Slab;
use crate::task::LocalWaker;
/// Condition allows to notify multiple receivers at the same time
pub struct Condition(Rc<RefCell<Inner>>);
struct Inner {
data: Slab<Option<LocalWaker>>,
}
impl Default for Condition {
fn default() -> Self {
Self::new()
}
}
impl Condition {
pub fn new() -> Condition {
Condition(Rc::new(RefCell::new(Inner { data: Slab::new() })))
}
/// Get condition waiter
pub fn wait(&mut self) -> Waiter {
let token = self.0.borrow_mut().data.insert(None);
Waiter {
token,
inner: self.0.clone(),
}
}
/// Notify all waiters
pub fn notify(&self) {
let inner = self.0.borrow();
for item in inner.data.iter() {
if let Some(waker) = item.1 {
waker.wake();
}
}
}
}
impl Drop for Condition {
fn drop(&mut self) {
self.notify()
}
}
#[must_use = "Waiter do nothing unless polled"]
pub struct Waiter {
token: usize,
inner: Rc<RefCell<Inner>>,
}
impl Clone for Waiter {
fn clone(&self) -> Self {
let token = self.inner.borrow_mut().data.insert(None);
Waiter {
token,
inner: self.inner.clone(),
}
}
}
impl Future for Waiter {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut inner = this.inner.borrow_mut();
let inner = unsafe { inner.data.get_unchecked_mut(this.token) };
if inner.is_none() {
let waker = LocalWaker::default();
waker.register(cx.waker());
*inner = Some(waker);
Poll::Pending
} else if inner.as_mut().unwrap().register(cx.waker()) {
Poll::Pending
} else {
Poll::Ready(())
}
}
}
impl Drop for Waiter {
fn drop(&mut self) {
self.inner.borrow_mut().data.remove(self.token);
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::future::lazy;
#[actix_rt::test]
async fn test_condition() {
let mut cond = Condition::new();
let mut waiter = cond.wait();
assert_eq!(
lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
Poll::Pending
);
cond.notify();
waiter.await;
let mut waiter = cond.wait();
assert_eq!(
lazy(|cx| Pin::new(&mut waiter).poll(cx)).await,
Poll::Pending
);
let mut waiter2 = waiter.clone();
assert_eq!(
lazy(|cx| Pin::new(&mut waiter2).poll(cx)).await,
Poll::Pending
);
drop(cond);
waiter.await;
waiter2.await;
}
}

View File

@ -1,6 +1,7 @@
use std::cell::Cell;
use core::cell::Cell;
use core::task;
use std::rc::Rc;
use std::task;
use crate::task::LocalWaker;

View File

@ -2,13 +2,14 @@
#![allow(type_alias_bounds)]
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, mem};
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use core::{fmt, mem};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures_util::{future::Future, stream::Stream, FutureExt};
use futures_core::stream::Stream;
use log::debug;
use crate::mpsc;
@ -61,25 +62,28 @@ pub enum Message<T> {
Close,
}
/// Dispatcher is a future that reads frames from Framed object
/// and passes them to the service.
#[pin_project::pin_project]
pub struct Dispatcher<S, T, U, I>
where
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder<I> + Decoder,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
service: S,
state: State<S, U, I>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
pin_project_lite::pin_project! {
/// Dispatcher is a future that reads frames from Framed object
/// and passes them to the service.
pub struct Dispatcher<S, T, U, I>
where
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead,
T: AsyncWrite,
U: Encoder<I>,
U: Decoder,
I: 'static,
<U as Encoder<I>>::Error: fmt::Debug,
{
service: S,
state: State<S, U, I>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
}
}
enum State<S: Service, U: Encoder<I> + Decoder, I> {
@ -114,8 +118,8 @@ where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>,
I: 'static,
<U as Decoder>::Error: std::fmt::Debug,
<U as Encoder<I>>::Error: std::fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
{
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
let (tx, rx) = mpsc::channel();
@ -178,7 +182,7 @@ where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
{
loop {
let this = self.as_mut().project();
@ -198,9 +202,11 @@ where
};
let tx = this.tx.clone();
actix_rt::spawn(this.service.call(item).map(move |item| {
let fut = this.service.call(item);
actix_rt::spawn(async move {
let item = fut.await;
let _ = tx.send(item.map(Message::Item));
}));
});
}
Poll::Pending => return false,
Poll::Ready(Err(err)) => {
@ -220,7 +226,7 @@ where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
{
loop {
let mut this = self.as_mut().project();
@ -271,8 +277,8 @@ where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
<U as Decoder>::Error: std::fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
type Output = Result<(), DispatcherError<S::Error, U, I>>;

View File

@ -1,153 +0,0 @@
//! Contains `Either` service and related types and functions.
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory};
use futures_util::{future, future::Future, ready};
/// Combine two different service types into a single type.
///
/// Both services must be of the same request, response, and error types.
/// `EitherService` is useful for handling conditional branching in service
/// middleware to different inner service types.
pub struct EitherService<A, B> {
left: A,
right: B,
}
impl<A: Clone, B: Clone> Clone for EitherService<A, B> {
fn clone(&self) -> Self {
EitherService {
left: self.left.clone(),
right: self.right.clone(),
}
}
}
impl<A, B> Service for EitherService<A, B>
where
A: Service,
B: Service<Response = A::Response, Error = A::Error>,
{
type Request = either::Either<A::Request, B::Request>;
type Response = A::Response;
type Error = A::Error;
type Future = future::Either<A::Future, B::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let left = self.left.poll_ready(cx)?;
let right = self.right.poll_ready(cx)?;
if left.is_ready() && right.is_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, req: either::Either<A::Request, B::Request>) -> Self::Future {
match req {
either::Either::Left(req) => future::Either::Left(self.left.call(req)),
either::Either::Right(req) => future::Either::Right(self.right.call(req)),
}
}
}
/// Combine two different new service types into a single service.
pub struct Either<A, B> {
left: A,
right: B,
}
impl<A, B> Either<A, B> {
pub fn new(left: A, right: B) -> Either<A, B>
where
A: ServiceFactory,
A::Config: Clone,
B: ServiceFactory<
Config = A::Config,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{
Either { left, right }
}
}
impl<A, B> ServiceFactory for Either<A, B>
where
A: ServiceFactory,
A::Config: Clone,
B: ServiceFactory<
Config = A::Config,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
{
type Request = either::Either<A::Request, B::Request>;
type Response = A::Response;
type Error = A::Error;
type InitError = A::InitError;
type Config = A::Config;
type Service = EitherService<A::Service, B::Service>;
type Future = EitherNewService<A, B>;
fn new_service(&self, cfg: A::Config) -> Self::Future {
EitherNewService {
left: None,
right: None,
left_fut: self.left.new_service(cfg.clone()),
right_fut: self.right.new_service(cfg),
}
}
}
impl<A: Clone, B: Clone> Clone for Either<A, B> {
fn clone(&self) -> Self {
Self {
left: self.left.clone(),
right: self.right.clone(),
}
}
}
#[doc(hidden)]
#[pin_project::pin_project]
pub struct EitherNewService<A: ServiceFactory, B: ServiceFactory> {
left: Option<A::Service>,
right: Option<B::Service>,
#[pin]
left_fut: A::Future,
#[pin]
right_fut: B::Future,
}
impl<A, B> Future for EitherNewService<A, B>
where
A: ServiceFactory,
B: ServiceFactory<Response = A::Response, Error = A::Error, InitError = A::InitError>,
{
type Output = Result<EitherService<A::Service, B::Service>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.left.is_none() {
*this.left = Some(ready!(this.left_fut.poll(cx))?);
}
if this.right.is_none() {
*this.right = Some(ready!(this.right_fut.poll(cx))?);
}
if this.left.is_some() && this.right.is_some() {
Poll::Ready(Ok(EitherService {
left: this.left.take().unwrap(),
right: this.right.take().unwrap(),
}))
} else {
Poll::Pending
}
}
}

View File

@ -1,169 +0,0 @@
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{IntoService, Service, Transform};
use futures_util::future::{ok, Ready};
use super::counter::{Counter, CounterGuard};
/// InFlight - new service for service that can limit number of in-flight
/// async requests.
///
/// Default number of in-flight requests is 15
pub struct InFlight {
max_inflight: usize,
}
impl InFlight {
pub fn new(max: usize) -> Self {
Self { max_inflight: max }
}
}
impl Default for InFlight {
fn default() -> Self {
Self::new(15)
}
}
impl<S> Transform<S> for InFlight
where
S: Service,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type InitError = Infallible;
type Transform = InFlightService<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(InFlightService::new(self.max_inflight, service))
}
}
pub struct InFlightService<S> {
count: Counter,
service: S,
}
impl<S> InFlightService<S>
where
S: Service,
{
pub fn new<U>(max: usize, service: U) -> Self
where
U: IntoService<S>,
{
Self {
count: Counter::new(max),
service: service.into_service(),
}
}
}
impl<T> Service for InFlightService<T>
where
T: Service,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = InFlightServiceResponse<T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.service.poll_ready(cx)?.is_pending() {
Poll::Pending
} else if !self.count.available(cx) {
log::trace!("InFlight limit exceeded");
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}
fn call(&mut self, req: T::Request) -> Self::Future {
InFlightServiceResponse {
fut: self.service.call(req),
_guard: self.count.get(),
}
}
}
#[doc(hidden)]
#[pin_project::pin_project]
pub struct InFlightServiceResponse<T: Service> {
#[pin]
fut: T::Future,
_guard: CounterGuard,
}
impl<T: Service> Future for InFlightServiceResponse<T> {
type Output = Result<T::Response, T::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
#[cfg(test)]
mod tests {
use std::task::{Context, Poll};
use std::time::Duration;
use super::*;
use actix_service::{apply, fn_factory, Service, ServiceFactory};
use futures_util::future::{lazy, ok, FutureExt, LocalBoxFuture};
struct SleepService(Duration);
impl Service for SleepService {
type Request = ();
type Response = ();
type Error = ();
type Future = LocalBoxFuture<'static, Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
actix_rt::time::delay_for(self.0)
.then(|_| ok::<_, ()>(()))
.boxed_local()
}
}
#[actix_rt::test]
async fn test_transform() {
let wait_time = Duration::from_millis(50);
let mut srv = InFlightService::new(1, SleepService(wait_time));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
let _ = res.await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
}
#[actix_rt::test]
async fn test_new_transform() {
let wait_time = Duration::from_millis(50);
let srv = apply(InFlight::new(1), fn_factory(|| ok(SleepService(wait_time))));
let mut srv = srv.new_service(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let res = srv.call(());
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
let _ = res.await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
}
}

View File

@ -1,125 +0,0 @@
use std::convert::Infallible;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use actix_rt::time::{delay_until, Delay, Instant};
use actix_service::{Service, ServiceFactory};
use futures_util::future::{ok, Ready};
use super::time::{LowResTime, LowResTimeService};
pub struct KeepAlive<R, E, F> {
f: F,
ka: Duration,
time: LowResTime,
_t: PhantomData<(R, E)>,
}
impl<R, E, F> KeepAlive<R, E, F>
where
F: Fn() -> E + Clone,
{
pub fn new(ka: Duration, time: LowResTime, f: F) -> Self {
KeepAlive {
f,
ka,
time,
_t: PhantomData,
}
}
}
impl<R, E, F> Clone for KeepAlive<R, E, F>
where
F: Clone,
{
fn clone(&self) -> Self {
KeepAlive {
f: self.f.clone(),
ka: self.ka,
time: self.time.clone(),
_t: PhantomData,
}
}
}
impl<R, E, F> ServiceFactory for KeepAlive<R, E, F>
where
F: Fn() -> E + Clone,
{
type Request = R;
type Response = R;
type Error = E;
type InitError = Infallible;
type Config = ();
type Service = KeepAliveService<R, E, F>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
ok(KeepAliveService::new(
self.ka,
self.time.timer(),
self.f.clone(),
))
}
}
pub struct KeepAliveService<R, E, F> {
f: F,
ka: Duration,
time: LowResTimeService,
delay: Delay,
expire: Instant,
_t: PhantomData<(R, E)>,
}
impl<R, E, F> KeepAliveService<R, E, F>
where
F: Fn() -> E,
{
pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self {
let expire = Instant::from_std(time.now() + ka);
KeepAliveService {
f,
ka,
time,
expire,
delay: delay_until(expire),
_t: PhantomData,
}
}
}
impl<R, E, F> Service for KeepAliveService<R, E, F>
where
F: Fn() -> E,
{
type Request = R;
type Response = R;
type Error = E;
type Future = Ready<Result<R, E>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let now = Instant::from_std(self.time.now());
if self.expire <= now {
Poll::Ready(Err((self.f)()))
} else {
self.delay.reset(self.expire);
let _ = Pin::new(&mut self.delay).poll(cx);
Poll::Ready(Ok(()))
}
}
Poll::Pending => Poll::Ready(Ok(())),
}
}
fn call(&mut self, req: R) -> Self::Future {
self.expire = Instant::from_std(self.time.now() + self.ka);
ok(req)
}
}

View File

@ -5,16 +5,8 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
pub mod condition;
pub mod counter;
pub mod dispatcher;
pub mod either;
pub mod inflight;
pub mod keepalive;
pub mod mpsc;
pub mod oneshot;
pub mod order;
pub mod stream;
pub mod task;
pub mod time;
pub mod timeout;

View File

@ -1,15 +1,16 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
use std::any::Any;
use std::cell::RefCell;
use core::any::Any;
use core::cell::RefCell;
use core::fmt;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::collections::VecDeque;
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use futures_core::stream::Stream;
use futures_sink::Sink;
use futures_util::stream::Stream;
use crate::task::LocalWaker;

View File

@ -1,316 +0,0 @@
//! A one-shot, futures-aware channel.
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
pub use futures_channel::oneshot::Canceled;
use slab::Slab;
use crate::task::LocalWaker;
/// Creates a new futures-aware, one-shot channel.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Rc::new(RefCell::new(Inner {
value: None,
rx_task: LocalWaker::new(),
}));
let tx = Sender {
inner: inner.clone(),
};
let rx = Receiver { inner };
(tx, rx)
}
/// Creates a new futures-aware, pool of one-shot's.
pub fn pool<T>() -> Pool<T> {
Pool(Rc::new(RefCell::new(Slab::new())))
}
/// Represents the completion half of a oneshot through which the result of a
/// computation is signaled.
#[derive(Debug)]
pub struct Sender<T> {
inner: Rc<RefCell<Inner<T>>>,
}
/// A future representing the completion of a computation happening elsewhere in
/// memory.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Receiver<T> {
inner: Rc<RefCell<Inner<T>>>,
}
// The channels do not ever project Pin to the inner T
impl<T> Unpin for Receiver<T> {}
impl<T> Unpin for Sender<T> {}
#[derive(Debug)]
struct Inner<T> {
value: Option<T>,
rx_task: LocalWaker,
}
impl<T> Sender<T> {
/// Completes this oneshot with a successful result.
///
/// This function will consume `self` and indicate to the other end, the
/// `Receiver`, that the error provided is the result of the computation this
/// represents.
///
/// If the value is successfully enqueued for the remote end to receive,
/// then `Ok(())` is returned. If the receiving end was dropped before
/// this function was called, however, then `Err` is returned with the value
/// provided.
pub fn send(self, val: T) -> Result<(), T> {
if Rc::strong_count(&self.inner) == 2 {
let mut inner = self.inner.borrow_mut();
inner.value = Some(val);
inner.rx_task.wake();
Ok(())
} else {
Err(val)
}
}
/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has gone away.
pub fn is_canceled(&self) -> bool {
Rc::strong_count(&self.inner) == 1
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.inner.borrow().rx_task.wake();
}
}
impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = this.inner.borrow_mut().value.take() {
return Poll::Ready(Ok(val));
}
// Check if sender is dropped and return error if it is.
if Rc::strong_count(&this.inner) == 1 {
Poll::Ready(Err(Canceled))
} else {
this.inner.borrow().rx_task.register(cx.waker());
Poll::Pending
}
}
}
/// Futures-aware, pool of one-shot's.
pub struct Pool<T>(Rc<RefCell<Slab<PoolInner<T>>>>);
bitflags::bitflags! {
pub struct Flags: u8 {
const SENDER = 0b0000_0001;
const RECEIVER = 0b0000_0010;
}
}
#[derive(Debug)]
struct PoolInner<T> {
flags: Flags,
value: Option<T>,
waker: LocalWaker,
}
impl<T> Pool<T> {
pub fn channel(&mut self) -> (PSender<T>, PReceiver<T>) {
let token = self.0.borrow_mut().insert(PoolInner {
flags: Flags::all(),
value: None,
waker: LocalWaker::default(),
});
(
PSender {
token,
inner: self.0.clone(),
},
PReceiver {
token,
inner: self.0.clone(),
},
)
}
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Self {
Pool(self.0.clone())
}
}
/// Represents the completion half of a oneshot through which the result of a
/// computation is signaled.
#[derive(Debug)]
pub struct PSender<T> {
token: usize,
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
}
/// A future representing the completion of a computation happening elsewhere in
/// memory.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct PReceiver<T> {
token: usize,
inner: Rc<RefCell<Slab<PoolInner<T>>>>,
}
// The one-shots do not ever project Pin to the inner T
impl<T> Unpin for PReceiver<T> {}
impl<T> Unpin for PSender<T> {}
impl<T> PSender<T> {
/// Completes this oneshot with a successful result.
///
/// This function will consume `self` and indicate to the other end, the
/// `Receiver`, that the error provided is the result of the computation this
/// represents.
///
/// If the value is successfully enqueued for the remote end to receive,
/// then `Ok(())` is returned. If the receiving end was dropped before
/// this function was called, however, then `Err` is returned with the value
/// provided.
pub fn send(self, val: T) -> Result<(), T> {
let mut inner = self.inner.borrow_mut();
let inner = unsafe { inner.get_unchecked_mut(self.token) };
if inner.flags.contains(Flags::RECEIVER) {
inner.value = Some(val);
inner.waker.wake();
Ok(())
} else {
Err(val)
}
}
/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has gone away.
pub fn is_canceled(&self) -> bool {
!unsafe { self.inner.borrow().get_unchecked(self.token) }
.flags
.contains(Flags::RECEIVER)
}
}
impl<T> Drop for PSender<T> {
fn drop(&mut self) {
let mut inner = self.inner.borrow_mut();
let inner_token = unsafe { inner.get_unchecked_mut(self.token) };
if inner_token.flags.contains(Flags::RECEIVER) {
inner_token.waker.wake();
inner_token.flags.remove(Flags::SENDER);
} else {
inner.remove(self.token);
}
}
}
impl<T> Drop for PReceiver<T> {
fn drop(&mut self) {
let mut inner = self.inner.borrow_mut();
let inner_token = unsafe { inner.get_unchecked_mut(self.token) };
if inner_token.flags.contains(Flags::SENDER) {
inner_token.flags.remove(Flags::RECEIVER);
} else {
inner.remove(self.token);
}
}
}
impl<T> Future for PReceiver<T> {
type Output = Result<T, Canceled>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut inner = this.inner.borrow_mut();
let inner = unsafe { inner.get_unchecked_mut(this.token) };
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = inner.value.take() {
return Poll::Ready(Ok(val));
}
// Check if sender is dropped and return error if it is.
if !inner.flags.contains(Flags::SENDER) {
Poll::Ready(Err(Canceled))
} else {
inner.waker.register(cx.waker());
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::future::lazy;
#[actix_rt::test]
async fn test_oneshot() {
let (tx, rx) = channel();
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, rx) = channel();
assert!(!tx.is_canceled());
drop(rx);
assert!(tx.is_canceled());
assert!(tx.send("test").is_err());
let (tx, rx) = channel::<&'static str>();
drop(tx);
assert!(rx.await.is_err());
let (tx, mut rx) = channel::<&'static str>();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, mut rx) = channel::<&'static str>();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
drop(tx);
assert!(rx.await.is_err());
}
#[actix_rt::test]
async fn test_pool() {
let (tx, rx) = pool().channel();
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, rx) = pool().channel();
assert!(!tx.is_canceled());
drop(rx);
assert!(tx.is_canceled());
assert!(tx.send("test").is_err());
let (tx, rx) = pool::<&'static str>().channel();
drop(tx);
assert!(rx.await.is_err());
let (tx, mut rx) = pool::<&'static str>().channel();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, mut rx) = pool::<&'static str>().channel();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
drop(tx);
assert!(rx.await.is_err());
}
}

View File

@ -1,283 +0,0 @@
use std::collections::VecDeque;
use std::convert::Infallible;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_service::{IntoService, Service, Transform};
use futures_util::future::{ok, Ready};
use crate::oneshot;
use crate::task::LocalWaker;
struct Record<I, E> {
rx: oneshot::Receiver<Result<I, E>>,
tx: oneshot::Sender<Result<I, E>>,
}
/// Timeout error
pub enum InOrderError<E> {
/// Service error
Service(E),
/// Service call dropped
Disconnected,
}
impl<E> From<E> for InOrderError<E> {
fn from(err: E) -> Self {
InOrderError::Service(err)
}
}
impl<E: fmt::Debug> fmt::Debug for InOrderError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InOrderError::Service(e) => write!(f, "InOrderError::Service({:?})", e),
InOrderError::Disconnected => write!(f, "InOrderError::Disconnected"),
}
}
}
impl<E: fmt::Display> fmt::Display for InOrderError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InOrderError::Service(e) => e.fmt(f),
InOrderError::Disconnected => write!(f, "InOrder service disconnected"),
}
}
}
/// InOrder - The service will yield responses as they become available,
/// in the order that their originating requests were submitted to the service.
pub struct InOrder<S> {
_t: PhantomData<S>,
}
impl<S> InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
pub fn new() -> Self {
Self { _t: PhantomData }
}
pub fn service(service: S) -> InOrderService<S> {
InOrderService::new(service)
}
}
impl<S> Default for InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
fn default() -> Self {
Self::new()
}
}
impl<S> Transform<S> for InOrder<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = InOrderError<S::Error>;
type InitError = Infallible;
type Transform = InOrderService<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(InOrderService::new(service))
}
}
pub struct InOrderService<S: Service> {
service: S,
waker: Rc<LocalWaker>,
acks: VecDeque<Record<S::Response, S::Error>>,
}
impl<S> InOrderService<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
pub fn new<U>(service: U) -> Self
where
U: IntoService<S>,
{
Self {
service: service.into_service(),
acks: VecDeque::new(),
waker: Rc::new(LocalWaker::new()),
}
}
}
impl<S> Service for InOrderService<S>
where
S: Service,
S::Response: 'static,
S::Future: 'static,
S::Error: 'static,
{
type Request = S::Request;
type Response = S::Response;
type Error = InOrderError<S::Error>;
type Future = InOrderServiceResponse<S>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// poll_ready could be called from different task
self.waker.register(cx.waker());
// check acks
while !self.acks.is_empty() {
let rec = self.acks.front_mut().unwrap();
match Pin::new(&mut rec.rx).poll(cx) {
Poll::Ready(Ok(res)) => {
let rec = self.acks.pop_front().unwrap();
let _ = rec.tx.send(res);
}
Poll::Pending => break,
Poll::Ready(Err(oneshot::Canceled)) => {
return Poll::Ready(Err(InOrderError::Disconnected))
}
}
}
// check nested service
if self
.service
.poll_ready(cx)
.map_err(InOrderError::Service)?
.is_pending()
{
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}
fn call(&mut self, request: S::Request) -> Self::Future {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
self.acks.push_back(Record { rx: rx1, tx: tx2 });
let waker = self.waker.clone();
let fut = self.service.call(request);
actix_rt::spawn(async move {
let res = fut.await;
waker.wake();
let _ = tx1.send(res);
});
InOrderServiceResponse { rx: rx2 }
}
}
#[doc(hidden)]
pub struct InOrderServiceResponse<S: Service> {
rx: oneshot::Receiver<Result<S::Response, S::Error>>,
}
impl<S: Service> Future for InOrderServiceResponse<S> {
type Output = Result<S::Response, InOrderError<S::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.rx).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(Ok(res))) => Poll::Ready(Ok(res)),
Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e.into())),
Poll::Ready(Err(_)) => Poll::Ready(Err(InOrderError::Disconnected)),
}
}
}
#[cfg(test)]
mod tests {
use std::task::{Context, Poll};
use std::time::Duration;
use super::*;
use actix_service::Service;
use futures_channel::oneshot;
use futures_util::future::{lazy, poll_fn, FutureExt, LocalBoxFuture};
struct Srv;
impl Service for Srv {
type Request = oneshot::Receiver<usize>;
type Response = usize;
type Error = ();
type Future = LocalBoxFuture<'static, Result<usize, ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: oneshot::Receiver<usize>) -> Self::Future {
req.map(|res| res.map_err(|_| ())).boxed_local()
}
}
#[actix_rt::test]
async fn test_in_order() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
let (tx_stop, rx_stop) = oneshot::channel();
let h = std::thread::spawn(move || {
let rx1 = rx1;
let rx2 = rx2;
let rx3 = rx3;
let tx_stop = tx_stop;
actix_rt::System::new("test").block_on(async {
let mut srv = InOrderService::new(Srv);
let _ = lazy(|cx| srv.poll_ready(cx)).await;
let res1 = srv.call(rx1);
let res2 = srv.call(rx2);
let res3 = srv.call(rx3);
actix_rt::spawn(async move {
poll_fn(|cx| {
let _ = srv.poll_ready(cx);
Poll::<()>::Pending
})
.await;
});
assert_eq!(res1.await.unwrap(), 1);
assert_eq!(res2.await.unwrap(), 2);
assert_eq!(res3.await.unwrap(), 3);
let _ = tx_stop.send(());
actix_rt::System::current().stop();
});
});
let _ = tx3.send(3);
std::thread::sleep(Duration::from_millis(50));
let _ = tx2.send(2);
let _ = tx1.send(1);
let _ = rx_stop.await;
let _ = h.join();
}
}

View File

@ -1,76 +0,0 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{IntoService, Service};
use futures_util::{stream::Stream, FutureExt};
use crate::mpsc;
#[pin_project::pin_project]
pub struct Dispatcher<S, T>
where
S: Stream,
T: Service<Request = S::Item, Response = ()> + 'static,
{
#[pin]
stream: S,
service: T,
err_rx: mpsc::Receiver<T::Error>,
err_tx: mpsc::Sender<T::Error>,
}
impl<S, T> Dispatcher<S, T>
where
S: Stream,
T: Service<Request = S::Item, Response = ()> + 'static,
{
pub fn new<F>(stream: S, service: F) -> Self
where
F: IntoService<T>,
{
let (err_tx, err_rx) = mpsc::channel();
Dispatcher {
err_rx,
err_tx,
stream,
service: service.into_service(),
}
}
}
impl<S, T> Future for Dispatcher<S, T>
where
S: Stream,
T: Service<Request = S::Item, Response = ()> + 'static,
{
type Output = Result<(), T::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
if let Poll::Ready(Some(e)) = Pin::new(&mut this.err_rx).poll_next(cx) {
return Poll::Ready(Err(e));
}
loop {
return match this.service.poll_ready(cx)? {
Poll::Ready(_) => match this.stream.poll_next(cx) {
Poll::Ready(Some(item)) => {
let stop = this.err_tx.clone();
actix_rt::spawn(this.service.call(item).map(move |res| {
if let Err(e) = res {
let _ = stop.send(e);
}
}));
this = self.as_mut().project();
continue;
}
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(Ok(())),
},
Poll::Pending => Poll::Pending,
};
}
}
}

View File

@ -1,7 +1,7 @@
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::task::Waker;
use std::{fmt, rc};
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::task::Waker;
/// A synchronization primitive for task wakeup.
///
@ -23,7 +23,8 @@ use std::{fmt, rc};
#[derive(Default)]
pub struct LocalWaker {
pub(crate) waker: UnsafeCell<Option<Waker>>,
_t: PhantomData<rc::Rc<()>>,
// mark LocalWaker as a !Send type.
_t: PhantomData<*const ()>,
}
impl LocalWaker {

View File

@ -1,225 +0,0 @@
use std::cell::RefCell;
use std::convert::Infallible;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::time::{self, Duration, Instant};
use actix_rt::time::delay_for;
use actix_service::{Service, ServiceFactory};
use futures_util::future::{ok, ready, FutureExt, Ready};
#[derive(Clone, Debug)]
pub struct LowResTime(Rc<RefCell<Inner>>);
#[derive(Debug)]
struct Inner {
resolution: Duration,
current: Option<Instant>,
}
impl Inner {
fn new(resolution: Duration) -> Self {
Inner {
resolution,
current: None,
}
}
}
impl LowResTime {
pub fn with(resolution: Duration) -> LowResTime {
LowResTime(Rc::new(RefCell::new(Inner::new(resolution))))
}
pub fn timer(&self) -> LowResTimeService {
LowResTimeService(self.0.clone())
}
}
impl Default for LowResTime {
fn default() -> Self {
LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1)))))
}
}
impl ServiceFactory for LowResTime {
type Request = ();
type Response = Instant;
type Error = Infallible;
type InitError = Infallible;
type Config = ();
type Service = LowResTimeService;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
ok(self.timer())
}
}
#[derive(Clone, Debug)]
pub struct LowResTimeService(Rc<RefCell<Inner>>);
impl LowResTimeService {
pub fn with(resolution: Duration) -> LowResTimeService {
LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> Instant {
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = Instant::now();
let inner = self.0.clone();
let interval = {
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
actix_rt::spawn(delay_for(interval).then(move |_| {
inner.borrow_mut().current.take();
ready(())
}));
now
}
}
}
impl Service for LowResTimeService {
type Request = ();
type Response = Instant;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
ok(self.now())
}
}
#[derive(Clone, Debug)]
pub struct SystemTime(Rc<RefCell<SystemTimeInner>>);
#[derive(Debug)]
struct SystemTimeInner {
resolution: Duration,
current: Option<time::SystemTime>,
}
impl SystemTimeInner {
fn new(resolution: Duration) -> Self {
SystemTimeInner {
resolution,
current: None,
}
}
}
#[derive(Clone, Debug)]
pub struct SystemTimeService(Rc<RefCell<SystemTimeInner>>);
impl SystemTimeService {
pub fn with(resolution: Duration) -> SystemTimeService {
SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution))))
}
/// Get current time. This function has to be called from
/// future's poll method, otherwise it panics.
pub fn now(&self) -> time::SystemTime {
let cur = self.0.borrow().current;
if let Some(cur) = cur {
cur
} else {
let now = time::SystemTime::now();
let inner = self.0.clone();
let interval = {
let mut b = inner.borrow_mut();
b.current = Some(now);
b.resolution
};
actix_rt::spawn(delay_for(interval).then(move |_| {
inner.borrow_mut().current.take();
ready(())
}));
now
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration, SystemTime};
/// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
///
/// Expected Behavior: Two back-to-back calls of `SystemTimeService::now()` return the same value.
#[actix_rt::test]
async fn system_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = SystemTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
}
/// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
///
/// Expected Behavior: Two back-to-back calls of `LowResTimeService::now()` return the same value.
#[actix_rt::test]
async fn low_res_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let time_service = LowResTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
}
/// State Under Test: `SystemTimeService::now()` updates returned value every resolution period.
///
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[actix_rt::test]
async fn system_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = SystemTimeService::with(resolution);
let first_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
delay_for(wait_time).await;
let second_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
assert!(second_time - first_time >= wait_time);
}
/// State Under Test: `LowResTimeService::now()` updates returned value every resolution period.
///
/// Expected Behavior: Two calls of `LowResTimeService::now()` made in subsequent resolution interval return different values
/// and second value is greater than the first one at least by a resolution interval.
#[actix_rt::test]
async fn low_res_time_service_time_updates_after_resolution_interval() {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(300);
let time_service = LowResTimeService::with(resolution);
let first_time = time_service.now();
delay_for(wait_time).await;
let second_time = time_service.now();
assert!(second_time - first_time >= wait_time);
}
}

View File

@ -2,15 +2,14 @@
//!
//! If the response does not complete within the specified timeout, the response
//! will be aborted.
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, time};
use core::future::Future;
use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll};
use core::{fmt, time};
use actix_rt::time::{delay_for, Delay};
use actix_service::{IntoService, Service, Transform};
use futures_util::future::{ok, Ready};
/// Applies a timeout to requests.
#[derive(Debug)]
@ -85,15 +84,35 @@ where
type Request = S::Request;
type Response = S::Response;
type Error = TimeoutError<S::Error>;
type InitError = E;
type Transform = TimeoutService<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
type InitError = E;
type Future = TimeoutFuture<Self::Transform, Self::InitError>;
fn new_transform(&self, service: S) -> Self::Future {
ok(TimeoutService {
let service = TimeoutService {
service,
timeout: self.timeout,
})
};
TimeoutFuture {
service: Some(service),
_err: PhantomData,
}
}
}
pub struct TimeoutFuture<T, E> {
service: Option<T>,
_err: PhantomData<E>,
}
impl<T, E> Unpin for TimeoutFuture<T, E> {}
impl<T, E> Future for TimeoutFuture<T, E> {
type Output = Result<T, E>;
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(self.get_mut().service.take().unwrap()))
}
}
@ -140,13 +159,14 @@ where
}
}
/// `TimeoutService` response future
#[pin_project::pin_project]
#[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service> {
#[pin]
fut: T::Future,
sleep: Delay,
pin_project_lite::pin_project! {
/// `TimeoutService` response future
#[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service> {
#[pin]
fut: T::Future,
sleep: Delay,
}
}
impl<T> Future for TimeoutServiceResponse<T>
@ -156,20 +176,20 @@ where
type Output = Result<T::Response, TimeoutError<T::Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let this = self.project();
// First, try polling the future
match this.fut.poll(cx) {
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))),
Poll::Pending => {}
if let Poll::Ready(res) = this.fut.poll(cx) {
return match res {
Ok(v) => Poll::Ready(Ok(v)),
Err(e) => Poll::Ready(Err(TimeoutError::Service(e))),
};
}
// Now check the sleep
match Pin::new(&mut this.sleep).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)),
}
Pin::new(this.sleep)
.poll(cx)
.map(|_| Err(TimeoutError::Timeout))
}
}