1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 20:12:58 +01:00

allow to close mpsc sender

This commit is contained in:
Nikolay Kim 2019-12-11 14:36:00 +06:00
parent 631cb86947
commit dded482514
2 changed files with 26 additions and 7 deletions

View File

@ -274,16 +274,12 @@ where
State::Error(_) => {
// flush write buffer
if !self.framed.is_write_buf_empty() {
match self.framed.flush(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) | Poll::Ready(Err(_)) => {
if let Poll::Pending = self.framed.flush(cx) {
return Poll::Pending;
}
}
Poll::Ready(Err(self.state.take_error()))
}
}
} else {
Poll::Ready(Err(self.state.take_error()))
}
}
State::FlushAndStop => {
if !this.framed.is_write_buf_empty() {
match this.framed.flush(cx) {

View File

@ -53,6 +53,14 @@ impl<T> Sender<T> {
shared.blocked_recv.wake();
Ok(())
}
/// Closes the sender half
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
self.shared.get_mut().has_receiver = false;
}
}
impl<T> Clone for Sender<T> {
@ -104,6 +112,15 @@ pub struct Receiver<T> {
shared: Cell<Shared<T>>,
}
impl<T> Receiver<T> {
/// Create Sender
pub fn sender(&self) -> Sender<T> {
Sender {
shared: self.shared.clone(),
}
}
}
impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
@ -192,5 +209,11 @@ mod tests {
tx.send("test").unwrap();
drop(rx);
assert!(tx.send("test").is_err());
let (mut tx, _) = channel();
let tx2 = tx.clone();
tx.close();
assert!(tx.send("test").is_err());
assert!(tx2.send("test").is_err());
}
}