1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-22 23:35:07 +02:00

update actix-server and actix-testing to tokio 1.0 (#239)

This commit is contained in:
fakeshadow
2020-12-29 07:44:53 +08:00
committed by GitHub
parent a09f9abfcb
commit b7202db8fd
15 changed files with 1008 additions and 841 deletions

View File

@@ -1,120 +1,86 @@
use std::sync::mpsc as sync_mpsc;
use std::time::Duration;
use std::{io, thread};
use actix_rt::time::{delay_until, Instant};
use actix_rt::time::{sleep_until, Instant};
use actix_rt::System;
use log::{error, info};
use mio::{Interest, Poll, Token as MioToken};
use slab::Slab;
use crate::server::Server;
use crate::socket::{SocketAddr, SocketListener, StdListener};
use crate::worker::{Conn, WorkerClient};
use crate::socket::{MioListener, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandle};
use crate::Token;
pub(crate) enum Command {
Pause,
Resume,
Stop,
Worker(WorkerClient),
}
struct ServerSocketInfo {
// addr for socket. mainly used for logging.
addr: SocketAddr,
// be ware this is the crate token for identify socket and should not be confused with
// mio::Token
token: Token,
sock: SocketListener,
lst: MioListener,
// timeout is used to mark the deadline when this socket's listener should be registered again
// after an error.
timeout: Option<Instant>,
}
#[derive(Clone)]
pub(crate) struct AcceptNotify(mio::SetReadiness);
impl AcceptNotify {
pub(crate) fn new(ready: mio::SetReadiness) -> Self {
AcceptNotify(ready)
}
pub(crate) fn notify(&self) {
let _ = self.0.set_readiness(mio::Ready::readable());
}
}
impl Default for AcceptNotify {
fn default() -> Self {
AcceptNotify::new(mio::Registration::new2().1)
}
}
/// Accept loop would live with `ServerBuilder`.
///
/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to
/// `Accept` and `Worker`.
///
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
pub(crate) struct AcceptLoop {
cmd_reg: Option<mio::Registration>,
cmd_ready: mio::SetReadiness,
notify_reg: Option<mio::Registration>,
notify_ready: mio::SetReadiness,
tx: sync_mpsc::Sender<Command>,
rx: Option<sync_mpsc::Receiver<Command>>,
srv: Option<Server>,
poll: Option<Poll>,
waker: WakerQueue,
}
impl AcceptLoop {
pub fn new(srv: Server) -> AcceptLoop {
let (tx, rx) = sync_mpsc::channel();
let (cmd_reg, cmd_ready) = mio::Registration::new2();
let (notify_reg, notify_ready) = mio::Registration::new2();
pub fn new(srv: Server) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
AcceptLoop {
tx,
cmd_ready,
cmd_reg: Some(cmd_reg),
notify_ready,
notify_reg: Some(notify_reg),
rx: Some(rx),
Self {
srv: Some(srv),
poll: Some(poll),
waker,
}
}
pub fn send(&self, msg: Command) {
let _ = self.tx.send(msg);
let _ = self.cmd_ready.set_readiness(mio::Ready::readable());
pub(crate) fn waker_owned(&self) -> WakerQueue {
self.waker.clone()
}
pub fn get_notify(&self) -> AcceptNotify {
AcceptNotify::new(self.notify_ready.clone())
pub fn wake(&self, i: WakerInterest) {
self.waker.wake(i);
}
pub(crate) fn start(
&mut self,
socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>,
socks: Vec<(Token, MioListener)>,
handles: Vec<WorkerHandle>,
) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap();
let waker = self.waker.clone();
Accept::start(
self.rx.take().expect("Can not re-use AcceptInfo"),
self.cmd_reg.take().expect("Can not re-use AcceptInfo"),
self.notify_reg.take().expect("Can not re-use AcceptInfo"),
socks,
srv,
workers,
);
Accept::start(poll, waker, socks, srv, handles);
}
}
/// poll instance of the server.
struct Accept {
poll: mio::Poll,
rx: sync_mpsc::Receiver<Command>,
sockets: Slab<ServerSocketInfo>,
workers: Vec<WorkerClient>,
poll: Poll,
waker: WakerQueue,
handles: Vec<WorkerHandle>,
srv: Server,
timer: (mio::Registration, mio::SetReadiness),
next: usize,
backpressure: bool,
}
const DELTA: usize = 100;
const CMD: mio::Token = mio::Token(0);
const TIMER: mio::Token = mio::Token(1);
const NOTIFY: mio::Token = mio::Token(2);
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
@@ -129,326 +95,290 @@ fn connection_error(e: &io::Error) -> bool {
}
impl Accept {
#![allow(clippy::too_many_arguments)]
pub(crate) fn start(
rx: sync_mpsc::Receiver<Command>,
cmd_reg: mio::Registration,
notify_reg: mio::Registration,
socks: Vec<(Token, StdListener)>,
poll: Poll,
waker: WakerQueue,
socks: Vec<(Token, MioListener)>,
srv: Server,
workers: Vec<WorkerClient>,
handles: Vec<WorkerHandle>,
) {
// Accept runs in its own thread and would want to spawn additional futures to current
// actix system.
let sys = System::current();
// start accept thread
let _ = thread::Builder::new()
thread::Builder::new()
.name("actix-server accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
let mut accept = Accept::new(rx, socks, workers, srv);
// Start listening for incoming commands
if let Err(err) = accept.poll.register(
&cmd_reg,
CMD,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
}
// Start listening for notify updates
if let Err(err) = accept.poll.register(
&notify_reg,
NOTIFY,
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register Registration: {}", err);
}
accept.poll();
});
let (mut accept, sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv);
accept.poll_with(sockets);
})
.unwrap();
}
fn new(
rx: sync_mpsc::Receiver<Command>,
socks: Vec<(Token, StdListener)>,
workers: Vec<WorkerClient>,
fn new_with_sockets(
poll: Poll,
waker: WakerQueue,
socks: Vec<(Token, MioListener)>,
handles: Vec<WorkerHandle>,
srv: Server,
) -> Accept {
// Create a poll instance
let poll = match mio::Poll::new() {
Ok(poll) => poll,
Err(err) => panic!("Can not create mio::Poll: {}", err),
};
// Start accept
) -> (Accept, Slab<ServerSocketInfo>) {
let mut sockets = Slab::new();
for (hnd_token, lst) in socks.into_iter() {
for (hnd_token, mut lst) in socks.into_iter() {
let addr = lst.local_addr();
let server = lst.into_listener();
let entry = sockets.vacant_entry();
let token = entry.key();
// Start listening for incoming connections
if let Err(err) = poll.register(
&server,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
panic!("Can not register io: {}", err);
}
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
entry.insert(ServerSocketInfo {
addr,
token: hnd_token,
sock: server,
lst,
timeout: None,
});
}
// Timer
let (tm, tmr) = mio::Registration::new2();
if let Err(err) =
poll.register(&tm, TIMER, mio::Ready::readable(), mio::PollOpt::edge())
{
panic!("Can not register Registration: {}", err);
}
Accept {
let accept = Accept {
poll,
rx,
sockets,
workers,
waker,
handles,
srv,
next: 0,
timer: (tm, tmr),
backpressure: false,
}
};
(accept, sockets)
}
fn poll(&mut self) {
// Create storage for events
fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) {
let mut events = mio::Events::with_capacity(128);
loop {
if let Err(err) = self.poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
self.poll
.poll(&mut events, None)
.unwrap_or_else(|e| panic!("Poll error: {}", e));
for event in events.iter() {
let token = event.token();
match token {
CMD => {
if !self.process_cmd() {
return;
// This is a loop because interests for command from previous version was
// a loop that would try to drain the command channel. It's yet unknown
// if it's necessary/good practice to actively drain the waker queue.
WAKER_TOKEN => 'waker: loop {
// take guard with every iteration so no new interest can be added
// until the current task is done.
let mut guard = self.waker.guard();
match guard.pop_front() {
// worker notify it becomes available. we may want to recover
// from backpressure.
Some(WakerInterest::WorkerAvailable) => {
drop(guard);
self.maybe_backpressure(&mut sockets, false);
}
// a new worker thread is made and it's handle would be added
// to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
// maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false);
self.handles.push(handle);
}
// got timer interest and it's time to try register socket(s)
// again.
Some(WakerInterest::Timer) => {
drop(guard);
self.process_timer(&mut sockets)
}
Some(WakerInterest::Pause) => {
drop(guard);
sockets.iter_mut().for_each(|(_, info)| {
match self.deregister(info) {
Ok(_) => info!(
"Paused accepting connections on {}",
info.addr
),
Err(e) => {
error!("Can not deregister server socket {}", e)
}
}
});
}
Some(WakerInterest::Resume) => {
drop(guard);
sockets.iter_mut().for_each(|(token, info)| {
self.register_logged(token, info);
});
}
Some(WakerInterest::Stop) => {
return self.deregister_all(&mut sockets);
}
// waker queue is drained.
None => {
// Reset the WakerQueue before break so it does not grow
// infinitely.
WakerQueue::reset(&mut guard);
break 'waker;
}
}
}
TIMER => self.process_timer(),
NOTIFY => self.backpressure(false),
},
_ => {
let token = usize::from(token);
if token < DELTA {
continue;
}
self.accept(token - DELTA);
self.accept(&mut sockets, token);
}
}
}
}
}
fn process_timer(&mut self) {
fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
let now = Instant::now();
for (token, info) in self.sockets.iter_mut() {
sockets.iter_mut().for_each(|(token, info)| {
// only the ServerSocketInfo have an associate timeout value was de registered.
if let Some(inst) = info.timeout.take() {
if now > inst {
if let Err(err) = self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
) {
error!("Can not register server socket {}", err);
} else {
info!("Resume accepting connections on {}", info.addr);
}
self.register_logged(token, info);
} else {
info.timeout = Some(inst);
}
}
}
}
fn process_cmd(&mut self) -> bool {
loop {
match self.rx.try_recv() {
Ok(cmd) => match cmd {
Command::Pause => {
for (_, info) in self.sockets.iter_mut() {
if let Err(err) = self.poll.deregister(&info.sock) {
error!("Can not deregister server socket {}", err);
} else {
info!("Paused accepting connections on {}", info.addr);
}
}
}
Command::Resume => {
for (token, info) in self.sockets.iter() {
if let Err(err) = self.register(token, info) {
error!("Can not resume socket accept process: {}", err);
} else {
info!(
"Accepting connections on {} has been resumed",
info.addr
);
}
}
}
Command::Stop => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
Command::Worker(worker) => {
self.backpressure(false);
self.workers.push(worker);
}
},
Err(err) => match err {
sync_mpsc::TryRecvError::Empty => break,
sync_mpsc::TryRecvError::Disconnected => {
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
}
return false;
}
},
}
}
true
});
}
#[cfg(not(target_os = "windows"))]
fn register(&self, token: usize, info: &ServerSocketInfo) -> io::Result<()> {
self.poll.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
)
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
self.poll
.registry()
.register(&mut info.lst, MioToken(token), Interest::READABLE)
}
#[cfg(target_os = "windows")]
fn register(&self, token: usize, info: &ServerSocketInfo) -> io::Result<()> {
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
// On windows, calling register without deregister cause an error.
// See https://github.com/actix/actix-web/issues/905
// Calling reregister seems to fix the issue.
self.poll
.register(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
)
.registry()
.register(&mut info.lst, mio::Token(token), Interest::READABLE)
.or_else(|_| {
self.poll.reregister(
&info.sock,
mio::Token(token + DELTA),
mio::Ready::readable(),
mio::PollOpt::edge(),
self.poll.registry().reregister(
&mut info.lst,
mio::Token(token),
Interest::READABLE,
)
})
}
fn backpressure(&mut self, on: bool) {
fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) {
match self.register(token, info) {
Ok(_) => info!("Resume accepting connections on {}", info.addr),
Err(e) => error!("Can not register server socket {}", e),
}
}
fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
self.poll.registry().deregister(&mut info.lst)
}
fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
sockets.iter_mut().for_each(|(_, info)| {
info!("Accepting connections on {} has been paused", info.addr);
let _ = self.deregister(info);
});
}
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
if self.backpressure {
if !on {
self.backpressure = false;
for (token, info) in self.sockets.iter() {
for (token, info) in sockets.iter_mut() {
if info.timeout.is_some() {
// socket will attempt to re-register itself when its timeout completes
continue;
}
if let Err(err) = self.register(token, info) {
error!("Can not resume socket accept process: {}", err);
} else {
info!("Accepting connections on {} has been resumed", info.addr);
}
self.register_logged(token, info);
}
}
} else if on {
self.backpressure = true;
for (_, info) in self.sockets.iter() {
let _ = self.poll.deregister(&info.sock);
info!("Accepting connections on {} has been paused", info.addr);
}
self.deregister_all(sockets);
}
}
fn accept_one(&mut self, mut msg: Conn) {
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut msg: Conn) {
if self.backpressure {
while !self.workers.is_empty() {
match self.workers[self.next].send(msg) {
Ok(_) => (),
while !self.handles.is_empty() {
match self.handles[self.next].send(msg) {
Ok(_) => {
self.set_next();
break;
}
Err(tmp) => {
self.srv.worker_faulted(self.workers[self.next].idx);
// worker lost contact and could be gone. a message is sent to
// `ServerBuilder` future to notify it a new worker should be made.
// after that remove the fault worker.
self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp;
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
return;
} else if self.workers.len() <= self.next {
} else if self.handles.len() <= self.next {
self.next = 0;
}
continue;
}
}
self.next = (self.next + 1) % self.workers.len();
break;
}
} else {
let mut idx = 0;
while idx < self.workers.len() {
while idx < self.handles.len() {
idx += 1;
if self.workers[self.next].available() {
match self.workers[self.next].send(msg) {
if self.handles[self.next].available() {
match self.handles[self.next].send(msg) {
Ok(_) => {
self.next = (self.next + 1) % self.workers.len();
self.set_next();
return;
}
// worker lost contact and could be gone. a message is sent to
// `ServerBuilder` future to notify it a new worker should be made.
// after that remove the fault worker and enter backpressure if necessary.
Err(tmp) => {
self.srv.worker_faulted(self.workers[self.next].idx);
self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp;
self.workers.swap_remove(self.next);
if self.workers.is_empty() {
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
self.backpressure(true);
self.maybe_backpressure(sockets, true);
return;
} else if self.workers.len() <= self.next {
} else if self.handles.len() <= self.next {
self.next = 0;
}
continue;
}
}
}
self.next = (self.next + 1) % self.workers.len();
self.set_next();
}
// enable backpressure
self.backpressure(true);
self.accept_one(msg);
self.maybe_backpressure(sockets, true);
self.accept_one(sockets, msg);
}
}
fn accept(&mut self, token: usize) {
// set next worker handle that would accept work.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
loop {
let msg = if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept() {
let msg = if let Some(info) = sockets.get_mut(token) {
match info.lst.accept() {
Ok(Some((io, addr))) => Conn {
io,
token: info.token,
@@ -458,18 +388,22 @@ impl Accept {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
// deregister listener temporary
error!("Error accepting connection: {}", e);
if let Err(err) = self.poll.deregister(&info.sock) {
if let Err(err) = self.deregister(info) {
error!("Can not deregister server socket {}", err);
}
// sleep after error
// sleep after error. write the timeout to socket info as later the poll
// would need it mark which socket and when it's listener should be
// registered.
info.timeout = Some(Instant::now() + Duration::from_millis(500));
let r = self.timer.1.clone();
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone();
System::current().arbiter().send(Box::pin(async move {
delay_until(Instant::now() + Duration::from_millis(510)).await;
let _ = r.set_readiness(mio::Ready::readable());
sleep_until(Instant::now() + Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
}));
return;
}
@@ -478,7 +412,7 @@ impl Accept {
return;
};
self.accept_one(msg);
self.accept_one(sockets, msg);
}
}
}