diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 0c036f67..b75c03d3 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,7 +1,9 @@ # Changes ## Unreleased - 2020-xx-xx + * Upgrade `tokio-util` to `0.3`. +* Remove unsound custom Cell and use `std::cell::RefCell` instead, as well as `actix-service`. ## [1.0.6] - 2020-01-08 diff --git a/actix-utils/src/cell.rs b/actix-utils/src/cell.rs deleted file mode 100644 index ee35125e..00000000 --- a/actix-utils/src/cell.rs +++ /dev/null @@ -1,48 +0,0 @@ -//! Custom cell impl - -use std::cell::UnsafeCell; -use std::fmt; -use std::rc::Rc; - -pub(crate) struct Cell { - pub(crate) inner: Rc>, -} - -impl Clone for Cell { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } -} - -impl fmt::Debug for Cell { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.inner.fmt(f) - } -} - -impl Cell { - pub(crate) fn new(inner: T) -> Self { - Self { - inner: Rc::new(UnsafeCell::new(inner)), - } - } - - pub(crate) fn strong_count(&self) -> usize { - Rc::strong_count(&self.inner) - } - - pub(crate) fn get_ref(&self) -> &T { - unsafe { &*self.inner.as_ref().get() } - } - - pub(crate) fn get_mut(&mut self) -> &mut T { - unsafe { &mut *self.inner.as_ref().get() } - } - - #[allow(clippy::mut_from_ref)] - pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T { - &mut *self.inner.as_ref().get() - } -} diff --git a/actix-utils/src/condition.rs b/actix-utils/src/condition.rs index fe459cf7..9c7c977c 100644 --- a/actix-utils/src/condition.rs +++ b/actix-utils/src/condition.rs @@ -1,14 +1,15 @@ +use std::cell::RefCell; use std::future::Future; use std::pin::Pin; +use std::rc::Rc; use std::task::{Context, Poll}; use slab::Slab; -use crate::cell::Cell; use crate::task::LocalWaker; /// Condition allows to notify multiple receivers at the same time -pub struct Condition(Cell); +pub struct Condition(Rc>); struct Inner { data: Slab>, @@ -22,12 +23,12 @@ impl Default for Condition { impl Condition { pub fn new() -> Condition { - Condition(Cell::new(Inner { data: Slab::new() })) + Condition(Rc::new(RefCell::new(Inner { data: Slab::new() }))) } /// Get condition waiter pub fn wait(&mut self) -> Waiter { - let token = self.0.get_mut().data.insert(None); + let token = self.0.borrow_mut().data.insert(None); Waiter { token, inner: self.0.clone(), @@ -36,7 +37,7 @@ impl Condition { /// Notify all waiters pub fn notify(&self) { - let inner = self.0.get_ref(); + let inner = self.0.borrow(); for item in inner.data.iter() { if let Some(waker) = item.1 { waker.wake(); @@ -54,12 +55,12 @@ impl Drop for Condition { #[must_use = "Waiter do nothing unless polled"] pub struct Waiter { token: usize, - inner: Cell, + inner: Rc>, } impl Clone for Waiter { fn clone(&self) -> Self { - let token = unsafe { self.inner.get_mut_unsafe() }.data.insert(None); + let token = self.inner.borrow_mut().data.insert(None); Waiter { token, inner: self.inner.clone(), @@ -73,7 +74,8 @@ impl Future for Waiter { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - let inner = unsafe { this.inner.get_mut().data.get_unchecked_mut(this.token) }; + let mut inner = this.inner.borrow_mut(); + let inner = unsafe { inner.data.get_unchecked_mut(this.token) }; if inner.is_none() { let waker = LocalWaker::default(); waker.register(cx.waker()); @@ -89,7 +91,7 @@ impl Future for Waiter { impl Drop for Waiter { fn drop(&mut self) { - self.inner.get_mut().data.remove(self.token); + self.inner.borrow_mut().data.remove(self.token); } } diff --git a/actix-utils/src/either.rs b/actix-utils/src/either.rs index 53d6e86e..fdf15ffe 100644 --- a/actix-utils/src/either.rs +++ b/actix-utils/src/either.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory}; -use futures_util::{future, ready, future::Future}; +use futures_util::{future, future::Future, ready}; /// Combine two different service types into a single type. /// diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 3c66accc..be013729 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -2,7 +2,6 @@ #![deny(rust_2018_idioms)] #![allow(clippy::type_complexity)] -mod cell; pub mod condition; pub mod counter; pub mod either; diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs index 531c9684..5905e123 100644 --- a/actix-utils/src/mpsc.rs +++ b/actix-utils/src/mpsc.rs @@ -1,24 +1,25 @@ //! A multi-producer, single-consumer, futures-aware, FIFO queue. use std::any::Any; +use std::cell::RefCell; use std::collections::VecDeque; use std::error::Error; use std::fmt; use std::pin::Pin; +use std::rc::Rc; use std::task::{Context, Poll}; use futures_sink::Sink; use futures_util::stream::Stream; -use crate::cell::Cell; use crate::task::LocalWaker; /// Creates a unbounded in-memory channel with buffered storage. pub fn channel() -> (Sender, Receiver) { - let shared = Cell::new(Shared { + let shared = Rc::new(RefCell::new(Shared { has_receiver: true, buffer: VecDeque::new(), blocked_recv: LocalWaker::new(), - }); + })); let sender = Sender { shared: shared.clone(), }; @@ -38,7 +39,7 @@ struct Shared { /// This is created by the `channel` function. #[derive(Debug)] pub struct Sender { - shared: Cell>, + shared: Rc>>, } impl Unpin for Sender {} @@ -46,7 +47,7 @@ impl Unpin for Sender {} impl Sender { /// Sends the provided message along this channel. pub fn send(&self, item: T) -> Result<(), SendError> { - let shared = unsafe { self.shared.get_mut_unsafe() }; + let mut shared = self.shared.borrow_mut(); if !shared.has_receiver { return Err(SendError(item)); // receiver was dropped }; @@ -60,7 +61,7 @@ impl Sender { /// This prevents any further messages from being sent on the channel while /// still enabling the receiver to drain messages that are buffered. pub fn close(&mut self) { - self.shared.get_mut().has_receiver = false; + self.shared.borrow_mut().has_receiver = false; } } @@ -94,8 +95,8 @@ impl Sink for Sender { impl Drop for Sender { fn drop(&mut self) { - let count = self.shared.strong_count(); - let shared = self.shared.get_mut(); + let count = Rc::strong_count(&self.shared); + let shared = self.shared.borrow_mut(); // check is last sender is about to drop if shared.has_receiver && count == 2 { @@ -110,7 +111,7 @@ impl Drop for Sender { /// This is created by the `channel` function. #[derive(Debug)] pub struct Receiver { - shared: Cell>, + shared: Rc>>, } impl Receiver { @@ -127,15 +128,16 @@ impl Unpin for Receiver {} impl Stream for Receiver { type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.shared.strong_count() == 1 { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut shared = self.shared.borrow_mut(); + if Rc::strong_count(&self.shared) == 1 { // All senders have been dropped, so drain the buffer and end the // stream. - Poll::Ready(self.shared.get_mut().buffer.pop_front()) - } else if let Some(msg) = self.shared.get_mut().buffer.pop_front() { + Poll::Ready(shared.buffer.pop_front()) + } else if let Some(msg) = shared.buffer.pop_front() { Poll::Ready(Some(msg)) } else { - self.shared.get_mut().blocked_recv.register(cx.waker()); + shared.blocked_recv.register(cx.waker()); Poll::Pending } } @@ -143,7 +145,7 @@ impl Stream for Receiver { impl Drop for Receiver { fn drop(&mut self) { - let shared = self.shared.get_mut(); + let mut shared = self.shared.borrow_mut(); shared.buffer.clear(); shared.has_receiver = false; } diff --git a/actix-utils/src/oneshot.rs b/actix-utils/src/oneshot.rs index 9945e5d3..16f2c4b4 100644 --- a/actix-utils/src/oneshot.rs +++ b/actix-utils/src/oneshot.rs @@ -1,20 +1,21 @@ //! A one-shot, futures-aware channel. +use std::cell::RefCell; use std::future::Future; use std::pin::Pin; +use std::rc::Rc; use std::task::{Context, Poll}; pub use futures_channel::oneshot::Canceled; use slab::Slab; -use crate::cell::Cell; use crate::task::LocalWaker; /// Creates a new futures-aware, one-shot channel. pub fn channel() -> (Sender, Receiver) { - let inner = Cell::new(Inner { + let inner = Rc::new(RefCell::new(Inner { value: None, rx_task: LocalWaker::new(), - }); + })); let tx = Sender { inner: inner.clone(), }; @@ -24,14 +25,14 @@ pub fn channel() -> (Sender, Receiver) { /// Creates a new futures-aware, pool of one-shot's. pub fn pool() -> Pool { - Pool(Cell::new(Slab::new())) + Pool(Rc::new(RefCell::new(Slab::new()))) } /// Represents the completion half of a oneshot through which the result of a /// computation is signaled. #[derive(Debug)] pub struct Sender { - inner: Cell>, + inner: Rc>>, } /// A future representing the completion of a computation happening elsewhere in @@ -39,7 +40,7 @@ pub struct Sender { #[derive(Debug)] #[must_use = "futures do nothing unless polled"] pub struct Receiver { - inner: Cell>, + inner: Rc>>, } // The channels do not ever project Pin to the inner T @@ -63,9 +64,9 @@ impl Sender { /// then `Ok(())` is returned. If the receiving end was dropped before /// this function was called, however, then `Err` is returned with the value /// provided. - pub fn send(mut self, val: T) -> Result<(), T> { - if self.inner.strong_count() == 2 { - let inner = self.inner.get_mut(); + pub fn send(self, val: T) -> Result<(), T> { + if Rc::strong_count(&self.inner) == 2 { + let mut inner = self.inner.borrow_mut(); inner.value = Some(val); inner.rx_task.wake(); Ok(()) @@ -77,13 +78,13 @@ impl Sender { /// Tests to see whether this `Sender`'s corresponding `Receiver` /// has gone away. pub fn is_canceled(&self) -> bool { - self.inner.strong_count() == 1 + Rc::strong_count(&self.inner) == 1 } } impl Drop for Sender { fn drop(&mut self) { - self.inner.get_ref().rx_task.wake(); + self.inner.borrow().rx_task.wake(); } } @@ -94,22 +95,22 @@ impl Future for Receiver { let this = self.get_mut(); // If we've got a value, then skip the logic below as we're done. - if let Some(val) = this.inner.get_mut().value.take() { + if let Some(val) = this.inner.borrow_mut().value.take() { return Poll::Ready(Ok(val)); } // Check if sender is dropped and return error if it is. - if this.inner.strong_count() == 1 { + if Rc::strong_count(&this.inner) == 1 { Poll::Ready(Err(Canceled)) } else { - this.inner.get_ref().rx_task.register(cx.waker()); + this.inner.borrow().rx_task.register(cx.waker()); Poll::Pending } } } /// Futures-aware, pool of one-shot's. -pub struct Pool(Cell>>); +pub struct Pool(Rc>>>); bitflags::bitflags! { pub struct Flags: u8 { @@ -127,7 +128,7 @@ struct PoolInner { impl Pool { pub fn channel(&mut self) -> (PSender, PReceiver) { - let token = self.0.get_mut().insert(PoolInner { + let token = self.0.borrow_mut().insert(PoolInner { flags: Flags::all(), value: None, waker: LocalWaker::default(), @@ -157,7 +158,7 @@ impl Clone for Pool { #[derive(Debug)] pub struct PSender { token: usize, - inner: Cell>>, + inner: Rc>>>, } /// A future representing the completion of a computation happening elsewhere in @@ -166,7 +167,7 @@ pub struct PSender { #[must_use = "futures do nothing unless polled"] pub struct PReceiver { token: usize, - inner: Cell>>, + inner: Rc>>>, } // The oneshots do not ever project Pin to the inner T @@ -184,8 +185,9 @@ impl PSender { /// then `Ok(())` is returned. If the receiving end was dropped before /// this function was called, however, then `Err` is returned with the value /// provided. - pub fn send(mut self, val: T) -> Result<(), T> { - let inner = unsafe { self.inner.get_mut().get_unchecked_mut(self.token) }; + pub fn send(self, val: T) -> Result<(), T> { + let mut inner = self.inner.borrow_mut(); + let inner = unsafe { inner.get_unchecked_mut(self.token) }; if inner.flags.contains(Flags::RECEIVER) { inner.value = Some(val); @@ -199,7 +201,7 @@ impl PSender { /// Tests to see whether this `Sender`'s corresponding `Receiver` /// has gone away. pub fn is_canceled(&self) -> bool { - !unsafe { self.inner.get_ref().get_unchecked(self.token) } + !unsafe { self.inner.borrow().get_unchecked(self.token) } .flags .contains(Flags::RECEIVER) } @@ -207,23 +209,25 @@ impl PSender { impl Drop for PSender { fn drop(&mut self) { - let inner = unsafe { self.inner.get_mut().get_unchecked_mut(self.token) }; - if inner.flags.contains(Flags::RECEIVER) { - inner.waker.wake(); - inner.flags.remove(Flags::SENDER); + let mut inner = self.inner.borrow_mut(); + let inner_token = unsafe { inner.get_unchecked_mut(self.token) }; + if inner_token.flags.contains(Flags::RECEIVER) { + inner_token.waker.wake(); + inner_token.flags.remove(Flags::SENDER); } else { - self.inner.get_mut().remove(self.token); + inner.remove(self.token); } } } impl Drop for PReceiver { fn drop(&mut self) { - let inner = unsafe { self.inner.get_mut().get_unchecked_mut(self.token) }; - if inner.flags.contains(Flags::SENDER) { - inner.flags.remove(Flags::RECEIVER); + let mut inner = self.inner.borrow_mut(); + let inner_token = unsafe { inner.get_unchecked_mut(self.token) }; + if inner_token.flags.contains(Flags::SENDER) { + inner_token.flags.remove(Flags::RECEIVER); } else { - self.inner.get_mut().remove(self.token); + inner.remove(self.token); } } } @@ -233,7 +237,8 @@ impl Future for PReceiver { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - let inner = unsafe { this.inner.get_mut().get_unchecked_mut(this.token) }; + let mut inner = this.inner.borrow_mut(); + let inner = unsafe { inner.get_unchecked_mut(this.token) }; // If we've got a value, then skip the logic below as we're done. if let Some(val) = inner.value.take() { diff --git a/actix-utils/src/stream.rs b/actix-utils/src/stream.rs index 90df8c2f..72e9e019 100644 --- a/actix-utils/src/stream.rs +++ b/actix-utils/src/stream.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use actix_service::{IntoService, Service}; -use futures_util::{FutureExt, stream::Stream}; +use futures_util::{stream::Stream, FutureExt}; use crate::mpsc; diff --git a/actix-utils/src/time.rs b/actix-utils/src/time.rs index c7bfa922..02a56607 100644 --- a/actix-utils/src/time.rs +++ b/actix-utils/src/time.rs @@ -1,4 +1,6 @@ +use std::cell::RefCell; use std::convert::Infallible; +use std::rc::Rc; use std::task::{Context, Poll}; use std::time::{self, Duration, Instant}; @@ -6,10 +8,8 @@ use actix_rt::time::delay_for; use actix_service::{Service, ServiceFactory}; use futures_util::future::{ok, ready, FutureExt, Ready}; -use super::cell::Cell; - #[derive(Clone, Debug)] -pub struct LowResTime(Cell); +pub struct LowResTime(Rc>); #[derive(Debug)] struct Inner { @@ -28,7 +28,7 @@ impl Inner { impl LowResTime { pub fn with(resolution: Duration) -> LowResTime { - LowResTime(Cell::new(Inner::new(resolution))) + LowResTime(Rc::new(RefCell::new(Inner::new(resolution)))) } pub fn timer(&self) -> LowResTimeService { @@ -38,7 +38,7 @@ impl LowResTime { impl Default for LowResTime { fn default() -> Self { - LowResTime(Cell::new(Inner::new(Duration::from_secs(1)))) + LowResTime(Rc::new(RefCell::new(Inner::new(Duration::from_secs(1))))) } } @@ -57,30 +57,30 @@ impl ServiceFactory for LowResTime { } #[derive(Clone, Debug)] -pub struct LowResTimeService(Cell); +pub struct LowResTimeService(Rc>); impl LowResTimeService { pub fn with(resolution: Duration) -> LowResTimeService { - LowResTimeService(Cell::new(Inner::new(resolution))) + LowResTimeService(Rc::new(RefCell::new(Inner::new(resolution)))) } /// Get current time. This function has to be called from /// future's poll method, otherwise it panics. pub fn now(&self) -> Instant { - let cur = self.0.get_ref().current; + let cur = self.0.borrow().current; if let Some(cur) = cur { cur } else { let now = Instant::now(); - let mut inner = self.0.clone(); + let inner = self.0.clone(); let interval = { - let mut b = inner.get_mut(); + let mut b = inner.borrow_mut(); b.current = Some(now); b.resolution }; actix_rt::spawn(delay_for(interval).then(move |_| { - inner.get_mut().current.take(); + inner.borrow_mut().current.take(); ready(()) })); now @@ -104,7 +104,7 @@ impl Service for LowResTimeService { } #[derive(Clone, Debug)] -pub struct SystemTime(Cell); +pub struct SystemTime(Rc>); #[derive(Debug)] struct SystemTimeInner { @@ -122,30 +122,30 @@ impl SystemTimeInner { } #[derive(Clone, Debug)] -pub struct SystemTimeService(Cell); +pub struct SystemTimeService(Rc>); impl SystemTimeService { pub fn with(resolution: Duration) -> SystemTimeService { - SystemTimeService(Cell::new(SystemTimeInner::new(resolution))) + SystemTimeService(Rc::new(RefCell::new(SystemTimeInner::new(resolution)))) } /// Get current time. This function has to be called from /// future's poll method, otherwise it panics. pub fn now(&self) -> time::SystemTime { - let cur = self.0.get_ref().current; + let cur = self.0.borrow().current; if let Some(cur) = cur { cur } else { let now = time::SystemTime::now(); - let mut inner = self.0.clone(); + let inner = self.0.clone(); let interval = { - let mut b = inner.get_mut(); + let mut b = inner.borrow_mut(); b.current = Some(now); b.resolution }; actix_rt::spawn(delay_for(interval).then(move |_| { - inner.get_mut().current.take(); + inner.borrow_mut().current.take(); ready(()) })); now