1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-01-18 20:01:48 +01:00

reduce branch in Accept::accept method (#300)

This commit is contained in:
fakeshadow 2021-03-29 00:19:37 -07:00 committed by GitHub
parent 0ee8d032b6
commit 26a5af70cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 48 deletions

View File

@ -208,17 +208,7 @@ impl Accept {
}
Some(WakerInterest::Pause) => {
drop(guard);
sockets.iter_mut().for_each(|(_, info)| {
match self.deregister(info) {
Ok(_) => info!(
"Paused accepting connections on {}",
info.addr
),
Err(e) => {
error!("Can not deregister server socket {}", e)
}
}
});
self.deregister_all(&mut sockets);
}
Some(WakerInterest::Resume) => {
drop(guard);
@ -295,10 +285,18 @@ impl Accept {
self.poll.registry().deregister(&mut info.lst)
}
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.deregister(info) {
Ok(_) => info!("Paused accepting connections on {}", info.addr),
Err(e) => {
error!("Can not deregister server socket {}", e)
}
}
}
fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
sockets.iter_mut().for_each(|(_, info)| {
info!("Accepting connections on {} has been paused", info.addr);
let _ = self.deregister(info);
self.deregister_logged(info);
});
}
@ -388,43 +386,42 @@ impl Accept {
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
loop {
let msg = if let Some(info) = sockets.get_mut(token) {
match info.lst.accept() {
Ok(Some((io, addr))) => Conn {
let info = sockets
.get_mut(token)
.expect("ServerSocketInfo is removed from Slab");
match info.lst.accept() {
Ok((io, addr)) => {
let msg = Conn {
io,
token: info.token,
peer: Some(addr),
},
Ok(None) => return,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
// deregister listener temporary
error!("Error accepting connection: {}", e);
if let Err(err) = self.deregister(info) {
error!("Can not deregister server socket {}", err);
}
// sleep after error. write the timeout to socket info as later
// the poll would need it mark which socket and when it's
// listener should be registered
info.timeout = Some(Instant::now() + Duration::from_millis(500));
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone();
System::current().arbiter().spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
});
return;
}
};
self.accept_one(sockets, msg);
}
} else {
return;
};
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
self.accept_one(sockets, msg);
// deregister listener temporary
self.deregister_logged(info);
// sleep after error. write the timeout to socket info as later
// the poll would need it mark which socket and when it's
// listener should be registered
info.timeout = Some(Instant::now() + Duration::from_millis(500));
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone();
System::current().arbiter().spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
});
return;
}
};
}
}
}

View File

@ -40,15 +40,15 @@ impl MioListener {
}
}
pub(crate) fn accept(&self) -> io::Result<Option<(MioStream, SocketAddr)>> {
pub(crate) fn accept(&self) -> io::Result<(MioStream, SocketAddr)> {
match *self {
MioListener::Tcp(ref lst) => lst
.accept()
.map(|(stream, addr)| Some((MioStream::Tcp(stream), SocketAddr::Tcp(addr)))),
.map(|(stream, addr)| (MioStream::Tcp(stream), SocketAddr::Tcp(addr))),
#[cfg(unix)]
MioListener::Uds(ref lst) => lst
.accept()
.map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::Uds(addr)))),
.map(|(stream, addr)| (MioStream::Uds(stream), SocketAddr::Uds(addr))),
}
}
}