1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-23 22:51:07 +01:00

refactor dispatcher / add Receiver::recv (#286)

This commit is contained in:
Rob Ede 2021-02-28 21:11:16 +00:00 committed by GitHub
parent 493a1a32c0
commit 382830a37e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 157 additions and 50 deletions

View File

@ -1,6 +1,15 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Add `async fn mpsc::Receiver::recv`. [#286]
* `SendError` inner field is now public. [#286]
* Rename `Dispatcher::{get_sink => tx}`. [#286]
* Rename `Dispatcher::{get_ref => service}`. [#286]
* Rename `Dispatcher::{get_mut => service_mut}`. [#286]
* Rename `Dispatcher::{get_framed => framed}`. [#286]
* Rename `Dispatcher::{get_framed_mut => framed_mut}`. [#286]
[#286]: https://github.com/actix/actix-net/pull/286
## 3.0.0-beta.2 - 2021-02-06 ## 3.0.0-beta.2 - 2021-02-06

View File

@ -163,29 +163,28 @@ where
} }
} }
/// Get sink /// Get sender handle.
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> { pub fn tx(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
self.tx.clone() self.tx.clone()
} }
/// Get reference to a service wrapped by `Dispatcher` instance. /// Get reference to a service wrapped by `Dispatcher` instance.
pub fn get_ref(&self) -> &S { pub fn service(&self) -> &S {
&self.service &self.service
} }
/// Get mutable reference to a service wrapped by `Dispatcher` instance. /// Get mutable reference to a service wrapped by `Dispatcher` instance.
pub fn get_mut(&mut self) -> &mut S { pub fn service_mut(&mut self) -> &mut S {
&mut self.service &mut self.service
} }
/// Get reference to a framed instance wrapped by `Dispatcher` /// Get reference to a framed instance wrapped by `Dispatcher` instance.
/// instance. pub fn framed(&self) -> &Framed<T, U> {
pub fn get_framed(&self) -> &Framed<T, U> {
&self.framed &self.framed
} }
/// Get mutable reference to a framed instance wrapped by `Dispatcher` instance. /// Get mutable reference to a framed instance wrapped by `Dispatcher` instance.
pub fn get_framed_mut(&mut self) -> &mut Framed<T, U> { pub fn framed_mut(&mut self) -> &mut Framed<T, U> {
&mut self.framed &mut self.framed
} }
@ -268,7 +267,7 @@ where
if !this.framed.is_write_buf_empty() { if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) { match this.framed.flush(cx) {
Poll::Pending => break, Poll::Pending => break,
Poll::Ready(Ok(_)) => (), Poll::Ready(Ok(_)) => {}
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err); debug!("Error sending data: {:?}", err);
*this.state = State::FramedError(DispatcherError::Encoder(err)); *this.state = State::FramedError(DispatcherError::Encoder(err));
@ -318,14 +317,13 @@ where
} }
State::FlushAndStop => { State::FlushAndStop => {
if !this.framed.is_write_buf_empty() { if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) { this.framed.flush(cx).map(|res| {
Poll::Ready(Err(err)) => { if let Err(err) = res {
debug!("Error sending data: {:?}", err); debug!("Error sending data: {:?}", err);
Poll::Ready(Ok(()))
} }
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), Ok(())
} })
} else { } else {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }

View File

@ -8,5 +8,8 @@
pub mod counter; pub mod counter;
pub mod dispatcher; pub mod dispatcher;
pub mod mpsc; pub mod mpsc;
mod poll_fn;
pub mod task; pub mod task;
pub mod timeout; pub mod timeout;
use self::poll_fn::poll_fn;

View File

@ -1,31 +1,35 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue. //! A multi-producer, single-consumer, futures-aware, FIFO queue.
use core::any::Any; use core::{
use core::cell::RefCell; cell::RefCell,
use core::fmt; fmt,
use core::pin::Pin; pin::Pin,
use core::task::{Context, Poll}; task::{Context, Poll},
};
use std::collections::VecDeque; use std::{collections::VecDeque, error::Error, rc::Rc};
use std::error::Error;
use std::rc::Rc;
use futures_core::stream::Stream; use futures_core::stream::Stream;
use futures_sink::Sink; use futures_sink::Sink;
use crate::task::LocalWaker; use crate::{poll_fn, task::LocalWaker};
/// Creates a unbounded in-memory channel with buffered storage. /// Creates a unbounded in-memory channel with buffered storage.
///
/// [Sender]s and [Receiver]s are `!Send`.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) { pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let shared = Rc::new(RefCell::new(Shared { let shared = Rc::new(RefCell::new(Shared {
has_receiver: true, has_receiver: true,
buffer: VecDeque::new(), buffer: VecDeque::new(),
blocked_recv: LocalWaker::new(), blocked_recv: LocalWaker::new(),
})); }));
let sender = Sender { let sender = Sender {
shared: shared.clone(), shared: shared.clone(),
}; };
let receiver = Receiver { shared }; let receiver = Receiver { shared };
(sender, receiver) (sender, receiver)
} }
@ -50,18 +54,22 @@ impl<T> Sender<T> {
/// Sends the provided message along this channel. /// Sends the provided message along this channel.
pub fn send(&self, item: T) -> Result<(), SendError<T>> { pub fn send(&self, item: T) -> Result<(), SendError<T>> {
let mut shared = self.shared.borrow_mut(); let mut shared = self.shared.borrow_mut();
if !shared.has_receiver { if !shared.has_receiver {
return Err(SendError(item)); // receiver was dropped // receiver was dropped
return Err(SendError(item));
}; };
shared.buffer.push_back(item); shared.buffer.push_back(item);
shared.blocked_recv.wake(); shared.blocked_recv.wake();
Ok(()) Ok(())
} }
/// Closes the sender half /// Closes the sender half.
/// ///
/// This prevents any further messages from being sent on the channel while /// This prevents any further messages from being sent on the channel, by any sender, while
/// still enabling the receiver to drain messages that are buffered. /// still enabling the receiver to drain messages that are already buffered.
pub fn close(&mut self) { pub fn close(&mut self) {
self.shared.borrow_mut().has_receiver = false; self.shared.borrow_mut().has_receiver = false;
} }
@ -110,14 +118,24 @@ impl<T> Drop for Sender<T> {
/// The receiving end of a channel which implements the `Stream` trait. /// The receiving end of a channel which implements the `Stream` trait.
/// ///
/// This is created by the `channel` function. /// This is created by the [`channel`] function.
#[derive(Debug)] #[derive(Debug)]
pub struct Receiver<T> { pub struct Receiver<T> {
shared: Rc<RefCell<Shared<T>>>, shared: Rc<RefCell<Shared<T>>>,
} }
impl<T> Receiver<T> { impl<T> Receiver<T> {
/// Create Sender /// Receive the next value.
///
/// Returns `None` if the channel is empty and has been [closed](Sender::close) explicitly or
/// when all senders have been dropped and, therefore, no more values can ever be sent though
/// this channel.
pub async fn recv(&mut self) -> Option<T> {
let mut this = Pin::new(self);
poll_fn(|cx| this.as_mut().poll_next(cx)).await
}
/// Create an associated [Sender].
pub fn sender(&self) -> Sender<T> { pub fn sender(&self) -> Sender<T> {
Sender { Sender {
shared: self.shared.clone(), shared: self.shared.clone(),
@ -132,11 +150,13 @@ impl<T> Stream for Receiver<T> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut shared = self.shared.borrow_mut(); let mut shared = self.shared.borrow_mut();
if Rc::strong_count(&self.shared) == 1 { if Rc::strong_count(&self.shared) == 1 {
// All senders have been dropped, so drain the buffer and end the // All senders have been dropped, so drain the buffer and end the stream.
// stream. return Poll::Ready(shared.buffer.pop_front());
Poll::Ready(shared.buffer.pop_front()) }
} else if let Some(msg) = shared.buffer.pop_front() {
if let Some(msg) = shared.buffer.pop_front() {
Poll::Ready(Some(msg)) Poll::Ready(Some(msg))
} else { } else {
shared.blocked_recv.register(cx.waker()); shared.blocked_recv.register(cx.waker());
@ -153,9 +173,15 @@ impl<T> Drop for Receiver<T> {
} }
} }
/// Error type for sending, used when the receiving end of a channel is /// Error returned when attempting to send after the channels' [Receiver] is dropped or closed.
/// dropped pub struct SendError<T>(pub T);
pub struct SendError<T>(T);
impl<T> SendError<T> {
/// Returns the message that was attempted to be sent but failed.
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> fmt::Debug for SendError<T> { impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -169,18 +195,7 @@ impl<T> fmt::Display for SendError<T> {
} }
} }
impl<T: Any> Error for SendError<T> { impl<T> Error for SendError<T> {}
fn description(&self) -> &str {
"send failed because receiver is gone"
}
}
impl<T> SendError<T> {
/// Returns the message that was attempted to be sent but failed.
pub fn into_inner(self) -> T {
self.0
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@ -221,4 +236,18 @@ mod tests {
assert!(tx.send("test").is_err()); assert!(tx.send("test").is_err());
assert!(tx2.send("test").is_err()); assert!(tx2.send("test").is_err());
} }
#[actix_rt::test]
async fn test_recv() {
let (tx, mut rx) = channel();
tx.send("test").unwrap();
assert_eq!(rx.recv().await.unwrap(), "test");
drop(tx);
let (tx, mut rx) = channel();
tx.send("test").unwrap();
assert_eq!(rx.recv().await.unwrap(), "test");
drop(tx);
assert!(rx.recv().await.is_none());
}
} }

View File

@ -0,0 +1,65 @@
//! Simple "poll function" future and factory.
use core::{
fmt,
future::Future,
task::{self, Poll},
};
use std::pin::Pin;
/// Create a future driven by the provided function that receives a task context.
pub(crate) fn poll_fn<F, T>(f: F) -> PollFn<F>
where
F: FnMut(&mut task::Context<'_>) -> Poll<T>,
{
PollFn { f }
}
/// A Future driven by the inner function.
pub(crate) struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<F> fmt::Debug for PollFn<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollFn").finish()
}
}
impl<F, T> Future for PollFn<F>
where
F: FnMut(&mut task::Context<'_>) -> task::Poll<T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
(self.f)(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[actix_rt::test]
async fn test_poll_fn() {
let res = poll_fn(|_| Poll::Ready(42)).await;
assert_eq!(res, 42);
let mut i = 5;
let res = poll_fn(|cx| {
i -= 1;
if i > 0 {
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(42)
}
})
.await;
assert_eq!(res, 42);
}
}

View File

@ -9,11 +9,14 @@ use core::{cell::Cell, fmt, marker::PhantomData, task::Waker};
/// logical task. /// logical task.
/// ///
/// Consumers should call [`register`] before checking the result of a computation and producers /// Consumers should call [`register`] before checking the result of a computation and producers
/// should call `wake` after producing the computation (this differs from the usual `thread::park` /// should call [`wake`] after producing the computation (this differs from the usual `thread::park`
/// pattern). It is also permitted for [`wake`] to be called _before_ [`register`]. This results in /// pattern). It is also permitted for [`wake`] to be called _before_ [`register`]. This results in
/// a no-op. /// a no-op.
/// ///
/// A single `LocalWaker` may be reused for any number of calls to [`register`] or [`wake`]. /// A single `LocalWaker` may be reused for any number of calls to [`register`] or [`wake`].
///
/// [`register`]: LocalWaker::register
/// [`wake`]: LocalWaker::wake
#[derive(Default)] #[derive(Default)]
pub struct LocalWaker { pub struct LocalWaker {
pub(crate) waker: Cell<Option<Waker>>, pub(crate) waker: Cell<Option<Waker>>,