1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-12-04 09:21:55 +01:00

Simplify oneshot and mpsc implementations

This commit is contained in:
Nikolay Kim 2019-12-11 11:28:09 +06:00
parent e315cf2893
commit 2e5e69c9ba
12 changed files with 129 additions and 194 deletions

View File

@ -36,7 +36,7 @@ uri = ["http"]
actix-service = "1.0.0" actix-service = "1.0.0"
actix-codec = "0.2.0" actix-codec = "0.2.0"
actix-utils = "1.0.0-alpha.3" actix-utils = "1.0.0-alpha.3"
actix-rt = "1.0.0-alpha.3" actix-rt = "1.0.0"
derive_more = "0.99.2" derive_more = "0.99.2"
either = "1.5.2" either = "1.5.2"
futures = "0.3.1" futures = "0.3.1"

View File

@ -21,7 +21,7 @@ path = "src/lib.rs"
actix-service = "1.0.0" actix-service = "1.0.0"
actix-codec = "0.2.0" actix-codec = "0.2.0"
actix-utils = "1.0.0-alpha.2" actix-utils = "1.0.0-alpha.2"
actix-rt = "1.0.0-alpha.2" actix-rt = "1.0.0"
bytes = "0.5" bytes = "0.5"
either = "1.5.2" either = "1.5.2"
futures = "0.3.1" futures = "0.3.1"

View File

@ -18,4 +18,4 @@ quote = "^1"
syn = { version = "^1", features = ["full"] } syn = { version = "^1", features = ["full"] }
[dev-dependencies] [dev-dependencies]
actix-rt = { version = "1.0.0-alpha.3" } actix-rt = { version = "1.0.0" }

View File

@ -22,7 +22,7 @@ default = []
[dependencies] [dependencies]
actix-service = "1.0.0" actix-service = "1.0.0"
actix-rt = "1.0.0-alpha.3" actix-rt = "1.0.0"
actix-codec = "0.2.0" actix-codec = "0.2.0"
actix-utils = "1.0.0-alpha.3" actix-utils = "1.0.0-alpha.3"

View File

@ -26,4 +26,4 @@ futures-util = "0.3.1"
pin-project = "0.4.6" pin-project = "0.4.6"
[dev-dependencies] [dev-dependencies]
actix-rt = "1.0.0-alpha.3" actix-rt = "1.0.0"

View File

@ -17,7 +17,7 @@ name = "actix_testing"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-rt = "1.0.0-alpha.3" actix-rt = "1.0.0"
actix-macros = "0.1.0" actix-macros = "0.1.0"
actix-server = "1.0.0-alpha.3" actix-server = "1.0.0-alpha.3"
actix-service = "1.0.0" actix-service = "1.0.0"

View File

@ -35,7 +35,7 @@ nativetls = ["native-tls", "tokio-tls"]
actix-service = "1.0.0" actix-service = "1.0.0"
actix-codec = "0.2.0" actix-codec = "0.2.0"
actix-utils = "1.0.0-alpha.3" actix-utils = "1.0.0-alpha.3"
actix-rt = "1.0.0-alpha.3" actix-rt = "1.0.0"
derive_more = "0.99.2" derive_more = "0.99.2"
either = "1.5.2" either = "1.5.2"
futures = "0.3.1" futures = "0.3.1"

View File

@ -1,5 +1,9 @@
# Changes # Changes
## [1.0.0] - 2019-12-11
* Simplify oneshot and mpsc implementations
## [1.0.0-alpha.3] - 2019-12-07 ## [1.0.0-alpha.3] - 2019-12-07
* Migrate to tokio 0.2 * Migrate to tokio 0.2

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-utils" name = "actix-utils"
version = "1.0.0-alpha.3" version = "1.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services" description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -9,7 +9,6 @@ repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-utils/" documentation = "https://docs.rs/actix-utils/"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018" edition = "2018"
workspace = ".." workspace = ".."
@ -19,7 +18,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-service = "1.0.0" actix-service = "1.0.0"
actix-rt = "1.0.0-alpha.3" actix-rt = "1.0.0"
actix-codec = "0.2.0" actix-codec = "0.2.0"
bytes = "0.5.2" bytes = "0.5.2"
either = "1.5.2" either = "1.5.2"

