1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 11:51:50 +01:00

remove mpsc impl from -utils

This commit is contained in:
Rob Ede 2021-02-24 02:11:08 +00:00
parent 841c611233
commit 564acfbf3a
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
5 changed files with 57 additions and 300 deletions

View File

@ -235,7 +235,7 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer to underlying I/O stream.
pub fn flush<I>(
pub fn poll_flush<I>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), U::Error>>
@ -271,7 +271,7 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer and shutdown underlying I/O stream.
pub fn close<I>(
pub fn poll_close<I>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), U::Error>>
@ -319,11 +319,11 @@ where
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.flush(cx)
self.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.close(cx)
self.poll_close(cx)
}
}

View File

@ -24,6 +24,7 @@ futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false }
log = "0.4"
pin-project-lite = "0.2.0"
tokio = { version = "1", features = ["sync"] }
[dev-dependencies]
actix-rt = "2.0.0"

View File

@ -1,21 +1,20 @@
//! Framed dispatcher service and related utilities.
#![allow(type_alias_bounds)]
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use core::{fmt, mem};
use core::{
fmt,
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures_core::stream::Stream;
use log::debug;
use pin_project_lite::pin_project;
use tokio::sync::mpsc;
use crate::mpsc;
/// Framed transport errors
/// Framed transport errors.
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
Service(E),
Encoder(<U as Encoder<I>>::Error),
@ -64,8 +63,7 @@ pub enum Message<T> {
}
pin_project! {
/// Dispatcher is a future that reads frames from Framed object
/// and passes them to the service.
/// Dispatcher is a future that reads frames from Framed object and passes them to the service.
pub struct Dispatcher<S, T, U, I>
where
S: Service<<U as Decoder>::Item, Response = I>,
@ -82,8 +80,8 @@ pin_project! {
state: State<S, U, I>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
rx: mpsc::UnboundedReceiver<Result<Message<I>, S::Error>>,
tx: mpsc::UnboundedSender<Result<Message<I>, S::Error>>,
}
}
@ -134,26 +132,7 @@ where
where
F: IntoService<S, <U as Decoder>::Item>,
{
let (tx, rx) = mpsc::channel();
Dispatcher {
framed,
rx,
tx,
service: service.into_service(),
state: State::Processing,
}
}
/// Construct new `Dispatcher` instance with customer `mpsc::Receiver`
pub fn with_rx<F>(
framed: Framed<T, U>,
service: F,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
) -> Self
where
F: IntoService<S, <U as Decoder>::Item>,
{
let tx = rx.sender();
let (tx, rx) = mpsc::unbounded_channel();
Dispatcher {
framed,
rx,
@ -164,28 +143,28 @@ where
}
/// Get sink
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
pub fn tx(&self) -> mpsc::UnboundedSender<Result<Message<I>, S::Error>> {
self.tx.clone()
}
/// Get reference to a service wrapped by `Dispatcher` instance.
pub fn get_ref(&self) -> &S {
pub fn service(&self) -> &S {
&self.service
}
/// Get mutable reference to a service wrapped by `Dispatcher` instance.
pub fn get_mut(&mut self) -> &mut S {
pub fn service_mut(&mut self) -> &mut S {
&mut self.service
}
/// Get reference to a framed instance wrapped by `Dispatcher`
/// instance.
pub fn get_framed(&self) -> &Framed<T, U> {
pub fn framed(&self) -> &Framed<T, U> {
&self.framed
}
/// Get mutable reference to a framed instance wrapped by `Dispatcher` instance.
pub fn get_framed_mut(&mut self) -> &mut Framed<T, U> {
pub fn framed_mut(&mut self) -> &mut Framed<T, U> {
&mut self.framed
}
@ -246,7 +225,7 @@ where
loop {
let mut this = self.as_mut().project();
while !this.framed.is_write_buf_full() {
match Pin::new(&mut this.rx).poll_next(cx) {
match this.rx.poll_recv(cx) {
Poll::Ready(Some(Ok(Message::Item(msg)))) => {
if let Err(err) = this.framed.as_mut().write(msg) {
*this.state = State::FramedError(DispatcherError::Encoder(err));
@ -266,7 +245,7 @@ where
}
if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) {
match this.framed.poll_flush(cx) {
Poll::Pending => break,
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(err)) => {
@ -298,41 +277,43 @@ where
type Output = Result<(), DispatcherError<S::Error, U, I>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let this = self.as_mut().project();
let this = self.as_mut().project();
return match this.state {
State::Processing => {
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
continue;
} else {
Poll::Pending
}
match this.state {
State::Processing => {
if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) {
self.poll(cx)
} else {
Poll::Pending
}
State::Error(_) => {
// flush write buffer
if !this.framed.is_write_buf_empty() && this.framed.flush(cx).is_pending() {
return Poll::Pending;
}
Poll::Ready(Err(this.state.take_error()))
}
State::Error(_) => {
// flush write buffer
if !this.framed.is_write_buf_empty() && this.framed.poll_flush(cx).is_pending()
{
return Poll::Pending;
}
State::FlushAndStop => {
if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) {
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(this.state.take_error()))
}
State::FlushAndStop => {
if !this.framed.is_write_buf_empty() {
this.framed.poll_flush(cx).map(|res| {
if let Err(err) = res {
debug!("Error sending data: {:?}", err);
}
} else {
Poll::Ready(Ok(()))
}
Ok(())
})
} else {
Poll::Ready(Ok(()))
}
State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
State::Stopping => Poll::Ready(Ok(())),
};
}
State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())),
State::Stopping => Poll::Ready(Ok(())),
}
}
}

View File

@ -7,6 +7,5 @@
pub mod counter;
pub mod dispatcher;
pub mod mpsc;
pub mod task;
pub mod timeout;

View File

@ -1,224 +0,0 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
use core::any::Any;
use core::cell::RefCell;
use core::fmt;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::collections::VecDeque;
use std::error::Error;
use std::rc::Rc;
use futures_core::stream::Stream;
use futures_sink::Sink;
use crate::task::LocalWaker;
/// Creates a unbounded in-memory channel with buffered storage.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let shared = Rc::new(RefCell::new(Shared {
has_receiver: true,
buffer: VecDeque::new(),
blocked_recv: LocalWaker::new(),
}));
let sender = Sender {
shared: shared.clone(),
};
let receiver = Receiver { shared };
(sender, receiver)
}
#[derive(Debug)]
struct Shared<T> {
buffer: VecDeque<T>,
blocked_recv: LocalWaker,
has_receiver: bool,
}
/// The transmission end of a channel.
///
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Sender<T> {
shared: Rc<RefCell<Shared<T>>>,
}
impl<T> Unpin for Sender<T> {}
impl<T> Sender<T> {
/// Sends the provided message along this channel.
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
let mut shared = self.shared.borrow_mut();
if !shared.has_receiver {
return Err(SendError(item)); // receiver was dropped
};
shared.buffer.push_back(item);
shared.blocked_recv.wake();
Ok(())
}
/// Closes the sender half
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
self.shared.borrow_mut().has_receiver = false;
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender {
shared: self.shared.clone(),
}
}
}
impl<T> Sink<T> for Sender<T> {
type Error = SendError<T>;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), SendError<T>> {
self.send(item)
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), SendError<T>>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let count = Rc::strong_count(&self.shared);
let shared = self.shared.borrow_mut();
// check is last sender is about to drop
if shared.has_receiver && count == 2 {
// Wake up receiver as its stream has ended
shared.blocked_recv.wake();
}
}
}
/// The receiving end of a channel which implements the `Stream` trait.
///
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Receiver<T> {
shared: Rc<RefCell<Shared<T>>>,
}
impl<T> Receiver<T> {
/// Create Sender
pub fn sender(&self) -> Sender<T> {
Sender {
shared: self.shared.clone(),
}
}
}
impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut shared = self.shared.borrow_mut();
if Rc::strong_count(&self.shared) == 1 {
// All senders have been dropped, so drain the buffer and end the
// stream.
Poll::Ready(shared.buffer.pop_front())
} else if let Some(msg) = shared.buffer.pop_front() {
Poll::Ready(Some(msg))
} else {
shared.blocked_recv.register(cx.waker());
Poll::Pending
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
let mut shared = self.shared.borrow_mut();
shared.buffer.clear();
shared.has_receiver = false;
}
}
/// Error type for sending, used when the receiving end of a channel is
/// dropped
pub struct SendError<T>(T);
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_tuple("SendError").field(&"...").finish()
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "send failed because receiver is gone")
}
}
impl<T: Any> Error for SendError<T> {
fn description(&self) -> &str {
"send failed because receiver is gone"
}
}
impl<T> SendError<T> {
/// Returns the message that was attempted to be sent but failed.
pub fn into_inner(self) -> T {
self.0
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::future::lazy;
use futures_util::{stream::Stream, StreamExt};
#[actix_rt::test]
async fn test_mpsc() {
let (tx, mut rx) = channel();
tx.send("test").unwrap();
assert_eq!(rx.next().await.unwrap(), "test");
let tx2 = tx.clone();
tx2.send("test2").unwrap();
assert_eq!(rx.next().await.unwrap(), "test2");
assert_eq!(
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
Poll::Pending
);
drop(tx2);
assert_eq!(
lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
Poll::Pending
);
drop(tx);
assert_eq!(rx.next().await, None);
let (tx, rx) = channel();
tx.send("test").unwrap();
drop(rx);
assert!(tx.send("test").is_err());
let (mut tx, _) = channel();
let tx2 = tx.clone();
tx.close();
assert!(tx.send("test").is_err());
assert!(tx2.send("test").is_err());
}
}