1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-02-01 06:03:08 +01:00

461 lines
16 KiB
Rust
Raw Normal View History

2021-11-04 20:30:43 +00:00
use std::{io, thread, time::Duration};
2018-08-19 10:47:04 -07:00
2021-11-04 20:30:43 +00:00
use actix_rt::time::Instant;
2021-10-22 18:17:26 +01:00
use log::{debug, error, info};
use mio::{Interest, Poll, Token as MioToken};
2018-08-19 10:47:04 -07:00
2021-11-04 20:30:43 +00:00
use crate::{
availability::Availability,
socket::MioListener,
waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN},
worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer},
ServerBuilder, ServerHandle,
};
/// Accept-loop timeout recorded (via `Accept::set_timeout`) after a listener fails with a
/// non-connection error in `Accept::accept`.
///
/// NOTE(review): this is 10 ms longer than the 500 ms per-socket deadline written in
/// `Accept::accept`, presumably so that deadline has already passed when the timeout
/// fires -- confirm before changing either value.
const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510);
2018-08-19 10:47:04 -07:00
struct ServerSocketInfo {
token: usize,
lst: MioListener,
/// Timeout is used to mark the deadline when this socket's listener should be registered again
/// after an error.
2021-11-04 20:30:43 +00:00
timeout: Option<actix_rt::time::Instant>,
2018-08-19 10:47:04 -07:00
}
/// poll instance of the server.
2021-11-04 20:30:43 +00:00
pub(crate) struct Accept {
poll: Poll,
2021-11-04 20:30:43 +00:00
waker_queue: WakerQueue,
handles: Vec<WorkerHandleAccept>,
2021-11-01 23:36:51 +00:00
srv: ServerHandle,
2018-08-19 10:47:04 -07:00
next: usize,
avail: Availability,
2021-11-04 20:30:43 +00:00
/// use the smallest duration from sockets timeout.
timeout: Option<Duration>,
paused: bool,
2018-08-19 10:47:04 -07:00
}
impl Accept {
pub(crate) fn start(
2021-11-04 20:30:43 +00:00
sockets: Vec<(usize, MioListener)>,
builder: &ServerBuilder,
) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>)> {
let handle_server = ServerHandle::new(builder.cmd_tx.clone());
// construct poll instance and its waker
let poll = Poll::new()?;
let waker_queue = WakerQueue::new(poll.registry())?;
// start workers and collect handles
let (handles_accept, handles_server) = (0..builder.threads)
.map(|idx| {
// clone service factories
let factories = builder
.factories
.iter()
.map(|f| f.clone_factory())
.collect::<Vec<_>>();
// start worker using service factories
ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config)
})
.collect::<io::Result<Vec<_>>>()?
.into_iter()
.unzip();
let (mut accept, mut sockets) = Accept::new_with_sockets(
poll,
waker_queue.clone(),
sockets,
handles_accept,
handle_server,
)?;
thread::Builder::new()
2021-11-04 20:30:43 +00:00
.name("actix-server acceptor".to_owned())
.spawn(move || accept.poll_with(&mut sockets))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
2021-11-04 20:30:43 +00:00
Ok((waker_queue, handles_server))
2018-08-19 10:47:04 -07:00
}
fn new_with_sockets(
poll: Poll,
2021-11-04 20:30:43 +00:00
waker_queue: WakerQueue,
sockets: Vec<(usize, MioListener)>,
accept_handles: Vec<WorkerHandleAccept>,
server_handle: ServerHandle,
) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> {
let sockets = sockets
.into_iter()
.map(|(token, mut lst)| {
// Start listening for incoming connections
poll.registry()
2021-11-04 20:30:43 +00:00
.register(&mut lst, MioToken(token), Interest::READABLE)?;
2021-11-04 20:30:43 +00:00
Ok(ServerSocketInfo {
token,
lst,
timeout: None,
2021-11-04 20:30:43 +00:00
})
})
2021-11-04 20:30:43 +00:00
.collect::<io::Result<_>>()?;
2018-08-19 10:47:04 -07:00
let mut avail = Availability::default();
// Assume all handles are avail at construct time.
2021-11-04 20:30:43 +00:00
avail.set_available_all(&accept_handles);
let accept = Accept {
2018-08-19 10:47:04 -07:00
poll,
2021-11-04 20:30:43 +00:00
waker_queue,
handles: accept_handles,
srv: server_handle,
2018-08-19 10:47:04 -07:00
next: 0,
avail,
2021-11-04 20:30:43 +00:00
timeout: None,
paused: false,
};
2021-11-04 20:30:43 +00:00
Ok((accept, sockets))
2018-08-19 10:47:04 -07:00
}
2021-11-04 20:30:43 +00:00
/// blocking wait for readiness events triggered by mio
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
2021-11-04 20:30:43 +00:00
let mut events = mio::Events::with_capacity(256);
2018-08-19 10:47:04 -07:00
loop {
if let Err(e) = self.poll.poll(&mut events, None) {
match e.kind() {
io::ErrorKind::Interrupted => {}
_ => panic!("Poll error: {}", e),
}
}
2018-08-19 10:47:04 -07:00
for event in events.iter() {
let token = event.token();
match token {
WAKER_TOKEN => {
let exit = self.handle_waker(sockets);
if exit {
2021-10-22 18:17:26 +01:00
info!("Accept thread stopped");
return;
2018-10-29 15:48:56 -07:00
}
}
2018-08-19 10:47:04 -07:00
_ => {
let token = usize::from(token);
self.accept(sockets, token);
2018-08-19 10:47:04 -07:00
}
}
}
2021-11-04 20:30:43 +00:00
// check for timeout and re-register sockets
self.process_timeout(sockets);
2018-08-19 10:47:04 -07:00
}
}
fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
// This is a loop because interests for command from previous version was
// a loop that would try to drain the command channel. It's yet unknown
// if it's necessary/good practice to actively drain the waker queue.
loop {
// take guard with every iteration so no new interest can be added
// until the current task is done.
2021-11-04 20:30:43 +00:00
let mut guard = self.waker_queue.guard();
match guard.pop_front() {
// worker notify it becomes available.
Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard);
self.avail.set_available(idx, true);
if !self.paused {
self.accept_all(sockets);
}
}
2021-11-04 20:30:43 +00:00
// a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
self.avail.set_available(handle.idx(), true);
self.handles.push(handle);
if !self.paused {
self.accept_all(sockets);
}
}
Some(WakerInterest::Pause) => {
drop(guard);
if !self.paused {
self.paused = true;
self.deregister_all(sockets);
}
}
2021-11-04 20:30:43 +00:00
Some(WakerInterest::Resume) => {
drop(guard);
if self.paused {
self.paused = false;
sockets.iter_mut().for_each(|info| {
self.register_logged(info);
});
self.accept_all(sockets);
}
}
2021-11-04 20:30:43 +00:00
Some(WakerInterest::Stop) => {
if !self.paused {
self.deregister_all(sockets);
}
return true;
}
2021-11-04 20:30:43 +00:00
// waker queue is drained
None => {
// Reset the WakerQueue before break so it does not grow infinitely
WakerQueue::reset(&mut guard);
return false;
}
}
}
}
2021-11-04 20:30:43 +00:00
/// Re-registers listeners whose error deadline has passed and re-arms the accept timeout
/// for those that are still waiting.
///
/// Does nothing when no accept timeout is currently set.
fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) {
    // always remove old timeouts
    if self.timeout.take().is_none() {
        return;
    }

    let now = Instant::now();

    for info in sockets.iter_mut() {
        // Only sockets that had an associated timeout were deregistered.
        if let Some(deadline) = info.timeout.take() {
            if now < deadline {
                // still timed out; try to set new timeout
                info.timeout = Some(deadline);
                self.set_timeout(deadline - now);
            } else if !self.paused {
                // timeout expired; register socket again
                self.register_logged(info);
            }

            // Drop the timeout if server is paused and socket timeout is expired.
            // When server recovers from pause it will register all sockets without
            // a timeout value so this socket register will be delayed till then.
        }
    }
}
/// Update accept timeout with `duration` if it is shorter than current timeout.
fn set_timeout(&mut self, duration: Duration) {
match self.timeout {
Some(ref mut timeout) => {
if *timeout > duration {
*timeout = duration;
}
}
None => self.timeout = Some(duration),
}
2018-08-19 10:47:04 -07:00
}
/// Registers the listener of `info` with the poll instance for readable interest, using
/// `info.token` as the mio token.
///
/// On Windows a failed `register` is retried with `reregister`.
fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
    let registry = self.poll.registry();
    let token = MioToken(info.token);

    let outcome = registry.register(&mut info.lst, token, Interest::READABLE);

    // On windows, calling register without deregister cause an error.
    // See https://github.com/actix/actix-web/issues/905
    // Calling reregister seems to fix the issue.
    #[cfg(target_os = "windows")]
    let outcome =
        outcome.or_else(|_| registry.reregister(&mut info.lst, token, Interest::READABLE));

    outcome
}
fn register_logged(&self, info: &mut ServerSocketInfo) {
match self.register(info) {
2021-10-22 18:17:26 +01:00
Ok(_) => debug!("Resume accepting connections on {}", info.lst.local_addr()),
Err(e) => error!("Can not register server socket {}", e),
}
}
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.poll.registry().deregister(&mut info.lst) {
2021-10-22 18:17:26 +01:00
Ok(_) => debug!("Paused accepting connections on {}", info.lst.local_addr()),
Err(e) => {
error!("Can not deregister server socket {}", e)
}
}
}
/// Deregisters every listener that is not currently waiting on an error timeout and clears
/// all per-socket timeouts.
fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
    // This is a best effort implementation with following limitation:
    //
    // Every ServerSocketInfo with associated timeout will be skipped and its timeout is
    // removed in the process.
    //
    // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short gap
    // (less than 500ms) would cause all timing out ServerSocketInfos be re-registered before
    // expected timing.
    for info in sockets.iter_mut() {
        // Take the timeout unconditionally.
        // This is to prevent Accept::process_timeout from re-registering the socket afterwards.
        let pending = info.timeout.take();

        // Socket info with a timeout is already deregistered so skip it.
        if pending.is_none() {
            self.deregister_logged(info);
        }
    }
}
2021-03-31 23:45:49 -07:00
// Send connection to worker and handle error.
fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
let next = self.next();
match next.send(conn) {
2021-03-31 23:45:49 -07:00
Ok(_) => {
// Increment counter of WorkerHandle.
// Set worker to unavailable with it hit max (Return false).
if !next.inc_counter() {
let idx = next.idx();
self.avail.set_available(idx, false);
}
2021-03-31 23:45:49 -07:00
self.set_next();
Ok(())
}
Err(conn) => {
// Worker thread is error and could be gone.
// Remove worker handle and notify `ServerBuilder`.
self.remove_next();
2021-03-31 23:45:49 -07:00
if self.handles.is_empty() {
error!("No workers");
// All workers are gone and Conn is nowhere to be sent.
// Treat this situation as Ok and drop Conn.
return Ok(());
} else if self.handles.len() <= self.next {
self.next = 0;
}
2021-03-31 23:45:49 -07:00
Err(conn)
}
}
}
/// Hands `conn` to a worker, starting with the worker at the current cursor.
///
/// Workers marked unavailable are skipped. Once no worker is marked available the
/// connection is sent to whichever worker is next, retrying until a send succeeds.
fn accept_one(&mut self, mut conn: Conn) {
    loop {
        let worker_idx = self.next().idx();

        if !self.avail.get_available(worker_idx) {
            self.avail.set_available(worker_idx, false);
            self.set_next();

            if self.avail.available() {
                // some other worker is still available; keep looking for it
                continue;
            }

            // nobody is marked available: keep sending until a worker takes the connection
            loop {
                match self.send_connection(conn) {
                    Ok(()) => return,
                    Err(returned) => conn = returned,
                }
            }
        }

        match self.send_connection(conn) {
            Ok(()) => return,
            Err(returned) => conn = returned,
        }
    }
}
fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
while self.avail.available() {
let info = &mut sockets[token];
match info.lst.accept() {
Ok(io) => {
let conn = Conn { io, token };
self.accept_one(conn);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
// deregister listener temporary
self.deregister_logged(info);
// sleep after error. write the timeout to socket info as later
// the poll would need it mark which socket and when it's
// listener should be registered
info.timeout = Some(Instant::now() + Duration::from_millis(500));
2021-11-04 20:30:43 +00:00
self.set_timeout(TIMEOUT_DURATION_ON_ERROR);
return;
2018-08-19 10:47:04 -07:00
}
};
}
}
/// Runs `accept` once for every listener in `sockets`.
fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
    // Snapshot the tokens first: `accept` needs the whole slice mutably.
    let tokens: Vec<usize> = sockets.iter().map(|info| info.token).collect();

    for token in tokens {
        self.accept(sockets, token);
    }
}
/// Worker handle at the current cursor position.
///
/// Panics if the cursor is out of bounds for `handles`.
#[inline(always)]
fn next(&self) -> &WorkerHandleAccept {
    let cursor = self.next;
    &self.handles[cursor]
}
/// Advance the cursor to the worker handle that would accept the next connection,
/// wrapping around at the end of `handles`.
#[inline(always)]
fn set_next(&mut self) {
    let advanced = self.next + 1;
    self.next = advanced % self.handles.len();
}
/// Remove the worker handle at the cursor after it failed to accept a connection.
///
/// The removal uses `swap_remove`, so the last handle takes the removed handle's slot.
fn remove_next(&mut self) {
    let faulted = self.handles.swap_remove(self.next);
    let worker_idx = faulted.idx();

    // A message is sent to `ServerBuilder` future to notify it a new worker
    // should be made.
    self.srv.worker_faulted(worker_idx);

    self.avail.set_available(worker_idx, false);
}
}
2021-11-04 20:30:43 +00:00
/// This function defines errors that are per-connection; if we get this error from the `accept()`
/// system call it means the next connection might be ready to be accepted.
///
/// Returns `true` for `ConnectionRefused`, `ConnectionAborted` and `ConnectionReset`.
///
/// All other errors will incur a timeout before next `accept()` call is attempted. The timeout is
/// useful to handle resource exhaustion errors like `ENFILE` and `EMFILE`. Otherwise, it could
/// enter into a temporary spin loop.
fn connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}