View File

@ -2,16 +2,12 @@
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::fmt; use std::fmt;
use std::rc::{Rc, Weak}; use std::rc::Rc;
pub(crate) struct Cell<T> { pub(crate) struct Cell<T> {
pub(crate) inner: Rc<UnsafeCell<T>>, pub(crate) inner: Rc<UnsafeCell<T>>,
} }
pub(crate) struct WeakCell<T> {
inner: Weak<UnsafeCell<T>>,
}
impl<T> Clone for Cell<T> { impl<T> Clone for Cell<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
@ -27,39 +23,25 @@ impl<T: fmt::Debug> fmt::Debug for Cell<T> {
} }
impl<T> Cell<T> { impl<T> Cell<T> {
pub fn new(inner: T) -> Self { pub(crate) fn new(inner: T) -> Self {
Self { Self {
inner: Rc::new(UnsafeCell::new(inner)), inner: Rc::new(UnsafeCell::new(inner)),
} }
} }
pub fn downgrade(&self) -> WeakCell<T> { pub(crate) fn strong_count(&self) -> usize {
WeakCell { Rc::strong_count(&self.inner)
inner: Rc::downgrade(&self.inner),
}
} }
pub fn get_ref(&self) -> &T { pub(crate) fn get_ref(&self) -> &T {
unsafe { &*self.inner.as_ref().get() } unsafe { &*self.inner.as_ref().get() }
} }
pub fn get_mut(&mut self) -> &mut T { pub(crate) fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() } unsafe { &mut *self.inner.as_ref().get() }
} }
}
impl<T> WeakCell<T> { pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {
pub fn upgrade(&self) -> Option<Cell<T>> { &mut *self.inner.as_ref().get()
if let Some(inner) = self.inner.upgrade() {
Some(Cell { inner })
} else {
None
}
}
}
impl<T: fmt::Debug> fmt::Debug for WeakCell<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
} }
} }

View File

