From 2e5e69c9ba64f145c6f071f4d0326de4d30b94da Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 11 Dec 2019 11:28:09 +0600 Subject: [PATCH] Simplify oneshot and mpsc implementations --- actix-connect/Cargo.toml | 2 +- actix-ioframe/Cargo.toml | 2 +- actix-macros/Cargo.toml | 2 +- actix-server/Cargo.toml | 2 +- actix-service/Cargo.toml | 2 +- actix-testing/Cargo.toml | 2 +- actix-tls/Cargo.toml | 2 +- actix-utils/CHANGES.md | 4 ++ actix-utils/Cargo.toml | 5 +- actix-utils/src/cell.rs | 34 +++------- actix-utils/src/mpsc.rs | 134 ++++++++++++++++++------------------- actix-utils/src/oneshot.rs | 132 ++++++++++++------------------------ 12 files changed, 129 insertions(+), 194 deletions(-) diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 96bc7be0..7709b5ee 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -36,7 +36,7 @@ uri = ["http"] actix-service = "1.0.0" actix-codec = "0.2.0" actix-utils = "1.0.0-alpha.3" -actix-rt = "1.0.0-alpha.3" +actix-rt = "1.0.0" derive_more = "0.99.2" either = "1.5.2" futures = "0.3.1" diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index 9f4486b0..f69ffa68 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -21,7 +21,7 @@ path = "src/lib.rs" actix-service = "1.0.0" actix-codec = "0.2.0" actix-utils = "1.0.0-alpha.2" -actix-rt = "1.0.0-alpha.2" +actix-rt = "1.0.0" bytes = "0.5" either = "1.5.2" futures = "0.3.1" diff --git a/actix-macros/Cargo.toml b/actix-macros/Cargo.toml index 59af1a92..f0ba02db 100644 --- a/actix-macros/Cargo.toml +++ b/actix-macros/Cargo.toml @@ -18,4 +18,4 @@ quote = "^1" syn = { version = "^1", features = ["full"] } [dev-dependencies] -actix-rt = { version = "1.0.0-alpha.3" } +actix-rt = { version = "1.0.0" } diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 3d974e3d..f1373cf1 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -22,7 +22,7 @@ default = [] [dependencies] actix-service = "1.0.0" -actix-rt = "1.0.0-alpha.3" +actix-rt = "1.0.0" actix-codec = "0.2.0" actix-utils = "1.0.0-alpha.3" diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index dfadc81e..31926c58 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -26,4 +26,4 @@ futures-util = "0.3.1" pin-project = "0.4.6" [dev-dependencies] -actix-rt = "1.0.0-alpha.3" +actix-rt = "1.0.0" diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml index 35c16128..d527f6b7 100644 --- a/actix-testing/Cargo.toml +++ b/actix-testing/Cargo.toml @@ -17,7 +17,7 @@ name = "actix_testing" path = "src/lib.rs" [dependencies] -actix-rt = "1.0.0-alpha.3" +actix-rt = "1.0.0" actix-macros = "0.1.0" actix-server = "1.0.0-alpha.3" actix-service = "1.0.0" diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index a8846e5d..2c901a01 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -35,7 +35,7 @@ nativetls = ["native-tls", "tokio-tls"] actix-service = "1.0.0" actix-codec = "0.2.0" actix-utils = "1.0.0-alpha.3" -actix-rt = "1.0.0-alpha.3" +actix-rt = "1.0.0" derive_more = "0.99.2" either = "1.5.2" futures = "0.3.1" diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index a91cf9f1..0aece6c4 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [1.0.0] - 2019-12-11 + +* Simplify oneshot and mpsc implementations + ## [1.0.0-alpha.3] - 2019-12-07 * Migrate to tokio 0.2 diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index c94e133d..7cf69bef 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "1.0.0-alpha.3" +version = "1.0.0" authors = ["Nikolay Kim "] description = "Actix utils - various actix net related services" keywords = ["network", "framework", "async", "futures"] @@ -9,7 +9,6 @@ repository = "https://github.com/actix/actix-net.git" documentation = "https://docs.rs/actix-utils/" categories = ["network-programming", "asynchronous"] license = "MIT/Apache-2.0" -exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] edition = "2018" workspace = ".." @@ -19,7 +18,7 @@ path = "src/lib.rs" [dependencies] actix-service = "1.0.0" -actix-rt = "1.0.0-alpha.3" +actix-rt = "1.0.0" actix-codec = "0.2.0" bytes = "0.5.2" either = "1.5.2" diff --git a/actix-utils/src/cell.rs b/actix-utils/src/cell.rs index e85d81d3..716982ae 100644 --- a/actix-utils/src/cell.rs +++ b/actix-utils/src/cell.rs @@ -2,16 +2,12 @@ use std::cell::UnsafeCell; use std::fmt; -use std::rc::{Rc, Weak}; +use std::rc::Rc; pub(crate) struct Cell { pub(crate) inner: Rc>, } -pub(crate) struct WeakCell { - inner: Weak>, -} - impl Clone for Cell { fn clone(&self) -> Self { Self { @@ -27,39 +23,25 @@ impl fmt::Debug for Cell { } impl Cell { - pub fn new(inner: T) -> Self { + pub(crate) fn new(inner: T) -> Self { Self { inner: Rc::new(UnsafeCell::new(inner)), } } - pub fn downgrade(&self) -> WeakCell { - WeakCell { - inner: Rc::downgrade(&self.inner), - } + pub(crate) fn strong_count(&self) -> usize { + Rc::strong_count(&self.inner) } - pub fn get_ref(&self) -> &T { + pub(crate) fn get_ref(&self) -> &T { unsafe { &*self.inner.as_ref().get() } } - pub fn get_mut(&mut self) -> &mut T { + pub(crate) fn get_mut(&mut self) -> &mut T { unsafe { &mut *self.inner.as_ref().get() } } -} -impl WeakCell { - pub fn upgrade(&self) -> Option> { - if let Some(inner) = self.inner.upgrade() { - Some(Cell { inner }) - } else { - None - } - } -} - -impl fmt::Debug for WeakCell { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.inner.fmt(f) + pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T { + &mut *self.inner.as_ref().get() } } diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs index d6e41135..d9882584 100644 --- a/actix-utils/src/mpsc.rs +++ b/actix-utils/src/mpsc.rs @@ -1,34 +1,27 @@ -//! A multi-producer, single-consumer, futures-aware, FIFO queue with back -//! pressure, for use communicating between tasks on the same thread. -//! -//! These queues are the same as those in `futures::sync`, except they're not -//! intended to be sent across threads. - +//! A multi-producer, single-consumer, futures-aware, FIFO queue. use std::any::Any; -use std::cell::RefCell; use std::collections::VecDeque; use std::error::Error; +use std::fmt; use std::pin::Pin; -use std::rc::{Rc, Weak}; use std::task::{Context, Poll}; -use std::{fmt, mem}; use futures::{Sink, Stream}; +use crate::cell::Cell; use crate::task::LocalWaker; /// Creates a unbounded in-memory channel with buffered storage. pub fn channel() -> (Sender, Receiver) { - let shared = Rc::new(RefCell::new(Shared { + let shared = Cell::new(Shared { + has_receiver: true, buffer: VecDeque::new(), blocked_recv: LocalWaker::new(), - })); + }); let sender = Sender { - shared: Rc::downgrade(&shared), - }; - let receiver = Receiver { - state: State::Open(shared), + shared: shared.clone(), }; + let receiver = Receiver { shared }; (sender, receiver) } @@ -36,6 +29,7 @@ pub fn channel() -> (Sender, Receiver) { struct Shared { buffer: VecDeque, blocked_recv: LocalWaker, + has_receiver: bool, } /// The transmission end of a channel. @@ -43,18 +37,18 @@ struct Shared { /// This is created by the `channel` function. #[derive(Debug)] pub struct Sender { - shared: Weak>>, + shared: Cell>, } +impl Unpin for Sender {} + impl Sender { /// Sends the provided message along this channel. pub fn send(&self, item: T) -> Result<(), SendError> { - let shared = match self.shared.upgrade() { - Some(shared) => shared, - None => return Err(SendError(item)), // receiver was dropped + let shared = unsafe { self.shared.get_mut_unsafe() }; + if !shared.has_receiver { + return Err(SendError(item)); // receiver was dropped }; - let mut shared = shared.borrow_mut(); - shared.buffer.push_back(item); shared.blocked_recv.wake(); Ok(()) @@ -91,17 +85,13 @@ impl Sink for Sender { impl Drop for Sender { fn drop(&mut self) { - let shared = match self.shared.upgrade() { - Some(shared) => shared, - None => return, - }; - // The number of existing `Weak` indicates if we are possibly the last - // `Sender`. If we are the last, we possibly must notify a blocked - // `Receiver`. `self.shared` is always one of the `Weak` to this shared - // data. Therefore the smallest possible Rc::weak_count(&shared) is 1. - if Rc::weak_count(&shared) == 1 { + let count = self.shared.strong_count(); + let shared = self.shared.get_mut(); + + // check is last sender is about to drop + if shared.has_receiver && count == 2 { // Wake up receiver as its stream has ended - shared.borrow_mut().blocked_recv.wake(); + shared.blocked_recv.wake(); } } } @@ -111,56 +101,23 @@ impl Drop for Sender { /// This is created by the `channel` function. #[derive(Debug)] pub struct Receiver { - state: State, + shared: Cell>, } impl Unpin for Receiver {} -/// Possible states of a receiver. We're either Open (can receive more messages) -/// or we're closed with a list of messages we have left to receive. -#[derive(Debug)] -enum State { - Open(Rc>>), - Closed(VecDeque), -} - -impl Receiver { - /// Closes the receiving half - /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. - pub fn close(&mut self) { - let items = match self.state { - State::Open(ref state) => { - let mut state = state.borrow_mut(); - mem::replace(&mut state.buffer, VecDeque::new()) - } - State::Closed(_) => return, - }; - self.state = State::Closed(items); - } -} - impl Stream for Receiver { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = match self.state { - State::Open(ref mut me) => me, - State::Closed(ref mut items) => return Poll::Ready(items.pop_front()), - }; - - if let Some(shared) = Rc::get_mut(me) { + if self.shared.strong_count() == 1 { // All senders have been dropped, so drain the buffer and end the // stream. - return Poll::Ready(shared.borrow_mut().buffer.pop_front()); - } - - let mut shared = me.borrow_mut(); - if let Some(msg) = shared.buffer.pop_front() { + Poll::Ready(self.shared.get_mut().buffer.pop_front()) + } else if let Some(msg) = self.shared.get_mut().buffer.pop_front() { Poll::Ready(Some(msg)) } else { - shared.blocked_recv.register(cx.waker()); + self.shared.get_mut().blocked_recv.register(cx.waker()); Poll::Pending } } @@ -168,7 +125,9 @@ impl Stream for Receiver { impl Drop for Receiver { fn drop(&mut self) { - self.close(); + let shared = self.shared.get_mut(); + shared.buffer.clear(); + shared.has_receiver = false; } } @@ -200,3 +159,38 @@ impl SendError { self.0 } } + +#[cfg(test)] +mod tests { + use super::*; + use futures::future::lazy; + use futures::{Stream, StreamExt}; + + #[actix_rt::test] + async fn test_mpsc() { + let (tx, mut rx) = channel(); + tx.send("test").unwrap(); + assert_eq!(rx.next().await.unwrap(), "test"); + + let tx2 = tx.clone(); + tx2.send("test2").unwrap(); + assert_eq!(rx.next().await.unwrap(), "test2"); + + assert_eq!( + lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await, + Poll::Pending + ); + drop(tx2); + assert_eq!( + lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await, + Poll::Pending + ); + drop(tx); + assert_eq!(rx.next().await, None); + + let (tx, rx) = channel(); + tx.send("test").unwrap(); + drop(rx); + assert!(tx.send("test").is_err()); + } +} diff --git a/actix-utils/src/oneshot.rs b/actix-utils/src/oneshot.rs index 9940cfa3..72aa7a84 100644 --- a/actix-utils/src/oneshot.rs +++ b/actix-utils/src/oneshot.rs @@ -1,69 +1,45 @@ -//! A one-shot, futures-aware channel -//! -//! This channel is similar to that in `sync::oneshot` but cannot be sent across -//! threads. - +//! A one-shot, futures-aware channel. use std::future::Future; use std::pin::Pin; -use std::rc::Rc; use std::task::{Context, Poll}; pub use futures::channel::oneshot::Canceled; -use crate::cell::{Cell, WeakCell}; +use crate::cell::Cell; use crate::task::LocalWaker; /// Creates a new futures-aware, one-shot channel. -/// -/// This function is the same as `sync::oneshot::channel` except that the -/// returned values cannot be sent across threads. pub fn channel() -> (Sender, Receiver) { let inner = Cell::new(Inner { value: None, rx_task: LocalWaker::new(), }); let tx = Sender { - inner: inner.downgrade(), - }; - let rx = Receiver { - state: State::Open(inner), + inner: inner.clone(), }; + let rx = Receiver { inner }; (tx, rx) } /// Represents the completion half of a oneshot through which the result of a /// computation is signaled. -/// -/// This is created by the `unsync::oneshot::channel` function and is equivalent -/// in functionality to `sync::oneshot::Sender` except that it cannot be sent -/// across threads. #[derive(Debug)] pub struct Sender { - inner: WeakCell>, + inner: Cell>, } /// A future representing the completion of a computation happening elsewhere in /// memory. -/// -/// This is created by the `unsync::oneshot::channel` function and is equivalent -/// in functionality to `sync::oneshot::Receiver` except that it cannot be sent -/// across threads. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] pub struct Receiver { - state: State, + inner: Cell>, } // The channels do not ever project Pin to the inner T impl Unpin for Receiver {} impl Unpin for Sender {} -#[derive(Debug)] -enum State { - Open(Cell>), - Closed(Option), -} - #[derive(Debug)] struct Inner { value: Option, @@ -78,12 +54,12 @@ impl Sender { /// represents. /// /// If the value is successfully enqueued for the remote end to receive, - /// then `Ok(())` is returned. If the receiving end was deallocated before + /// then `Ok(())` is returned. If the receiving end was dropped before /// this function was called, however, then `Err` is returned with the value /// provided. - pub fn send(self, val: T) -> Result<(), T> { - if let Some(mut inner) = self.inner.upgrade() { - let inner = inner.get_mut(); + pub fn send(mut self, val: T) -> Result<(), T> { + if self.inner.strong_count() == 2 { + let inner = self.inner.get_mut(); inner.value = Some(val); inner.rx_task.wake(); Ok(()) @@ -91,47 +67,12 @@ impl Sender { Err(val) } } - - /// Tests to see whether this `Sender`'s corresponding `Receiver` - /// has gone away. - /// - /// This function can be used to learn about when the `Receiver` (consumer) - /// half has gone away and nothing will be able to receive a message sent - /// from `send`. - /// - /// Note that this function is intended to *not* be used in the context of a - /// future. If you're implementing a future you probably want to call the - /// `poll_cancel` function which will block the current task if the - /// cancellation hasn't happened yet. This can be useful when working on a - /// non-futures related thread, though, which would otherwise panic if - /// `poll_cancel` were called. - pub fn is_canceled(&self) -> bool { - self.inner.upgrade().is_none() - } } impl Drop for Sender { fn drop(&mut self) { - if let Some(inner) = self.inner.upgrade() { - inner.get_ref().rx_task.wake(); - }; - } -} - -impl Receiver { - /// Gracefully close this receiver, preventing sending any future messages. - /// - /// Any `send` operation which happens after this method returns is - /// guaranteed to fail. Once this method is called the normal `poll` method - /// can be used to determine whether a message was actually sent or not. If - /// `Canceled` is returned from `poll` then no message was sent. - pub fn close(&mut self) { - match self.state { - State::Open(ref mut inner) => { - let value = inner.get_mut().value.take(); - self.state = State::Closed(value); - } - State::Closed(_) => {} + if self.inner.strong_count() == 2 { + self.inner.get_ref().rx_task.wake(); }; } } @@ -142,33 +83,48 @@ impl Future for Receiver { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - let inner = match this.state { - State::Open(ref mut inner) => inner, - State::Closed(ref mut item) => match item.take() { - Some(item) => return Poll::Ready(Ok(item)), - None => return Poll::Ready(Err(Canceled)), - }, - }; - // If we've got a value, then skip the logic below as we're done. - if let Some(val) = inner.get_mut().value.take() { + if let Some(val) = this.inner.get_mut().value.take() { return Poll::Ready(Ok(val)); } - // If we can get mutable access, then the sender has gone away. We - // didn't see a value above, so we're canceled. Otherwise we park - // our task and wait for a value to come in. - if Rc::get_mut(&mut inner.inner).is_some() { + // Check if sender is dropped and return error if it is. + if this.inner.strong_count() == 1 { Poll::Ready(Err(Canceled)) } else { - inner.get_ref().rx_task.register(cx.waker()); + this.inner.get_ref().rx_task.register(cx.waker()); Poll::Pending } } } -impl Drop for Receiver { - fn drop(&mut self) { - self.close(); +#[cfg(test)] +mod tests { + use super::*; + use futures::future::lazy; + + #[actix_rt::test] + async fn test_oneshot() { + let (tx, rx) = channel(); + tx.send("test").unwrap(); + assert_eq!(rx.await.unwrap(), "test"); + + let (tx, rx) = channel(); + drop(rx); + assert!(tx.send("test").is_err()); + + let (tx, rx) = channel::<&'static str>(); + drop(tx); + assert!(rx.await.is_err()); + + let (tx, mut rx) = channel::<&'static str>(); + assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); + tx.send("test").unwrap(); + assert_eq!(rx.await.unwrap(), "test"); + + let (tx, mut rx) = channel::<&'static str>(); + assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending); + drop(tx); + assert!(rx.await.is_err()); } }