From ee3a548a8568e939021cc8c73e77638678d8c4ef Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 31 Mar 2021 23:45:49 -0700 Subject: [PATCH] Refactor Accept::accept_one (#303) --- actix-server/src/accept.rs | 94 +++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 46 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 2b9c7206..34bb029d 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -318,72 +318,74 @@ impl Accept { } } - fn accept_one(&mut self, sockets: &mut Slab, mut msg: Conn) { + fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { if self.backpressure { + // send_connection would remove fault worker from handles. + // worst case here is conn get dropped after all handles are gone. while !self.handles.is_empty() { - match self.handles[self.next].send(msg) { - Ok(_) => { - self.set_next(); - break; - } - Err(tmp) => { - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made - // after that remove the fault worker - self.srv.worker_faulted(self.handles[self.next].idx); - msg = tmp; - self.handles.swap_remove(self.next); - if self.handles.is_empty() { - error!("No workers"); - return; - } else if self.handles.len() <= self.next { - self.next = 0; - } - continue; - } + match self.send_connection(sockets, conn) { + Ok(_) => return, + Err(c) => conn = c, } } } else { + // Do one round and try to send conn to all workers until it succeed. + // Start from self.next. let mut idx = 0; while idx < self.handles.len() { idx += 1; if self.handles[self.next].available() { - match self.handles[self.next].send(msg) { - Ok(_) => { - self.set_next(); - return; - } - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made. - // after that remove the fault worker and enter backpressure if necessary. - Err(tmp) => { - self.srv.worker_faulted(self.handles[self.next].idx); - msg = tmp; - self.handles.swap_remove(self.next); - if self.handles.is_empty() { - error!("No workers"); - self.maybe_backpressure(sockets, true); - return; - } else if self.handles.len() <= self.next { - self.next = 0; - } - continue; - } + match self.send_connection(sockets, conn) { + Ok(_) => return, + Err(c) => conn = c, } + } else { + self.set_next(); } - self.set_next(); } - // enable backpressure + // Sending Conn failed due to either all workers are in error or not available. + // Enter backpressure state and try again. self.maybe_backpressure(sockets, true); - self.accept_one(sockets, msg); + self.accept_one(sockets, conn); } } - // set next worker handle that would accept work. + // Set next worker handle that would accept work. fn set_next(&mut self) { self.next = (self.next + 1) % self.handles.len(); } + // Send connection to worker and handle error. + fn send_connection( + &mut self, + sockets: &mut Slab, + conn: Conn, + ) -> Result<(), Conn> { + match self.handles[self.next].send(conn) { + Ok(_) => { + self.set_next(); + Ok(()) + } + Err(conn) => { + // worker lost contact and could be gone. a message is sent to + // `ServerBuilder` future to notify it a new worker should be made. + // after that remove the fault worker and enter backpressure if necessary. + self.srv.worker_faulted(self.handles[self.next].idx); + self.handles.swap_remove(self.next); + if self.handles.is_empty() { + error!("No workers"); + self.maybe_backpressure(sockets, true); + // All workers are gone and Conn is nowhere to be sent. + // Treat this situation as Ok and drop Conn. + return Ok(()); + } else if self.handles.len() <= self.next { + self.next = 0; + } + Err(conn) + } + } + } + fn accept(&mut self, sockets: &mut Slab, token: usize) { loop { let info = sockets