@ -1,34 +1,27 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue with back //! A multi-producer, single-consumer, futures-aware, FIFO queue.
//! pressure, for use communicating between tasks on the same thread.
//!
//! These queues are the same as those in `futures::sync`, except they're not
//! intended to be sent across threads.
use std::any::Any; use std::any::Any;
use std::cell::RefCell;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::error::Error; use std::error::Error;
use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, mem};
use futures::{Sink, Stream}; use futures::{Sink, Stream};
use crate::cell::Cell;
use crate::task::LocalWaker; use crate::task::LocalWaker;
/// Creates a unbounded in-memory channel with buffered storage. /// Creates a unbounded in-memory channel with buffered storage.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) { pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let shared = Rc::new(RefCell::new(Shared { let shared = Cell::new(Shared {
has_receiver: true,
buffer: VecDeque::new(), buffer: VecDeque::new(),
blocked_recv: LocalWaker::new(), blocked_recv: LocalWaker::new(),
})); });
let sender = Sender { let sender = Sender {
shared: Rc::downgrade(&shared), shared: shared.clone(),
};
let receiver = Receiver {
state: State::Open(shared),
}; };
let receiver = Receiver { shared };
(sender, receiver) (sender, receiver)
} }
@ -36,6 +29,7 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
struct Shared<T> { struct Shared<T> {
buffer: VecDeque<T>, buffer: VecDeque<T>,
blocked_recv: LocalWaker, blocked_recv: LocalWaker,
has_receiver: bool,
} }
/// The transmission end of a channel. /// The transmission end of a channel.
@ -43,18 +37,18 @@ struct Shared<T> {
/// This is created by the `channel` function. /// This is created by the `channel` function.
#[derive(Debug)] #[derive(Debug)]
pub struct Sender<T> { pub struct Sender<T> {
shared: Weak<RefCell<Shared<T>>>, shared: Cell<Shared<T>>,
} }
impl<T> Unpin for Sender<T> {}
impl<T> Sender<T> { impl<T> Sender<T> {
/// Sends the provided message along this channel. /// Sends the provided message along this channel.
pub fn send(&self, item: T) -> Result<(), SendError<T>> { pub fn send(&self, item: T) -> Result<(), SendError<T>> {
let shared = match self.shared.upgrade() { let shared = unsafe { self.shared.get_mut_unsafe() };
Some(shared) => shared, if !shared.has_receiver {
None => return Err(SendError(item)), // receiver was dropped return Err(SendError(item)); // receiver was dropped
}; };
let mut shared = shared.borrow_mut();
shared.buffer.push_back(item); shared.buffer.push_back(item);
shared.blocked_recv.wake(); shared.blocked_recv.wake();
Ok(()) Ok(())
@ -91,17 +85,13 @@ impl<T> Sink<T> for Sender<T> {
impl<T> Drop for Sender<T> { impl<T> Drop for Sender<T> {
fn drop(&mut self) { fn drop(&mut self) {
let shared = match self.shared.upgrade() { let count = self.shared.strong_count();
Some(shared) => shared, let shared = self.shared.get_mut();
None => return,
}; // check is last sender is about to drop
// The number of existing `Weak` indicates if we are possibly the last if shared.has_receiver && count == 2 {
// `Sender`. If we are the last, we possibly must notify a blocked
// `Receiver`. `self.shared` is always one of the `Weak` to this shared
// data. Therefore the smallest possible Rc::weak_count(&shared) is 1.
if Rc::weak_count(&shared) == 1 {
// Wake up receiver as its stream has ended // Wake up receiver as its stream has ended
shared.borrow_mut().blocked_recv.wake(); shared.blocked_recv.wake();
} }
} }
} }
@ -111,56 +101,23 @@ impl<T> Drop for Sender<T> {
/// This is created by the `channel` function. /// This is created by the `channel` function.
#[derive(Debug)] #[derive(Debug)]
pub struct Receiver<T> { pub struct Receiver<T> {
state: State<T>, shared: Cell<Shared<T>>,
} }
impl<T> Unpin for Receiver<T> {} impl<T> Unpin for Receiver<T> {}
/// Possible states of a receiver. We're either Open (can receive more messages)
/// or we're closed with a list of messages we have left to receive.
#[derive(Debug)]
enum State<T> {
Open(Rc<RefCell<Shared<T>>>),
Closed(VecDeque<T>),
}
impl<T> Receiver<T> {
/// Closes the receiving half
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
let items = match self.state {
State::Open(ref state) => {
let mut state = state.borrow_mut();
mem::replace(&mut state.buffer, VecDeque::new())
}
State::Closed(_) => return,
};
self.state = State::Closed(items);
}
}
impl<T> Stream for Receiver<T> { impl<T> Stream for Receiver<T> {
type Item = T; type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = match self.state { if self.shared.strong_count() == 1 {
State::Open(ref mut me) => me,
State::Closed(ref mut items) => return Poll::Ready(items.pop_front()),
};
if let Some(shared) = Rc::get_mut(me) {
// All senders have been dropped, so drain the buffer and end the // All senders have been dropped, so drain the buffer and end the
// stream. // stream.
return Poll::Ready(shared.borrow_mut().buffer.pop_front()); Poll::Ready(self.shared.get_mut().buffer.pop_front())
} } else if let Some(msg) = self.shared.get_mut().buffer.pop_front() {
let mut shared = me.borrow_mut();
if let Some(msg) = shared.buffer.pop_front() {
Poll::Ready(Some(msg)) Poll::Ready(Some(msg))
} else { } else {
shared.blocked_recv.register(cx.waker()); self.shared.get_mut().blocked_recv.register(cx.waker());
Poll::Pending Poll::Pending
} }
} }
@ -168,7 +125,9 @@ impl<T> Stream for Receiver<T> {
impl<T> Drop for Receiver<T> { impl<T> Drop for Receiver<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.close(); let shared = self.shared.get_mut();
shared.buffer.clear();
shared.has_receiver = false;
} }
} }
@ -200,3 +159,38 @@ impl<T> SendError<T> {
self.0 self.0
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use futures::future::lazy;
use futures::{Stream, StreamExt};
#[actix_rt::test]
async fn test_mpsc() {
let (tx, mut rx) = channel();
tx.send("test").unwrap();
assert_eq!(rx.next().await.unwrap(), "test");
let tx2 = tx.clone();
tx2.send("test2").unwrap();
assert_eq!(rx.next().await.unwrap(), "test2");
assert_eq!(
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
Poll::Pending
);
drop(tx2);
assert_eq!(
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
Poll::Pending
);
drop(tx);
assert_eq!(rx.next().await, None);
let (tx, rx) = channel();
tx.send("test").unwrap();
drop(rx);
assert!(tx.send("test").is_err());
}
}

View File

@ -1,69 +1,45 @@
//! A one-shot, futures-aware channel //! A one-shot, futures-aware channel.
//!
//! This channel is similar to that in `sync::oneshot` but cannot be sent across
//! threads.
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub use futures::channel::oneshot::Canceled; pub use futures::channel::oneshot::Canceled;
use crate::cell::{Cell, WeakCell}; use crate::cell::Cell;
use crate::task::LocalWaker; use crate::task::LocalWaker;
/// Creates a new futures-aware, one-shot channel. /// Creates a new futures-aware, one-shot channel.
///
/// This function is the same as `sync::oneshot::channel` except that the
/// returned values cannot be sent across threads.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) { pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Cell::new(Inner { let inner = Cell::new(Inner {
value: None, value: None,
rx_task: LocalWaker::new(), rx_task: LocalWaker::new(),
}); });
let tx = Sender { let tx = Sender {
inner: inner.downgrade(), inner: inner.clone(),
};
let rx = Receiver {
state: State::Open(inner),
}; };
let rx = Receiver { inner };
(tx, rx) (tx, rx)
} }
/// Represents the completion half of a oneshot through which the result of a /// Represents the completion half of a oneshot through which the result of a
/// computation is signaled. /// computation is signaled.
///
/// This is created by the `unsync::oneshot::channel` function and is equivalent
/// in functionality to `sync::oneshot::Sender` except that it cannot be sent
/// across threads.
#[derive(Debug)] #[derive(Debug)]
pub struct Sender<T> { pub struct Sender<T> {
inner: WeakCell<Inner<T>>, inner: Cell<Inner<T>>,
} }
/// A future representing the completion of a computation happening elsewhere in /// A future representing the completion of a computation happening elsewhere in
/// memory. /// memory.
///
/// This is created by the `unsync::oneshot::channel` function and is equivalent
/// in functionality to `sync::oneshot::Receiver` except that it cannot be sent
/// across threads.
#[derive(Debug)] #[derive(Debug)]
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub struct Receiver<T> { pub struct Receiver<T> {
state: State<T>, inner: Cell<Inner<T>>,
} }
// The channels do not ever project Pin to the inner T // The channels do not ever project Pin to the inner T
impl<T> Unpin for Receiver<T> {} impl<T> Unpin for Receiver<T> {}
impl<T> Unpin for Sender<T> {} impl<T> Unpin for Sender<T> {}
#[derive(Debug)]
enum State<T> {
Open(Cell<Inner<T>>),
Closed(Option<T>),
}
#[derive(Debug)] #[derive(Debug)]
struct Inner<T> { struct Inner<T> {
value: Option<T>, value: Option<T>,
@ -78,12 +54,12 @@ impl<T> Sender<T> {
/// represents. /// represents.
/// ///
/// If the value is successfully enqueued for the remote end to receive, /// If the value is successfully enqueued for the remote end to receive,
/// then `Ok(())` is returned. If the receiving end was deallocated before /// then `Ok(())` is returned. If the receiving end was dropped before
/// this function was called, however, then `Err` is returned with the value /// this function was called, however, then `Err` is returned with the value
/// provided. /// provided.
pub fn send(self, val: T) -> Result<(), T> { pub fn send(mut self, val: T) -> Result<(), T> {
if let Some(mut inner) = self.inner.upgrade() { if self.inner.strong_count() == 2 {
let inner = inner.get_mut(); let inner = self.inner.get_mut();
inner.value = Some(val); inner.value = Some(val);
inner.rx_task.wake(); inner.rx_task.wake();
Ok(()) Ok(())
@ -91,47 +67,12 @@ impl<T> Sender<T> {
Err(val) Err(val)
} }
} }
/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has gone away.
///
/// This function can be used to learn about when the `Receiver` (consumer)
/// half has gone away and nothing will be able to receive a message sent
/// from `send`.
///
/// Note that this function is intended to *not* be used in the context of a
/// future. If you're implementing a future you probably want to call the
/// `poll_cancel` function which will block the current task if the
/// cancellation hasn't happened yet. This can be useful when working on a
/// non-futures related thread, though, which would otherwise panic if
/// `poll_cancel` were called.
pub fn is_canceled(&self) -> bool {
self.inner.upgrade().is_none()
}
} }
impl<T> Drop for Sender<T> { impl<T> Drop for Sender<T> {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(inner) = self.inner.upgrade() { if self.inner.strong_count() == 2 {
inner.get_ref().rx_task.wake(); self.inner.get_ref().rx_task.wake();
};
}
}
impl<T> Receiver<T> {
/// Gracefully close this receiver, preventing sending any future messages.
///
/// Any `send` operation which happens after this method returns is
/// guaranteed to fail. Once this method is called the normal `poll` method
/// can be used to determine whether a message was actually sent or not. If
/// `Canceled` is returned from `poll` then no message was sent.
pub fn close(&mut self) {
match self.state {
State::Open(ref mut inner) => {
let value = inner.get_mut().value.take();
self.state = State::Closed(value);
}
State::Closed(_) => {}
}; };
} }
} }
@ -142,33 +83,48 @@ impl<T> Future for Receiver<T> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();
let inner = match this.state {
State::Open(ref mut inner) => inner,
State::Closed(ref mut item) => match item.take() {
Some(item) => return Poll::Ready(Ok(item)),
None => return Poll::Ready(Err(Canceled)),
},
};
// If we've got a value, then skip the logic below as we're done. // If we've got a value, then skip the logic below as we're done.
if let Some(val) = inner.get_mut().value.take() { if let Some(val) = this.inner.get_mut().value.take() {
return Poll::Ready(Ok(val)); return Poll::Ready(Ok(val));
} }
// If we can get mutable access, then the sender has gone away. We // Check if sender is dropped and return error if it is.
// didn't see a value above, so we're canceled. Otherwise we park if this.inner.strong_count() == 1 {
// our task and wait for a value to come in.
if Rc::get_mut(&mut inner.inner).is_some() {
Poll::Ready(Err(Canceled)) Poll::Ready(Err(Canceled))
} else { } else {
inner.get_ref().rx_task.register(cx.waker()); this.inner.get_ref().rx_task.register(cx.waker());
Poll::Pending Poll::Pending
} }
} }
} }
impl<T> Drop for Receiver<T> { #[cfg(test)]
fn drop(&mut self) { mod tests {
self.close(); use super::*;
use futures::future::lazy;
#[actix_rt::test]
async fn test_oneshot() {
let (tx, rx) = channel();
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, rx) = channel();
drop(rx);
assert!(tx.send("test").is_err());
let (tx, rx) = channel::<&'static str>();
drop(tx);
assert!(rx.await.is_err());
let (tx, mut rx) = channel::<&'static str>();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
tx.send("test").unwrap();
assert_eq!(rx.await.unwrap(), "test");
let (tx, mut rx) = channel::<&'static str>();
assert_eq!(lazy(|cx| Pin::new(&mut rx).poll(cx)).await, Poll::Pending);
drop(tx);
assert!(rx.await.is_err());
} }
} }