1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 00:01:11 +01:00

Fix low/high watermark for write/read buffers; fix oneshot impl

This commit is contained in:
Nikolay Kim 2019-12-05 01:36:31 +06:00
parent 21dcc22e53
commit c6eb318536
6 changed files with 78 additions and 83 deletions

View File

@ -1,5 +1,7 @@
# Changes
* Fix low/high watermark for write/read buffers
## [0.2.0-alpha.2]
* Migrated to `std::future`

View File

@ -5,7 +5,7 @@ use std::io::{self};
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use bytes::{BufMut, BytesMut};
use futures::{ready, Sink, Stream};
use pin_project::pin_project;
use tokio_codec::{Decoder, Encoder};
@ -240,17 +240,18 @@ impl<T, U> Framed<T, U> {
T: AsyncWrite,
U: Encoder,
{
let len = self.write_buf.len();
if len < self.write_lw {
self.write_buf.reserve(self.write_hw - len)
let remaining = self.write_buf.remaining_mut();
if remaining < self.write_lw {
self.write_buf.reserve(self.write_hw - remaining);
}
self.codec.encode(item, &mut self.write_buf)?;
Ok(())
}
/// Check if framed is able to write more data
pub fn is_ready(&self) -> bool {
let len = self.write_buf.len();
len < self.write_hw
self.write_buf.len() < self.write_hw
}
pub fn next_item(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<U::Item, U::Error>>>
@ -292,16 +293,17 @@ impl<T, U> Framed<T, U> {
assert!(!self.eof);
// Otherwise, try to read more data and try again. Make sure we've
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
self.read_buf.reserve(1);
let cnt = unsafe {
match Pin::new_unchecked(&mut self.io).poll_read_buf(cx, &mut self.read_buf) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(cnt)) => cnt,
}
// Otherwise, try to read more data and try again. Make sure we've got room
let remaining = self.read_buf.remaining_mut();
if remaining < LW {
self.read_buf.reserve(HW - remaining)
}
let cnt = match unsafe {
Pin::new_unchecked(&mut self.io).poll_read_buf(cx, &mut self.read_buf)
} {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(cnt)) => cnt,
};
if cnt == 0 {
@ -321,9 +323,9 @@ impl<T, U> Framed<T, U> {
while !self.write_buf.is_empty() {
log::trace!("writing; remaining={}", self.write_buf.len());
let n = ready!(
unsafe { Pin::new_unchecked(&mut self.io) }.poll_write(cx, &self.write_buf)
)?;
let n = ready!(unsafe {
Pin::new_unchecked(&mut self.io).poll_write(cx, &self.write_buf)
})?;
if n == 0 {
return Poll::Ready(Err(io::Error::new(
@ -340,7 +342,7 @@ impl<T, U> Framed<T, U> {
}
// Try flushing the underlying IO
ready!(unsafe { Pin::new_unchecked(&mut self.io) }.poll_flush(cx))?;
ready!(unsafe { Pin::new_unchecked(&mut self.io).poll_flush(cx) })?;
log::trace!("framed transport flushed");
Poll::Ready(Ok(()))
@ -351,9 +353,10 @@ impl<T, U> Framed<T, U> {
T: AsyncWrite,
U: Encoder,
{
ready!(unsafe { Pin::new_unchecked(&mut self.io) }.poll_flush(cx))?;
ready!(unsafe { Pin::new_unchecked(&mut self.io) }.poll_shutdown(cx))?;
unsafe {
ready!(Pin::new_unchecked(&mut self.io).poll_flush(cx))?;
ready!(Pin::new_unchecked(&mut self.io).poll_shutdown(cx))?;
}
Poll::Ready(Ok(()))
}
}

View File

@ -1,5 +1,7 @@
# Changes
* Fix oneshot
## [1.0.0-alpha.2] - 2019-12-02
* Migrate to `std::future`

View File

@ -2,10 +2,14 @@
use std::cell::UnsafeCell;
use std::fmt;
use std::rc::Rc;
use std::rc::{Rc, Weak};
pub(crate) struct Cell<T> {
inner: Rc<UnsafeCell<T>>,
pub(crate) inner: Rc<UnsafeCell<T>>,
}
pub(crate) struct WeakCell<T> {
inner: Weak<UnsafeCell<T>>,
}
impl<T> Clone for Cell<T> {
@ -29,6 +33,12 @@ impl<T> Cell<T> {
}
}
pub fn downgrade(&self) -> WeakCell<T> {
WeakCell {
inner: Rc::downgrade(&self.inner),
}
}
pub fn get_ref(&self) -> &T {
unsafe { &*self.inner.as_ref().get() }
}
@ -37,3 +47,19 @@ impl<T> Cell<T> {
unsafe { &mut *self.inner.as_ref().get() }
}
}
impl<T> WeakCell<T> {
pub fn upgrade(&self) -> Option<Cell<T>> {
if let Some(inner) = self.inner.upgrade() {
Some(Cell { inner })
} else {
None
}
}
}
impl<T: fmt::Debug> fmt::Debug for WeakCell<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}

View File

@ -3,14 +3,14 @@
//! This channel is similar to that in `sync::oneshot` but cannot be sent across
//! threads.
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::rc::Rc;
use std::task::{Context, Poll};
pub use futures::channel::oneshot::Canceled;
use crate::cell::{Cell, WeakCell};
use crate::task::LocalWaker;
/// Creates a new futures-aware, one-shot channel.
@ -18,13 +18,12 @@ use crate::task::LocalWaker;
/// This function is the same as `sync::oneshot::channel` except that the
/// returned values cannot be sent across threads.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Rc::new(RefCell::new(Inner {
let inner = Cell::new(Inner {
value: None,
tx_task: LocalWaker::new(),
rx_task: LocalWaker::new(),
}));
});
let tx = Sender {
inner: Rc::downgrade(&inner),
inner: inner.downgrade(),
};
let rx = Receiver {
state: State::Open(inner),
@ -40,7 +39,7 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
/// across threads.
#[derive(Debug)]
pub struct Sender<T> {
inner: Weak<RefCell<Inner<T>>>,
inner: WeakCell<Inner<T>>,
}
/// A future representing the completion of a computation happening elsewhere in
@ -61,14 +60,13 @@ impl<T> Unpin for Sender<T> {}
#[derive(Debug)]
enum State<T> {
Open(Rc<RefCell<Inner<T>>>),
Open(Cell<Inner<T>>),
Closed(Option<T>),
}
#[derive(Debug)]
struct Inner<T> {
value: Option<T>,
tx_task: LocalWaker,
rx_task: LocalWaker,
}
@ -84,43 +82,16 @@ impl<T> Sender<T> {
/// this function was called, however, then `Err` is returned with the value
/// provided.
pub fn send(self, val: T) -> Result<(), T> {
if let Some(inner) = self.inner.upgrade() {
inner.borrow_mut().value = Some(val);
if let Some(mut inner) = self.inner.upgrade() {
let inner = inner.get_mut();
inner.value = Some(val);
inner.rx_task.wake();
Ok(())
} else {
Err(val)
}
}
/// Polls this `Sender` half to detect whether the `Receiver` this has
/// paired with has gone away.
///
/// This function can be used to learn about when the `Receiver` (consumer)
/// half has gone away and nothing will be able to receive a message sent
/// from `complete`.
///
/// Like `Future::poll`, this function will panic if it's not called from
/// within the context of a task. In other words, this should only ever be
/// called from inside another future.
///
/// If `Ready` is returned then it means that the `Receiver` has disappeared
/// and the result this `Sender` would otherwise produce should no longer
/// be produced.
///
/// If `NotReady` is returned then the `Receiver` is still alive and may be
/// able to receive a message if sent. The current task, however, is
/// scheduled to receive a notification if the corresponding `Receiver` goes
/// away.
pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
match self.inner.upgrade() {
Some(inner) => {
inner.borrow_mut().tx_task.register(cx.waker());
Poll::Pending
}
None => Poll::Ready(()),
}
}
/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has gone away.
///
@ -141,11 +112,9 @@ impl<T> Sender<T> {
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => return,
if let Some(inner) = self.inner.upgrade() {
inner.get_ref().rx_task.wake();
};
inner.borrow().rx_task.wake();
}
}
@ -158,12 +127,8 @@ impl<T> Receiver<T> {
/// `Canceled` is returned from `poll` then no message was sent.
pub fn close(&mut self) {
match self.state {
State::Open(ref inner) => {
let mut inner = inner.borrow_mut();
inner.tx_task.wake();
let value = inner.value.take();
drop(inner);
State::Open(ref mut inner) => {
let value = inner.get_mut().value.take();
self.state = State::Closed(value);
}
State::Closed(_) => {}
@ -186,17 +151,17 @@ impl<T> Future for Receiver<T> {
};
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = inner.borrow_mut().value.take() {
if let Some(val) = inner.get_mut().value.take() {
return Poll::Ready(Ok(val));
}
// If we can get mutable access, then the sender has gone away. We
// didn't see a value above, so we're canceled. Otherwise we park
// our task and wait for a value to come in.
if Rc::get_mut(inner).is_some() {
if Rc::get_mut(&mut inner.inner).is_some() {
Poll::Ready(Err(Canceled))
} else {
inner.borrow().rx_task.register(cx.waker());
inner.get_ref().rx_task.register(cx.waker());
Poll::Pending
}
}

View File

@ -21,7 +21,7 @@ use std::{fmt, rc};
/// `wake`.
#[derive(Default)]
pub struct LocalWaker {
waker: UnsafeCell<Option<Waker>>,
pub(crate) waker: UnsafeCell<Option<Waker>>,
_t: PhantomData<rc::Rc<()>>,
}
@ -38,10 +38,7 @@ impl LocalWaker {
/// Registers the waker to be notified on calls to `wake`.
pub fn register(&self, waker: &Waker) {
unsafe {
let w = self.waker.get();
if (*w).is_none() {
*w = Some(waker.clone())
}
*self.waker.get() = Some(waker.clone());
}
}