1
0
mirror of https://github.com/actix/actix-extras.git synced 2024-11-25 08:22:43 +01:00
actix-extras/src/server/accept.rs

513 lines
17 KiB
Rust
Raw Normal View History

use std::sync::mpsc as sync_mpsc;
use std::time::{Duration, Instant};
use std::{io, net, thread};
use futures::{sync::mpsc, Future};
use mio;
use slab::Slab;
use tokio_timer::Delay;
use actix::{msgs::Execute, Arbiter, System};
2018-08-04 01:09:46 +02:00
use super::srv::ServerCommand;
use super::worker::{Conn, Socket, Token, WorkerClient};
pub(crate) enum Command {
Pause,
Resume,
Stop,
2018-08-03 08:17:10 +02:00
Worker(WorkerClient),
}
struct ServerSocketInfo {
addr: net::SocketAddr,
2018-08-04 01:09:46 +02:00
token: Token,
sock: mio::net::TcpListener,
timeout: Option<Instant>,
}
2018-08-03 08:17:10 +02:00
#[derive(Clone)]
pub(crate) struct AcceptNotify {
ready: mio::SetReadiness,
maxconn: usize,
maxconn_low: usize,
2018-08-04 01:09:46 +02:00
maxconnrate: usize,
maxconnrate_low: usize,
2018-08-03 08:17:10 +02:00
}
impl AcceptNotify {
2018-08-04 01:09:46 +02:00
pub fn new(ready: mio::SetReadiness, maxconn: usize, maxconnrate: usize) -> Self {
2018-08-03 08:17:10 +02:00
let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 };
2018-08-04 01:09:46 +02:00
let maxconnrate_low = if maxconnrate > 10 {
maxconnrate - 10
} else {
0
};
2018-08-03 08:17:10 +02:00
AcceptNotify {
ready,
maxconn,
maxconn_low,
2018-08-04 01:09:46 +02:00
maxconnrate,
maxconnrate_low,
2018-08-03 08:17:10 +02:00
}
}
pub fn notify_maxconn(&self, maxconn: usize) {
if maxconn > self.maxconn_low && maxconn <= self.maxconn {
let _ = self.ready.set_readiness(mio::Ready::readable());
}
}
2018-08-04 01:09:46 +02:00
pub fn notify_maxconnrate(&self, connrate: usize) {
if connrate > self.maxconnrate_low && connrate <= self.maxconnrate {
2018-08-03 08:17:10 +02:00
let _ = self.ready.set_readiness(mio::Ready::readable());
}
}
}
impl Default for AcceptNotify {
    /// Creates a notifier with both limits set to zero, backed by a fresh
    /// readiness handle whose `Registration` half is discarded.
    ///
    /// With zero limits neither `notify_*` condition can hold, so this
    /// instance never raises a signal.
    fn default() -> Self {
        let (_, ready) = mio::Registration::new2();
        AcceptNotify::new(ready, 0, 0)
    }
}
/// Owner-side handle for the accept thread: holds its configuration and the
/// channel ends that are handed over when the thread is started.
pub(crate) struct AcceptLoop {
    /// Registration for command wake-ups; moved into the accept thread by `start`.
    cmd_reg: Option<mio::Registration>,
    /// Set to readable after each command is queued.
    cmd_ready: mio::SetReadiness,
    /// Registration for notify wake-ups; moved into the accept thread by `start`.
    notify_reg: Option<mio::Registration>,
    /// Readiness handle cloned into every `AcceptNotify` from `get_notify`.
    notify_ready: mio::SetReadiness,
    /// Sending half of the command channel.
    tx: sync_mpsc::Sender<Command>,
    /// Receiving half of the command channel; moved into the accept thread by `start`.
    rx: Option<sync_mpsc::Receiver<Command>>,
    /// Server-command channel; the sender goes to the accept thread and the
    /// receiver is returned from `start`.
    srv: Option<(
        mpsc::UnboundedSender<ServerCommand>,
        mpsc::UnboundedReceiver<ServerCommand>,
    )>,
    /// Connection limit forwarded to the accept thread and to notifiers.
    maxconn: usize,
    /// Connection-rate limit forwarded to the accept thread and to notifiers.
    maxconnrate: usize,
}
impl AcceptLoop {
pub fn new() -> AcceptLoop {
let (tx, rx) = sync_mpsc::channel();
let (cmd_reg, cmd_ready) = mio::Registration::new2();
let (notify_reg, notify_ready) = mio::Registration::new2();
AcceptLoop {
tx,
cmd_ready,
cmd_reg: Some(cmd_reg),
notify_ready,
notify_reg: Some(notify_reg),
maxconn: 102_400,
2018-08-04 01:09:46 +02:00
maxconnrate: 256,
2018-08-03 08:17:10 +02:00
rx: Some(rx),
srv: Some(mpsc::unbounded()),
}
}
pub fn send(&self, msg: Command) {
let _ = self.tx.send(msg);
let _ = self.cmd_ready.set_readiness(mio::Ready::readable());
}
pub fn get_notify(&self) -> AcceptNotify {
2018-08-04 01:09:46 +02:00
AcceptNotify::new(self.notify_ready.clone(), self.maxconn, self.maxconnrate)
2018-08-03 08:17:10 +02:00
}
2018-08-04 01:09:46 +02:00
pub fn maxconn(&mut self, num: usize) {
2018-08-03 08:17:10 +02:00
self.maxconn = num;
}
2018-08-04 01:09:46 +02:00
pub fn maxconnrate(&mut self, num: usize) {
self.maxconnrate = num;
2018-08-03 08:17:10 +02:00
}
pub(crate) fn start(
2018-08-04 01:09:46 +02:00
&mut self, socks: Vec<Socket>, workers: Vec<WorkerClient>,
2018-08-03 08:17:10 +02:00
) -> mpsc::UnboundedReceiver<ServerCommand> {
let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo");
Accept::start(
self.rx.take().expect("Can not re-use AcceptInfo"),
self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
self.notify_reg.take().expect("Can not re-use AcceptInfo"),
self.maxconn,
2018-08-04 01:09:46 +02:00
self.maxconnrate,
2018-08-03 08:17:10 +02:00
socks,
tx,
workers,
);
rx
}
}
struct Accept {
poll: mio::Poll,
rx: sync_mpsc::Receiver<Command>,
sockets: Slab<ServerSocketInfo>,
2018-08-03 08:17:10 +02:00
workers: Vec<WorkerClient>,
srv: mpsc::UnboundedSender<ServerCommand>,
timer: (mio::Registration, mio::SetReadiness),
2018-08-03 08:17:10 +02:00
next: usize,
maxconn: usize,
2018-08-04 01:09:46 +02:00
maxconnrate: usize,
2018-08-03 08:17:10 +02:00
backpressure: bool,
}
2018-08-03 08:17:10 +02:00
const DELTA: usize = 100;
const CMD: mio::Token = mio::Token(0);
const TIMER: mio::Token = mio::Token(1);
2018-08-03 08:17:10 +02:00
const NOTIFY: mio::Token = mio::Token(2);
/// Tells whether an `accept()` failure is specific to the one connection
/// being accepted.
///
/// Such errors (refused, aborted, or reset connections) say nothing about
/// the listener itself, so the next connection may already be waiting and
/// `accept()` can be retried immediately.
///
/// Every other error makes the caller back off for a while before calling
/// `accept()` again. That pause matters for resource-exhaustion errors such
/// as ENFILE and EMFILE, which would otherwise spin in a tight loop.
fn connection_error(e: &io::Error) -> bool {
    match e.kind() {
        io::ErrorKind::ConnectionRefused
        | io::ErrorKind::ConnectionAborted
        | io::ErrorKind::ConnectionReset => true,
        _ => false,
    }
}
impl Accept {
2018-08-03 08:17:10 +02:00
#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub(crate) fn start(
rx: sync_mpsc::Receiver<Command>, cmd_reg: mio::Registration,
2018-08-04 01:09:46 +02:00
notify_reg: mio::Registration, maxconn: usize, maxconnrate: usize,
socks: Vec<Socket>, srv: mpsc::UnboundedSender<ServerCommand>,
2018-08-03 08:17:10 +02:00
workers: Vec<WorkerClient>,
) {
let sys = System::current();
// start accept thread
let _ = thread::Builder::new()
.name("actix-web accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
let mut accept = Accept::new(rx, socks, workers, srv);
accept.maxconn = maxconn;
2018-08-04 01:09:46 +02:00
accept.maxconnrate = maxconnrate;
2018-08-03 08:17:10 +02:00
// Start listening for incoming commands
if let Err(err) = accept.poll.register(
&cmd_reg,
CMD,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
}
// Start listening for notify updates
if let Err(err) = accept.poll.register(
&notify_reg,
NOTIFY,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
}
accept.poll();
});
}
fn new(
2018-08-04 01:09:46 +02:00
rx: sync_mpsc::Receiver<Command>, socks: Vec<Socket>,
2018-08-03 08:17:10 +02:00
workers: Vec<WorkerClient>, srv: mpsc::UnboundedSender<ServerCommand>,
) -> Accept {
// Create a poll instance
let poll = match mio::Poll::new() {
Ok(poll) => poll,
Err(err) => panic!("Can not create mio::Poll: {}", err),
};
// Start accept
let mut sockets = Slab::new();
2018-08-04 01:09:46 +02:00
for sock in socks {
let server = mio::net::TcpListener::from_std(sock.lst)
.expect("Can not create mio::net::TcpListener");
let entry = sockets.vacant_entry();
let token = entry.key();
// Start listening for incoming connections
if let Err(err) = poll.register(
&server,
2018-08-03 08:17:10 +02:00
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register io: {}", err);
}
entry.insert(ServerSocketInfo {
2018-08-04 01:09:46 +02:00
token: sock.token,
addr: sock.addr,
sock: server,
timeout: None,
});
}
// Timer
let (tm, tmr) = mio::Registration::new2();
if let Err(err) =
poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register Registration: {}", err);
}
Accept {
poll,
rx,
sockets,
workers,
srv,
next: 0,
timer: (tm, tmr),
2018-08-03 08:17:10 +02:00
maxconn: 102_400,
2018-08-04 01:09:46 +02:00
maxconnrate: 256,
2018-08-03 08:17:10 +02:00
backpressure: false,
}
}
fn poll(&mut self) {
// Create storage for events
let mut events = mio::Events::with_capacity(128);
loop {
if let Err(err) = self.poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
for event in events.iter() {
let token = event.token();
match token {
CMD => if !self.process_cmd() {
return;
},
TIMER => self.process_timer(),
2018-08-03 08:17:10 +02:00
NOTIFY => self.backpressure(false),
_ => {
let token = usize::from(token);
if token < DELTA {
continue;
}
self.accept(token - DELTA);
}
}
}
}
}
fn process_timer(&mut self) {
let now = Instant::now();
for (token, info) in self.sockets.iter_mut() {
if let Some(inst) = info.timeout.take() {
if now > inst {
if let Err(err) = self.poll.register(
&info.sock,
2018-08-03 08:17:10 +02:00
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not register server socket {}", err);
} else {
info!("Resume accepting connections on {}", info.addr);
}
} else {
info.timeout = Some(inst);
}
}
}
}
fn process_cmd(&mut self) -> bool {
loop {
match self.rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => {
for (_, info) in self.sockets.iter_mut() {
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", info.addr);
}
}
}
Command::Resume => {
for (token, info) in self.sockets.iter() {
if let Err(err) = self.poll.register(
&info.sock,
2018-08-03 08:17:10 +02:00
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not resume socket accept process: {}", err);
} else {
info!(
"Accepting connections on {} has been resumed",
info.addr
);
}
}
}
Command::Stop => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
2018-08-03 08:17:10 +02:00
Command::Worker(worker) => {
self.backpressure(false);
self.workers.push(worker);
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => break,
sync_mpsc::TryRecvError::Disconnected => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
},
}
}
true
}
2018-08-03 08:17:10 +02:00
/// Switches backpressure on or off; a call that does not change the
/// current state is a no-op.
///
/// Turning it on deregisters every listener (errors ignored). Turning it
/// off registers every listener again and logs the outcome per socket.
fn backpressure(&mut self, on: bool) {
    if self.backpressure == on {
        return;
    }
    self.backpressure = on;

    if on {
        for (_, info) in self.sockets.iter() {
            let _ = self.poll.deregister(&info.sock);
        }
        return;
    }

    for (key, info) in self.sockets.iter() {
        let registered = self.poll.register(
            &info.sock,
            mio::Token(key + DELTA),
            mio::Ready::readable(),
            mio::PollOpt::edge(),
        );
        match registered {
            Ok(_) => {
                info!("Accepting connections on {} has been resumed", info.addr);
            }
            Err(err) => {
                error!("Can not resume socket accept process: {}", err);
            }
        }
    }
}
2018-08-03 08:17:10 +02:00
/// Hands one accepted connection to a worker.
///
/// Without backpressure, workers are tried in rotation starting at
/// `self.next`, skipping any that report themselves unavailable for the
/// configured limits. If a full pass finds no taker, backpressure is
/// enabled and the connection is re-dispatched in backpressure mode.
///
/// With backpressure, the connection is sent to the worker at `self.next`
/// without checking availability.
///
/// In both modes a worker whose channel rejects the message is reported to
/// the server as dead and removed, and the connection is retried with the
/// next worker. If no workers remain the connection is dropped.
fn accept_one(&mut self, mut msg: Conn<net::TcpStream>) {
    if self.backpressure {
        while !self.workers.is_empty() {
            let rejected = match self.workers[self.next].send(msg) {
                Ok(_) => {
                    self.next = (self.next + 1) % self.workers.len();
                    return;
                }
                Err(err) => err,
            };

            // The worker is gone: report it, reclaim the connection, and
            // try whichever worker now occupies this slot.
            let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
                self.workers[self.next].idx,
            ));
            msg = rejected.into_inner();
            self.workers.swap_remove(self.next);
            if self.workers.is_empty() {
                error!("No workers");
                return;
            }
            if self.workers.len() <= self.next {
                self.next = 0;
            }
        }
        return;
    }

    let mut attempts = 0;
    while attempts < self.workers.len() {
        attempts += 1;

        if !self.workers[self.next].available(self.maxconn, self.maxconnrate) {
            self.next = (self.next + 1) % self.workers.len();
            continue;
        }

        let rejected = match self.workers[self.next].send(msg) {
            Ok(_) => {
                self.next = (self.next + 1) % self.workers.len();
                return;
            }
            Err(err) => err,
        };

        // The worker is gone: report it, reclaim the connection, and retry
        // with whichever worker now occupies this slot.
        let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
            self.workers[self.next].idx,
        ));
        msg = rejected.into_inner();
        self.workers.swap_remove(self.next);
        if self.workers.is_empty() {
            error!("No workers");
            self.backpressure(true);
            return;
        }
        if self.workers.len() <= self.next {
            self.next = 0;
        }
    }

    // enable backpressure
    self.backpressure(true);
    self.accept_one(msg);
}
fn accept(&mut self, token: usize) {
loop {
let msg = if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept_std() {
Ok((io, addr)) => Conn {
io,
token: info.token,
peer: Some(addr),
http2: false,
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
}
// sleep after error
info.timeout = Some(Instant::now() + Duration::from_millis(500));
let r = self.timer.1.clone();
System::current().arbiter().do_send(Execute::new(
move || -> Result<(), ()> {
Arbiter::spawn(
Delay::new(
Instant::now() + Duration::from_millis(510),
).map_err(|_| ())
.and_then(move |_| {
let _ =
r.set_readiness(mio::Ready::readable());
Ok(())
}),
);
Ok(())
},
));
2018-08-03 08:17:10 +02:00
return;
}
}
2018-08-03 08:17:10 +02:00
} else {
return;
};
self.accept_one(msg);
}
}
}