1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-17 13:33:31 +01:00

Refactor LocalWaker (#224)

This commit is contained in:
Juan Aguilar 2020-12-13 20:26:57 +01:00 committed by GitHub
parent 049795662f
commit 02a902068f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 21 additions and 16 deletions

View File

@ -333,7 +333,7 @@ impl Future for CleanupPending {
let mut pending = cell.borrow_mut(); let mut pending = cell.borrow_mut();
let mut i = 0; let mut i = 0;
while i != pending.len() { while i != pending.len() {
if let Poll::Ready(_) = Pin::new(&mut pending[i]).poll(cx) { if Pin::new(&mut pending[i]).poll(cx).is_ready() {
pending.remove(i); pending.remove(i);
} else { } else {
i += 1; i += 1;

View File

@ -286,7 +286,7 @@ impl ServerBuilder {
// handle signals // handle signals
if !self.no_signals { if !self.no_signals {
Signals::start(self.server.clone()).unwrap(); Signals::start(self.server.clone());
} }
// start http server actor // start http server actor

View File

@ -1,5 +1,4 @@
use std::future::Future; use std::future::Future;
use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -24,13 +23,13 @@ pub(crate) enum Signal {
pub(crate) struct Signals { pub(crate) struct Signals {
srv: Server, srv: Server,
#[cfg(not(unix))] #[cfg(not(unix))]
stream: Pin<Box<dyn Future<Output = io::Result<()>>>>, stream: Pin<Box<dyn Future<Output = std::io::Result<()>>>>,
#[cfg(unix)] #[cfg(unix)]
streams: Vec<(Signal, actix_rt::signal::unix::Signal)>, streams: Vec<(Signal, actix_rt::signal::unix::Signal)>,
} }
impl Signals { impl Signals {
pub(crate) fn start(srv: Server) -> io::Result<()> { pub(crate) fn start(srv: Server) {
actix_rt::spawn(lazy(|_| { actix_rt::spawn(lazy(|_| {
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
@ -66,8 +65,6 @@ impl Signals {
actix_rt::spawn(Signals { srv, streams }) actix_rt::spawn(Signals { srv, streams })
} }
})); }));
Ok(())
} }
} }

View File

@ -290,10 +290,8 @@ where
} }
State::Error(_) => { State::Error(_) => {
// flush write buffer // flush write buffer
if !this.framed.is_write_buf_empty() { if !this.framed.is_write_buf_empty() && this.framed.flush(cx).is_pending() {
if let Poll::Pending = this.framed.flush(cx) { return Poll::Pending;
return Poll::Pending;
}
} }
Poll::Ready(Err(this.state.take_error())) Poll::Ready(Err(this.state.take_error()))
} }

View File

@ -74,7 +74,7 @@ where
type Future = InFlightServiceResponse<T>; type Future = InFlightServiceResponse<T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Poll::Pending = self.service.poll_ready(cx)? { if self.service.poll_ready(cx)?.is_pending() {
Poll::Pending Poll::Pending
} else if !self.count.available(cx) { } else if !self.count.available(cx) {
log::trace!("InFlight limit exceeded"); log::trace!("InFlight limit exceeded");

View File

@ -160,7 +160,12 @@ where
} }
// check nested service // check nested service
if let Poll::Pending = self.service.poll_ready(cx).map_err(InOrderError::Service)? { if self
.service
.poll_ready(cx)
.map_err(InOrderError::Service)?
.is_pending()
{
Poll::Pending Poll::Pending
} else { } else {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))

View File

@ -19,6 +19,7 @@ use std::{fmt, rc};
/// ///
/// A single `AtomicWaker` may be reused for any number of calls to `register` or /// A single `AtomicWaker` may be reused for any number of calls to `register` or
/// `wake`. /// `wake`.
// TODO: Refactor to Cell when remove deprecated methods (@botika)
#[derive(Default)] #[derive(Default)]
pub struct LocalWaker { pub struct LocalWaker {
pub(crate) waker: UnsafeCell<Option<Waker>>, pub(crate) waker: UnsafeCell<Option<Waker>>,
@ -34,6 +35,10 @@ impl LocalWaker {
} }
} }
#[deprecated(
since = "2.1.0",
note = "In favor of `wake`. State of the register doesn't matter at `wake` up"
)]
#[inline] #[inline]
/// Check if waker has been registered. /// Check if waker has been registered.
pub fn is_registered(&self) -> bool { pub fn is_registered(&self) -> bool {
@ -47,9 +52,8 @@ impl LocalWaker {
pub fn register(&self, waker: &Waker) -> bool { pub fn register(&self, waker: &Waker) -> bool {
unsafe { unsafe {
let w = self.waker.get(); let w = self.waker.get();
let is_registered = (*w).is_some(); let last_waker = w.replace(Some(waker.clone()));
*w = Some(waker.clone()); last_waker.is_some()
is_registered
} }
} }
@ -63,6 +67,7 @@ impl LocalWaker {
} }
} }
#[inline]
/// Returns the last `Waker` passed to `register`, so that the user can wake it. /// Returns the last `Waker` passed to `register`, so that the user can wake it.
/// ///
/// If a waker has not been registered, this returns `None`. /// If a waker has not been registered, this returns `None`.