1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 02:21:07 +01:00

Fix bug where timed out socket would register itself when server in b… (#302)

Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
fakeshadow 2021-04-01 00:25:24 -07:00 committed by GitHub
parent ee3a548a85
commit 2c5c9167a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -192,16 +192,14 @@ impl Accept {
drop(guard); drop(guard);
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
} }
// a new worker thread is made and it's handle would be added // a new worker thread is made and it's handle would be added to Accept
// to Accept
Some(WakerInterest::Worker(handle)) => { Some(WakerInterest::Worker(handle)) => {
drop(guard); drop(guard);
// maybe we want to recover from a backpressure. // maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
self.handles.push(handle); self.handles.push(handle);
} }
// got timer interest and it's time to try register socket(s) // got timer interest and it's time to try register socket(s) again
// again.
Some(WakerInterest::Timer) => { Some(WakerInterest::Timer) => {
drop(guard); drop(guard);
self.process_timer(&mut sockets) self.process_timer(&mut sockets)
@ -238,15 +236,22 @@ impl Accept {
fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) { fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
let now = Instant::now(); let now = Instant::now();
sockets.iter_mut().for_each(|(token, info)| { sockets
// only the ServerSocketInfo have an associate timeout value was de registered. .iter_mut()
if let Some(inst) = info.timeout.take() { // Only sockets that had an associated timeout were deregistered.
if now > inst { .filter(|(_, info)| info.timeout.is_some())
self.register_logged(token, info); .for_each(|(token, info)| {
} else { let inst = info.timeout.take().unwrap();
if now < inst {
info.timeout = Some(inst); info.timeout = Some(inst);
} else if !self.backpressure {
self.register_logged(token, info);
} }
}
// Drop the timeout if server is in backpressure and socket timeout is expired.
// When server recovers from backpressure it will register all sockets without
// a timeout value so this socket register will be delayed till then.
}); });
} }
@ -301,20 +306,21 @@ impl Accept {
} }
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) { fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
if self.backpressure { // Only operate when server is in a different backpressure than the given flag.
if !on { if self.backpressure != on {
self.backpressure = false; if on {
for (token, info) in sockets.iter_mut() {
if info.timeout.is_some() {
// socket will attempt to re-register itself when its timeout completes
continue;
}
self.register_logged(token, info);
}
}
} else if on {
self.backpressure = true; self.backpressure = true;
// TODO: figure out if timing out sockets can be safely de-registered twice.
self.deregister_all(sockets); self.deregister_all(sockets);
} else {
self.backpressure = false;
sockets
.iter_mut()
// Only operate on sockets without associated timeout.
// Sockets with it will attempt to re-register when their timeout expires.
.filter(|(_, info)| info.timeout.is_none())
.for_each(|(token, info)| self.register_logged(token, info));
}
} }
} }