From 564acfbf3ad840e4bd8314a296ec5f3508d9451c Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 24 Feb 2021 02:11:08 +0000 Subject: [PATCH] remove mpsc impl from -utils --- actix-codec/src/framed.rs | 8 +- actix-utils/Cargo.toml | 1 + actix-utils/src/dispatcher.rs | 123 ++++++++----------- actix-utils/src/lib.rs | 1 - actix-utils/src/mpsc.rs | 224 ---------------------------------- 5 files changed, 57 insertions(+), 300 deletions(-) delete mode 100644 actix-utils/src/mpsc.rs diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index cf2297dc..617e868c 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -235,7 +235,7 @@ impl Framed { } /// Flush write buffer to underlying I/O stream. - pub fn flush( + pub fn poll_flush( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> @@ -271,7 +271,7 @@ impl Framed { } /// Flush write buffer and shutdown underlying I/O stream. - pub fn close( + pub fn poll_close( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> @@ -319,11 +319,11 @@ where } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.flush(cx) + self.poll_flush(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.close(cx) + self.poll_close(cx) } } diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index da46256e..30125f13 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -24,6 +24,7 @@ futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } log = "0.4" pin-project-lite = "0.2.0" +tokio = { version = "1", features = ["sync"] } [dev-dependencies] actix-rt = "2.0.0" diff --git a/actix-utils/src/dispatcher.rs b/actix-utils/src/dispatcher.rs index 1e55aa2c..9917422d 100644 --- a/actix-utils/src/dispatcher.rs +++ b/actix-utils/src/dispatcher.rs @@ -1,21 +1,20 @@ //! Framed dispatcher service and related utilities. -#![allow(type_alias_bounds)] - -use core::future::Future; -use core::pin::Pin; -use core::task::{Context, Poll}; -use core::{fmt, mem}; +use core::{ + fmt, + future::Future, + mem, + pin::Pin, + task::{Context, Poll}, +}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, Service}; -use futures_core::stream::Stream; use log::debug; use pin_project_lite::pin_project; +use tokio::sync::mpsc; -use crate::mpsc; - -/// Framed transport errors +/// Framed transport errors. pub enum DispatcherError + Decoder, I> { Service(E), Encoder(>::Error), @@ -64,8 +63,7 @@ pub enum Message { } pin_project! { - /// Dispatcher is a future that reads frames from Framed object - /// and passes them to the service. + /// Dispatcher is a future that reads frames from Framed object and passes them to the service. pub struct Dispatcher where S: Service<::Item, Response = I>, @@ -82,8 +80,8 @@ pin_project! { state: State, #[pin] framed: Framed, - rx: mpsc::Receiver, S::Error>>, - tx: mpsc::Sender, S::Error>>, + rx: mpsc::UnboundedReceiver, S::Error>>, + tx: mpsc::UnboundedSender, S::Error>>, } } @@ -134,26 +132,7 @@ where where F: IntoService::Item>, { - let (tx, rx) = mpsc::channel(); - Dispatcher { - framed, - rx, - tx, - service: service.into_service(), - state: State::Processing, - } - } - - /// Construct new `Dispatcher` instance with customer `mpsc::Receiver` - pub fn with_rx( - framed: Framed, - service: F, - rx: mpsc::Receiver, S::Error>>, - ) -> Self - where - F: IntoService::Item>, - { - let tx = rx.sender(); + let (tx, rx) = mpsc::unbounded_channel(); Dispatcher { framed, rx, @@ -164,28 +143,28 @@ where } /// Get sink - pub fn get_sink(&self) -> mpsc::Sender, S::Error>> { + pub fn tx(&self) -> mpsc::UnboundedSender, S::Error>> { self.tx.clone() } /// Get reference to a service wrapped by `Dispatcher` instance. - pub fn get_ref(&self) -> &S { + pub fn service(&self) -> &S { &self.service } /// Get mutable reference to a service wrapped by `Dispatcher` instance. - pub fn get_mut(&mut self) -> &mut S { + pub fn service_mut(&mut self) -> &mut S { &mut self.service } /// Get reference to a framed instance wrapped by `Dispatcher` /// instance. - pub fn get_framed(&self) -> &Framed { + pub fn framed(&self) -> &Framed { &self.framed } /// Get mutable reference to a framed instance wrapped by `Dispatcher` instance. - pub fn get_framed_mut(&mut self) -> &mut Framed { + pub fn framed_mut(&mut self) -> &mut Framed { &mut self.framed } @@ -246,7 +225,7 @@ where loop { let mut this = self.as_mut().project(); while !this.framed.is_write_buf_full() { - match Pin::new(&mut this.rx).poll_next(cx) { + match this.rx.poll_recv(cx) { Poll::Ready(Some(Ok(Message::Item(msg)))) => { if let Err(err) = this.framed.as_mut().write(msg) { *this.state = State::FramedError(DispatcherError::Encoder(err)); @@ -266,7 +245,7 @@ where } if !this.framed.is_write_buf_empty() { - match this.framed.flush(cx) { + match this.framed.poll_flush(cx) { Poll::Pending => break, Poll::Ready(Ok(_)) => (), Poll::Ready(Err(err)) => { @@ -298,41 +277,43 @@ where type Output = Result<(), DispatcherError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - let this = self.as_mut().project(); + let this = self.as_mut().project(); - return match this.state { - State::Processing => { - if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) { - continue; - } else { - Poll::Pending - } + match this.state { + State::Processing => { + if self.as_mut().poll_read(cx) || self.as_mut().poll_write(cx) { + self.poll(cx) + } else { + Poll::Pending } - State::Error(_) => { - // flush write buffer - if !this.framed.is_write_buf_empty() && this.framed.flush(cx).is_pending() { - return Poll::Pending; - } - Poll::Ready(Err(this.state.take_error())) + } + + State::Error(_) => { + // flush write buffer + if !this.framed.is_write_buf_empty() && this.framed.poll_flush(cx).is_pending() + { + return Poll::Pending; } - State::FlushAndStop => { - if !this.framed.is_write_buf_empty() { - match this.framed.flush(cx) { - Poll::Ready(Err(err)) => { - debug!("Error sending data: {:?}", err); - Poll::Ready(Ok(())) - } - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + + Poll::Ready(Err(this.state.take_error())) + } + + State::FlushAndStop => { + if !this.framed.is_write_buf_empty() { + this.framed.poll_flush(cx).map(|res| { + if let Err(err) = res { + debug!("Error sending data: {:?}", err); } - } else { - Poll::Ready(Ok(())) - } + + Ok(()) + }) + } else { + Poll::Ready(Ok(())) } - State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())), - State::Stopping => Poll::Ready(Ok(())), - }; + } + + State::FramedError(_) => Poll::Ready(Err(this.state.take_framed_error())), + State::Stopping => Poll::Ready(Ok(())), } } } diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 5c10bac6..a526131e 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -7,6 +7,5 @@ pub mod counter; pub mod dispatcher; -pub mod mpsc; pub mod task; pub mod timeout; diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs deleted file mode 100644 index 2f2b3f04..00000000 --- a/actix-utils/src/mpsc.rs +++ /dev/null @@ -1,224 +0,0 @@ -//! A multi-producer, single-consumer, futures-aware, FIFO queue. - -use core::any::Any; -use core::cell::RefCell; -use core::fmt; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use std::collections::VecDeque; -use std::error::Error; -use std::rc::Rc; - -use futures_core::stream::Stream; -use futures_sink::Sink; - -use crate::task::LocalWaker; - -/// Creates a unbounded in-memory channel with buffered storage. -pub fn channel() -> (Sender, Receiver) { - let shared = Rc::new(RefCell::new(Shared { - has_receiver: true, - buffer: VecDeque::new(), - blocked_recv: LocalWaker::new(), - })); - let sender = Sender { - shared: shared.clone(), - }; - let receiver = Receiver { shared }; - (sender, receiver) -} - -#[derive(Debug)] -struct Shared { - buffer: VecDeque, - blocked_recv: LocalWaker, - has_receiver: bool, -} - -/// The transmission end of a channel. -/// -/// This is created by the `channel` function. -#[derive(Debug)] -pub struct Sender { - shared: Rc>>, -} - -impl Unpin for Sender {} - -impl Sender { - /// Sends the provided message along this channel. - pub fn send(&self, item: T) -> Result<(), SendError> { - let mut shared = self.shared.borrow_mut(); - if !shared.has_receiver { - return Err(SendError(item)); // receiver was dropped - }; - shared.buffer.push_back(item); - shared.blocked_recv.wake(); - Ok(()) - } - - /// Closes the sender half - /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. - pub fn close(&mut self) { - self.shared.borrow_mut().has_receiver = false; - } -} - -impl Clone for Sender { - fn clone(&self) -> Self { - Sender { - shared: self.shared.clone(), - } - } -} - -impl Sink for Sender { - type Error = SendError; - - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), SendError> { - self.send(item) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll>> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } -} - -impl Drop for Sender { - fn drop(&mut self) { - let count = Rc::strong_count(&self.shared); - let shared = self.shared.borrow_mut(); - - // check is last sender is about to drop - if shared.has_receiver && count == 2 { - // Wake up receiver as its stream has ended - shared.blocked_recv.wake(); - } - } -} - -/// The receiving end of a channel which implements the `Stream` trait. -/// -/// This is created by the `channel` function. -#[derive(Debug)] -pub struct Receiver { - shared: Rc>>, -} - -impl Receiver { - /// Create Sender - pub fn sender(&self) -> Sender { - Sender { - shared: self.shared.clone(), - } - } -} - -impl Unpin for Receiver {} - -impl Stream for Receiver { - type Item = T; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut shared = self.shared.borrow_mut(); - if Rc::strong_count(&self.shared) == 1 { - // All senders have been dropped, so drain the buffer and end the - // stream. - Poll::Ready(shared.buffer.pop_front()) - } else if let Some(msg) = shared.buffer.pop_front() { - Poll::Ready(Some(msg)) - } else { - shared.blocked_recv.register(cx.waker()); - Poll::Pending - } - } -} - -impl Drop for Receiver { - fn drop(&mut self) { - let mut shared = self.shared.borrow_mut(); - shared.buffer.clear(); - shared.has_receiver = false; - } -} - -/// Error type for sending, used when the receiving end of a channel is -/// dropped -pub struct SendError(T); - -impl fmt::Debug for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_tuple("SendError").field(&"...").finish() - } -} - -impl fmt::Display for SendError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "send failed because receiver is gone") - } -} - -impl Error for SendError { - fn description(&self) -> &str { - "send failed because receiver is gone" - } -} - -impl SendError { - /// Returns the message that was attempted to be sent but failed. - pub fn into_inner(self) -> T { - self.0 - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future::lazy; - use futures_util::{stream::Stream, StreamExt}; - - #[actix_rt::test] - async fn test_mpsc() { - let (tx, mut rx) = channel(); - tx.send("test").unwrap(); - assert_eq!(rx.next().await.unwrap(), "test"); - - let tx2 = tx.clone(); - tx2.send("test2").unwrap(); - assert_eq!(rx.next().await.unwrap(), "test2"); - - assert_eq!( - lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await, - Poll::Pending - ); - drop(tx2); - assert_eq!( - lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await, - Poll::Pending - ); - drop(tx); - assert_eq!(rx.next().await, None); - - let (tx, rx) = channel(); - tx.send("test").unwrap(); - drop(rx); - assert!(tx.send("test").is_err()); - - let (mut tx, _) = channel(); - let tx2 = tx.clone(); - tx.close(); - assert!(tx.send("test").is_err()); - assert!(tx2.send("test").is_err()); - } -}