diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 34bb029d..2750d1c5 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -187,21 +187,19 @@ impl Accept { let mut guard = self.waker.guard(); match guard.pop_front() { // worker notify it becomes available. we may want to recover - // from backpressure. + // from backpressure. Some(WakerInterest::WorkerAvailable) => { drop(guard); self.maybe_backpressure(&mut sockets, false); } - // a new worker thread is made and it's handle would be added - // to Accept + // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { drop(guard); // maybe we want to recover from a backpressure. self.maybe_backpressure(&mut sockets, false); self.handles.push(handle); } - // got timer interest and it's time to try register socket(s) - // again. + // got timer interest and it's time to try register socket(s) again Some(WakerInterest::Timer) => { drop(guard); self.process_timer(&mut sockets) @@ -238,16 +236,23 @@ impl Accept { fn process_timer(&self, sockets: &mut Slab) { let now = Instant::now(); - sockets.iter_mut().for_each(|(token, info)| { - // only the ServerSocketInfo have an associate timeout value was de registered. - if let Some(inst) = info.timeout.take() { - if now > inst { - self.register_logged(token, info); - } else { + sockets + .iter_mut() + // Only sockets that had an associated timeout were deregistered. + .filter(|(_, info)| info.timeout.is_some()) + .for_each(|(token, info)| { + let inst = info.timeout.take().unwrap(); + + if now < inst { info.timeout = Some(inst); + } else if !self.backpressure { + self.register_logged(token, info); } - } - }); + + // Drop the timeout if server is in backpressure and socket timeout is expired. + // When server recovers from backpressure it will register all sockets without + // a timeout value so this socket register will be delayed till then. + }); } #[cfg(not(target_os = "windows"))] @@ -301,20 +306,21 @@ impl Accept { } fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { - if self.backpressure { - if !on { + // Only operate when server is in a different backpressure than the given flag. + if self.backpressure != on { + if on { + self.backpressure = true; + // TODO: figure out if timing out sockets can be safely de-registered twice. + self.deregister_all(sockets); + } else { self.backpressure = false; - for (token, info) in sockets.iter_mut() { - if info.timeout.is_some() { - // socket will attempt to re-register itself when its timeout completes - continue; - } - self.register_logged(token, info); - } + sockets + .iter_mut() + // Only operate on sockets without associated timeout. + // Sockets with it will attempt to re-register when their timeout expires. + .filter(|(_, info)| info.timeout.is_none()) + .for_each(|(token, info)| self.register_logged(token, info)); } - } else if on { - self.backpressure = true; - self.deregister_all(sockets); } }