From dded4825142d6c60b33e4bddfae68fa961eab4f5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 11 Dec 2019 14:36:00 +0600 Subject: [PATCH] allow to close mpsc sender --- actix-utils/src/framed.rs | 10 +++------- actix-utils/src/mpsc.rs | 23 +++++++++++++++++++++++ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index c924cbe1..dfa572e3 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -274,15 +274,11 @@ where State::Error(_) => { // flush write buffer if !self.framed.is_write_buf_empty() { - match self.framed.flush(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) | Poll::Ready(Err(_)) => { - Poll::Ready(Err(self.state.take_error())) - } + if let Poll::Pending = self.framed.flush(cx) { + return Poll::Pending; } - } else { - Poll::Ready(Err(self.state.take_error())) } + Poll::Ready(Err(self.state.take_error())) } State::FlushAndStop => { if !this.framed.is_write_buf_empty() { diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs index d9882584..07014835 100644 --- a/actix-utils/src/mpsc.rs +++ b/actix-utils/src/mpsc.rs @@ -53,6 +53,14 @@ impl Sender { shared.blocked_recv.wake(); Ok(()) } + + /// Closes the sender half + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + self.shared.get_mut().has_receiver = false; + } } impl Clone for Sender { @@ -104,6 +112,15 @@ pub struct Receiver { shared: Cell>, } +impl Receiver { + /// Create Sender + pub fn sender(&self) -> Sender { + Sender { + shared: self.shared.clone(), + } + } +} + impl Unpin for Receiver {} impl Stream for Receiver { @@ -192,5 +209,11 @@ mod tests { tx.send("test").unwrap(); drop(rx); assert!(tx.send("test").is_err()); + + let (mut tx, _) = channel(); + let tx2 = tx.clone(); + tx.close(); + assert!(tx.send("test").is_err()); + assert!(tx2.send("test").is_err()); } }