mirror of
https://github.com/fafhrd91/actix-net
synced 2025-01-18 20:01:48 +01:00
Refactor Accept::accept_one (#303)
This commit is contained in:
parent
f21eaa954f
commit
ee3a548a85
@ -318,72 +318,74 @@ impl Accept {
|
||||
}
|
||||
}
|
||||
|
||||
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut msg: Conn) {
|
||||
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
|
||||
if self.backpressure {
|
||||
// send_connection would remove fault worker from handles.
|
||||
// worst case here is conn get dropped after all handles are gone.
|
||||
while !self.handles.is_empty() {
|
||||
match self.handles[self.next].send(msg) {
|
||||
Ok(_) => {
|
||||
self.set_next();
|
||||
break;
|
||||
}
|
||||
Err(tmp) => {
|
||||
// worker lost contact and could be gone. a message is sent to
|
||||
// `ServerBuilder` future to notify it a new worker should be made
|
||||
// after that remove the fault worker
|
||||
self.srv.worker_faulted(self.handles[self.next].idx);
|
||||
msg = tmp;
|
||||
self.handles.swap_remove(self.next);
|
||||
if self.handles.is_empty() {
|
||||
error!("No workers");
|
||||
return;
|
||||
} else if self.handles.len() <= self.next {
|
||||
self.next = 0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
match self.send_connection(sockets, conn) {
|
||||
Ok(_) => return,
|
||||
Err(c) => conn = c,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Do one round and try to send conn to all workers until it succeed.
|
||||
// Start from self.next.
|
||||
let mut idx = 0;
|
||||
while idx < self.handles.len() {
|
||||
idx += 1;
|
||||
if self.handles[self.next].available() {
|
||||
match self.handles[self.next].send(msg) {
|
||||
Ok(_) => {
|
||||
self.set_next();
|
||||
return;
|
||||
}
|
||||
// worker lost contact and could be gone. a message is sent to
|
||||
// `ServerBuilder` future to notify it a new worker should be made.
|
||||
// after that remove the fault worker and enter backpressure if necessary.
|
||||
Err(tmp) => {
|
||||
self.srv.worker_faulted(self.handles[self.next].idx);
|
||||
msg = tmp;
|
||||
self.handles.swap_remove(self.next);
|
||||
if self.handles.is_empty() {
|
||||
error!("No workers");
|
||||
self.maybe_backpressure(sockets, true);
|
||||
return;
|
||||
} else if self.handles.len() <= self.next {
|
||||
self.next = 0;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
match self.send_connection(sockets, conn) {
|
||||
Ok(_) => return,
|
||||
Err(c) => conn = c,
|
||||
}
|
||||
} else {
|
||||
self.set_next();
|
||||
}
|
||||
self.set_next();
|
||||
}
|
||||
// enable backpressure
|
||||
// Sending Conn failed due to either all workers are in error or not available.
|
||||
// Enter backpressure state and try again.
|
||||
self.maybe_backpressure(sockets, true);
|
||||
self.accept_one(sockets, msg);
|
||||
self.accept_one(sockets, conn);
|
||||
}
|
||||
}
|
||||
|
||||
// set next worker handle that would accept work.
|
||||
// Set next worker handle that would accept work.
|
||||
fn set_next(&mut self) {
|
||||
self.next = (self.next + 1) % self.handles.len();
|
||||
}
|
||||
|
||||
// Send connection to worker and handle error.
|
||||
fn send_connection(
|
||||
&mut self,
|
||||
sockets: &mut Slab<ServerSocketInfo>,
|
||||
conn: Conn,
|
||||
) -> Result<(), Conn> {
|
||||
match self.handles[self.next].send(conn) {
|
||||
Ok(_) => {
|
||||
self.set_next();
|
||||
Ok(())
|
||||
}
|
||||
Err(conn) => {
|
||||
// worker lost contact and could be gone. a message is sent to
|
||||
// `ServerBuilder` future to notify it a new worker should be made.
|
||||
// after that remove the fault worker and enter backpressure if necessary.
|
||||
self.srv.worker_faulted(self.handles[self.next].idx);
|
||||
self.handles.swap_remove(self.next);
|
||||
if self.handles.is_empty() {
|
||||
error!("No workers");
|
||||
self.maybe_backpressure(sockets, true);
|
||||
// All workers are gone and Conn is nowhere to be sent.
|
||||
// Treat this situation as Ok and drop Conn.
|
||||
return Ok(());
|
||||
} else if self.handles.len() <= self.next {
|
||||
self.next = 0;
|
||||
}
|
||||
Err(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
|
||||
loop {
|
||||
let info = sockets
|
||||
|
Loading…
x
Reference in New Issue
Block a user