diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index 7c96b4ca..e2193abe 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -1,5 +1,7 @@ # Changes +* Fix low/high watermark for write/read buffers + ## [0.2.0-alpha.2] * Migrated to `std::future` diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index e458352c..682f446c 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -5,7 +5,7 @@ use std::io::{self}; use std::pin::Pin; use std::task::{Context, Poll}; -use bytes::BytesMut; +use bytes::{BufMut, BytesMut}; use futures::{ready, Sink, Stream}; use pin_project::pin_project; use tokio_codec::{Decoder, Encoder}; @@ -240,17 +240,18 @@ impl Framed { T: AsyncWrite, U: Encoder, { - let len = self.write_buf.len(); - if len < self.write_lw { - self.write_buf.reserve(self.write_hw - len) + let remaining = self.write_buf.remaining_mut(); + if remaining < self.write_lw { + self.write_buf.reserve(self.write_hw - remaining); } + self.codec.encode(item, &mut self.write_buf)?; Ok(()) } + /// Check if framed is able to write more data pub fn is_ready(&self) -> bool { - let len = self.write_buf.len(); - len < self.write_hw + self.write_buf.len() < self.write_hw } pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll>> @@ -292,16 +293,17 @@ impl Framed { assert!(!self.eof); - // Otherwise, try to read more data and try again. Make sure we've - // got room for at least one byte to read to ensure that we don't - // get a spurious 0 that looks like EOF - self.read_buf.reserve(1); - let cnt = unsafe { - match Pin::new_unchecked(&mut self.io).poll_read_buf(cx, &mut self.read_buf) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), - Poll::Ready(Ok(cnt)) => cnt, - } + // Otherwise, try to read more data and try again. Make sure we've got room + let remaining = self.read_buf.remaining_mut(); + if remaining < LW { + self.read_buf.reserve(HW - remaining) + } + let cnt = match unsafe { + Pin::new_unchecked(&mut self.io).poll_read_buf(cx, &mut self.read_buf) + } { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + Poll::Ready(Ok(cnt)) => cnt, }; if cnt == 0 { @@ -321,9 +323,9 @@ impl Framed { while !self.write_buf.is_empty() { log::trace!("writing; remaining={}", self.write_buf.len()); - let n = ready!( - unsafe { Pin::new_unchecked(&mut self.io) }.poll_write(cx, &self.write_buf) - )?; + let n = ready!(unsafe { + Pin::new_unchecked(&mut self.io).poll_write(cx, &self.write_buf) + })?; if n == 0 { return Poll::Ready(Err(io::Error::new( @@ -340,7 +342,7 @@ impl Framed { } // Try flushing the underlying IO - ready!(unsafe { Pin::new_unchecked(&mut self.io) }.poll_flush(cx))?; + ready!(unsafe { Pin::new_unchecked(&mut self.io).poll_flush(cx) })?; log::trace!("framed transport flushed"); Poll::Ready(Ok(())) @@ -351,9 +353,10 @@ impl Framed { T: AsyncWrite, U: Encoder, { - ready!(unsafe { Pin::new_unchecked(&mut self.io) }.poll_flush(cx))?; - ready!(unsafe { Pin::new_unchecked(&mut self.io) }.poll_shutdown(cx))?; - + unsafe { + ready!(Pin::new_unchecked(&mut self.io).poll_flush(cx))?; + ready!(Pin::new_unchecked(&mut self.io).poll_shutdown(cx))?; + } Poll::Ready(Ok(())) } } diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 3aa5deab..c33c381b 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,5 +1,7 @@ # Changes +* Fix oneshot + ## [1.0.0-alpha.2] - 2019-12-02 * Migrate to `std::future` diff --git a/actix-utils/src/cell.rs b/actix-utils/src/cell.rs index ffc28eb2..e85d81d3 100644 --- a/actix-utils/src/cell.rs +++ b/actix-utils/src/cell.rs @@ -2,10 +2,14 @@ use std::cell::UnsafeCell; use std::fmt; -use std::rc::Rc; +use std::rc::{Rc, Weak}; pub(crate) struct Cell { - inner: Rc>, + pub(crate) inner: Rc>, +} + +pub(crate) struct WeakCell { + inner: Weak>, } impl Clone for Cell { @@ -29,6 +33,12 @@ impl Cell { } } + pub fn downgrade(&self) -> WeakCell { + WeakCell { + inner: Rc::downgrade(&self.inner), + } + } + pub fn get_ref(&self) -> &T { unsafe { &*self.inner.as_ref().get() } } @@ -37,3 +47,19 @@ impl Cell { unsafe { &mut *self.inner.as_ref().get() } } } + +impl WeakCell { + pub fn upgrade(&self) -> Option> { + if let Some(inner) = self.inner.upgrade() { + Some(Cell { inner }) + } else { + None + } + } +} + +impl fmt::Debug for WeakCell { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} diff --git a/actix-utils/src/oneshot.rs b/actix-utils/src/oneshot.rs index 7acfca42..9940cfa3 100644 --- a/actix-utils/src/oneshot.rs +++ b/actix-utils/src/oneshot.rs @@ -3,14 +3,14 @@ //! This channel is similar to that in `sync::oneshot` but cannot be sent across //! threads. -use std::cell::RefCell; use std::future::Future; use std::pin::Pin; -use std::rc::{Rc, Weak}; +use std::rc::Rc; use std::task::{Context, Poll}; pub use futures::channel::oneshot::Canceled; +use crate::cell::{Cell, WeakCell}; use crate::task::LocalWaker; /// Creates a new futures-aware, one-shot channel. @@ -18,13 +18,12 @@ use crate::task::LocalWaker; /// This function is the same as `sync::oneshot::channel` except that the /// returned values cannot be sent across threads. pub fn channel() -> (Sender, Receiver) { - let inner = Rc::new(RefCell::new(Inner { + let inner = Cell::new(Inner { value: None, - tx_task: LocalWaker::new(), rx_task: LocalWaker::new(), - })); + }); let tx = Sender { - inner: Rc::downgrade(&inner), + inner: inner.downgrade(), }; let rx = Receiver { state: State::Open(inner), @@ -40,7 +39,7 @@ pub fn channel() -> (Sender, Receiver) { /// across threads. #[derive(Debug)] pub struct Sender { - inner: Weak>>, + inner: WeakCell>, } /// A future representing the completion of a computation happening elsewhere in @@ -61,14 +60,13 @@ impl Unpin for Sender {} #[derive(Debug)] enum State { - Open(Rc>>), + Open(Cell>), Closed(Option), } #[derive(Debug)] struct Inner { value: Option, - tx_task: LocalWaker, rx_task: LocalWaker, } @@ -84,43 +82,16 @@ impl Sender { /// this function was called, however, then `Err` is returned with the value /// provided. pub fn send(self, val: T) -> Result<(), T> { - if let Some(inner) = self.inner.upgrade() { - inner.borrow_mut().value = Some(val); + if let Some(mut inner) = self.inner.upgrade() { + let inner = inner.get_mut(); + inner.value = Some(val); + inner.rx_task.wake(); Ok(()) } else { Err(val) } } - /// Polls this `Sender` half to detect whether the `Receiver` this has - /// paired with has gone away. - /// - /// This function can be used to learn about when the `Receiver` (consumer) - /// half has gone away and nothing will be able to receive a message sent - /// from `complete`. - /// - /// Like `Future::poll`, this function will panic if it's not called from - /// within the context of a task. In other words, this should only ever be - /// called from inside another future. - /// - /// If `Ready` is returned then it means that the `Receiver` has disappeared - /// and the result this `Sender` would otherwise produce should no longer - /// be produced. - /// - /// If `NotReady` is returned then the `Receiver` is still alive and may be - /// able to receive a message if sent. The current task, however, is - /// scheduled to receive a notification if the corresponding `Receiver` goes - /// away. - pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { - match self.inner.upgrade() { - Some(inner) => { - inner.borrow_mut().tx_task.register(cx.waker()); - Poll::Pending - } - None => Poll::Ready(()), - } - } - /// Tests to see whether this `Sender`'s corresponding `Receiver` /// has gone away. /// @@ -141,11 +112,9 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { - let inner = match self.inner.upgrade() { - Some(inner) => inner, - None => return, + if let Some(inner) = self.inner.upgrade() { + inner.get_ref().rx_task.wake(); }; - inner.borrow().rx_task.wake(); } } @@ -158,12 +127,8 @@ impl Receiver { /// `Canceled` is returned from `poll` then no message was sent. pub fn close(&mut self) { match self.state { - State::Open(ref inner) => { - let mut inner = inner.borrow_mut(); - inner.tx_task.wake(); - let value = inner.value.take(); - drop(inner); - + State::Open(ref mut inner) => { + let value = inner.get_mut().value.take(); self.state = State::Closed(value); } State::Closed(_) => {} @@ -186,17 +151,17 @@ impl Future for Receiver { }; // If we've got a value, then skip the logic below as we're done. - if let Some(val) = inner.borrow_mut().value.take() { + if let Some(val) = inner.get_mut().value.take() { return Poll::Ready(Ok(val)); } // If we can get mutable access, then the sender has gone away. We // didn't see a value above, so we're canceled. Otherwise we park // our task and wait for a value to come in. - if Rc::get_mut(inner).is_some() { + if Rc::get_mut(&mut inner.inner).is_some() { Poll::Ready(Err(Canceled)) } else { - inner.borrow().rx_task.register(cx.waker()); + inner.get_ref().rx_task.register(cx.waker()); Poll::Pending } } diff --git a/actix-utils/src/task.rs b/actix-utils/src/task.rs index 291b55d5..c86378b2 100644 --- a/actix-utils/src/task.rs +++ b/actix-utils/src/task.rs @@ -21,7 +21,7 @@ use std::{fmt, rc}; /// `wake`. #[derive(Default)] pub struct LocalWaker { - waker: UnsafeCell>, + pub(crate) waker: UnsafeCell>, _t: PhantomData>, } @@ -38,10 +38,7 @@ impl LocalWaker { /// Registers the waker to be notified on calls to `wake`. pub fn register(&self, waker: &Waker) { unsafe { - let w = self.waker.get(); - if (*w).is_none() { - *w = Some(waker.clone()) - } + *self.waker.get() = Some(waker.clone()); } }