diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index a7871612..8d97b741 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,6 +1,15 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `async fn mpsc::Receiver::recv`. [#286] +* `SendError` inner field is now public. [#286] +* Rename `Dispatcher::{get_sink => tx}`. [#286] +* Rename `Dispatcher::{get_ref => service}`. [#286] +* Rename `Dispatcher::{get_mut => service_mut}`. [#286] +* Rename `Dispatcher::{get_framed => framed}`. [#286] +* Rename `Dispatcher::{get_framed_mut => framed_mut}`. [#286] + +[#286]: https://github.com/actix/actix-net/pull/286 ## 3.0.0-beta.2 - 2021-02-06 diff --git a/actix-utils/src/dispatcher.rs b/actix-utils/src/dispatcher.rs index 1e55aa2c..94ac9971 100644 --- a/actix-utils/src/dispatcher.rs +++ b/actix-utils/src/dispatcher.rs @@ -163,29 +163,28 @@ where } } - /// Get sink - pub fn get_sink(&self) -> mpsc::Sender, S::Error>> { + /// Get sender handle. + pub fn tx(&self) -> mpsc::Sender, S::Error>> { self.tx.clone() } /// Get reference to a service wrapped by `Dispatcher` instance. - pub fn get_ref(&self) -> &S { + pub fn service(&self) -> &S { &self.service } /// Get mutable reference to a service wrapped by `Dispatcher` instance. - pub fn get_mut(&mut self) -> &mut S { + pub fn service_mut(&mut self) -> &mut S { &mut self.service } - /// Get reference to a framed instance wrapped by `Dispatcher` - /// instance. - pub fn get_framed(&self) -> &Framed { + /// Get reference to a framed instance wrapped by `Dispatcher` instance. + pub fn framed(&self) -> &Framed { &self.framed } /// Get mutable reference to a framed instance wrapped by `Dispatcher` instance. - pub fn get_framed_mut(&mut self) -> &mut Framed { + pub fn framed_mut(&mut self) -> &mut Framed { &mut self.framed } @@ -268,7 +267,7 @@ where if !this.framed.is_write_buf_empty() { match this.framed.flush(cx) { Poll::Pending => break, - Poll::Ready(Ok(_)) => (), + Poll::Ready(Ok(_)) => {} Poll::Ready(Err(err)) => { debug!("Error sending data: {:?}", err); *this.state = State::FramedError(DispatcherError::Encoder(err)); @@ -318,14 +317,13 @@ where } State::FlushAndStop => { if !this.framed.is_write_buf_empty() { - match this.framed.flush(cx) { - Poll::Ready(Err(err)) => { + this.framed.flush(cx).map(|res| { + if let Err(err) = res { debug!("Error sending data: {:?}", err); - Poll::Ready(Ok(())) } - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - } + + Ok(()) + }) } else { Poll::Ready(Ok(())) } diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 5c10bac6..6658cba8 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -8,5 +8,8 @@ pub mod counter; pub mod dispatcher; pub mod mpsc; +mod poll_fn; pub mod task; pub mod timeout; + +use self::poll_fn::poll_fn; diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs index 2f2b3f04..9c7a5a0e 100644 --- a/actix-utils/src/mpsc.rs +++ b/actix-utils/src/mpsc.rs @@ -1,31 +1,35 @@ //! A multi-producer, single-consumer, futures-aware, FIFO queue. -use core::any::Any; -use core::cell::RefCell; -use core::fmt; -use core::pin::Pin; -use core::task::{Context, Poll}; +use core::{ + cell::RefCell, + fmt, + pin::Pin, + task::{Context, Poll}, +}; -use std::collections::VecDeque; -use std::error::Error; -use std::rc::Rc; +use std::{collections::VecDeque, error::Error, rc::Rc}; use futures_core::stream::Stream; use futures_sink::Sink; -use crate::task::LocalWaker; +use crate::{poll_fn, task::LocalWaker}; /// Creates a unbounded in-memory channel with buffered storage. +/// +/// [Sender]s and [Receiver]s are `!Send`. pub fn channel() -> (Sender, Receiver) { let shared = Rc::new(RefCell::new(Shared { has_receiver: true, buffer: VecDeque::new(), blocked_recv: LocalWaker::new(), })); + let sender = Sender { shared: shared.clone(), }; + let receiver = Receiver { shared }; + (sender, receiver) } @@ -50,18 +54,22 @@ impl Sender { /// Sends the provided message along this channel. pub fn send(&self, item: T) -> Result<(), SendError> { let mut shared = self.shared.borrow_mut(); + if !shared.has_receiver { - return Err(SendError(item)); // receiver was dropped + // receiver was dropped + return Err(SendError(item)); }; + shared.buffer.push_back(item); shared.blocked_recv.wake(); + Ok(()) } - /// Closes the sender half + /// Closes the sender half. /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. + /// This prevents any further messages from being sent on the channel, by any sender, while + /// still enabling the receiver to drain messages that are already buffered. pub fn close(&mut self) { self.shared.borrow_mut().has_receiver = false; } @@ -110,14 +118,24 @@ impl Drop for Sender { /// The receiving end of a channel which implements the `Stream` trait. /// -/// This is created by the `channel` function. +/// This is created by the [`channel`] function. #[derive(Debug)] pub struct Receiver { shared: Rc>>, } impl Receiver { - /// Create Sender + /// Receive the next value. + /// + /// Returns `None` if the channel is empty and has been [closed](Sender::close) explicitly or + /// when all senders have been dropped and, therefore, no more values can ever be sent though + /// this channel. + pub async fn recv(&mut self) -> Option { + let mut this = Pin::new(self); + poll_fn(|cx| this.as_mut().poll_next(cx)).await + } + + /// Create an associated [Sender]. pub fn sender(&self) -> Sender { Sender { shared: self.shared.clone(), @@ -132,11 +150,13 @@ impl Stream for Receiver { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut shared = self.shared.borrow_mut(); + if Rc::strong_count(&self.shared) == 1 { - // All senders have been dropped, so drain the buffer and end the - // stream. - Poll::Ready(shared.buffer.pop_front()) - } else if let Some(msg) = shared.buffer.pop_front() { + // All senders have been dropped, so drain the buffer and end the stream. + return Poll::Ready(shared.buffer.pop_front()); + } + + if let Some(msg) = shared.buffer.pop_front() { Poll::Ready(Some(msg)) } else { shared.blocked_recv.register(cx.waker()); @@ -153,9 +173,15 @@ impl Drop for Receiver { } } -/// Error type for sending, used when the receiving end of a channel is -/// dropped -pub struct SendError(T); +/// Error returned when attempting to send after the channels' [Receiver] is dropped or closed. +pub struct SendError(pub T); + +impl SendError { + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + self.0 + } +} impl fmt::Debug for SendError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -169,18 +195,7 @@ impl fmt::Display for SendError { } } -impl Error for SendError { - fn description(&self) -> &str { - "send failed because receiver is gone" - } -} - -impl SendError { - /// Returns the message that was attempted to be sent but failed. - pub fn into_inner(self) -> T { - self.0 - } -} +impl Error for SendError {} #[cfg(test)] mod tests { @@ -221,4 +236,18 @@ mod tests { assert!(tx.send("test").is_err()); assert!(tx2.send("test").is_err()); } + + #[actix_rt::test] + async fn test_recv() { + let (tx, mut rx) = channel(); + tx.send("test").unwrap(); + assert_eq!(rx.recv().await.unwrap(), "test"); + drop(tx); + + let (tx, mut rx) = channel(); + tx.send("test").unwrap(); + assert_eq!(rx.recv().await.unwrap(), "test"); + drop(tx); + assert!(rx.recv().await.is_none()); + } } diff --git a/actix-utils/src/poll_fn.rs b/actix-utils/src/poll_fn.rs new file mode 100644 index 00000000..2180f4a4 --- /dev/null +++ b/actix-utils/src/poll_fn.rs @@ -0,0 +1,65 @@ +//! Simple "poll function" future and factory. + +use core::{ + fmt, + future::Future, + task::{self, Poll}, +}; +use std::pin::Pin; + +/// Create a future driven by the provided function that receives a task context. +pub(crate) fn poll_fn(f: F) -> PollFn +where + F: FnMut(&mut task::Context<'_>) -> Poll, +{ + PollFn { f } +} + +/// A Future driven by the inner function. +pub(crate) struct PollFn { + f: F, +} + +impl Unpin for PollFn {} + +impl fmt::Debug for PollFn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PollFn").finish() + } +} + +impl Future for PollFn +where + F: FnMut(&mut task::Context<'_>) -> task::Poll, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + (self.f)(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[actix_rt::test] + async fn test_poll_fn() { + let res = poll_fn(|_| Poll::Ready(42)).await; + assert_eq!(res, 42); + + let mut i = 5; + let res = poll_fn(|cx| { + i -= 1; + + if i > 0 { + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(42) + } + }) + .await; + assert_eq!(res, 42); + } +} diff --git a/actix-utils/src/task.rs b/actix-utils/src/task.rs index d0793488..507bfc14 100644 --- a/actix-utils/src/task.rs +++ b/actix-utils/src/task.rs @@ -9,11 +9,14 @@ use core::{cell::Cell, fmt, marker::PhantomData, task::Waker}; /// logical task. /// /// Consumers should call [`register`] before checking the result of a computation and producers -/// should call `wake` after producing the computation (this differs from the usual `thread::park` +/// should call [`wake`] after producing the computation (this differs from the usual `thread::park` /// pattern). It is also permitted for [`wake`] to be called _before_ [`register`]. This results in /// a no-op. /// /// A single `LocalWaker` may be reused for any number of calls to [`register`] or [`wake`]. +/// +/// [`register`]: LocalWaker::register +/// [`wake`]: LocalWaker::wake #[derive(Default)] pub struct LocalWaker { pub(crate) waker: Cell>,