1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-24 01:11:07 +01:00

refactor server worker

This commit is contained in:
Nikolay Kim 2019-12-04 15:12:02 +06:00
parent 0a4fe22003
commit c4e2051327
3 changed files with 121 additions and 88 deletions

View File

@ -25,7 +25,7 @@ pub(crate) struct Token(usize);
impl Token {
pub(crate) fn next(&mut self) -> Token {
let token = Token(self.0 + 1);
let token = Token(self.0);
self.0 += 1;
token
}

View File

@ -86,7 +86,6 @@ where
let _ = f.await;
drop(guard);
}
.boxed_local(),
);
ok(())
} else {

View File

@ -2,7 +2,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{mem, time};
use std::time;
use actix_rt::time::{delay, Delay};
use actix_rt::{spawn, Arbiter};
@ -128,7 +128,7 @@ impl WorkerAvailability {
pub(crate) struct Worker {
rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
services: Vec<Option<(usize, BoxedServerService)>>,
services: Vec<WorkerService>,
availability: WorkerAvailability,
conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>,
@ -136,6 +136,29 @@ pub(crate) struct Worker {
shutdown_timeout: time::Duration,
}
struct WorkerService {
factory: usize,
status: WorkerServiceStatus,
service: BoxedServerService,
}
impl WorkerService {
fn created(&mut self, service: BoxedServerService) {
self.service = service;
self.status = WorkerServiceStatus::Unavailable;
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum WorkerServiceStatus {
Available,
Unavailable,
Failed,
Restarting,
Stopping,
Stopped,
}
impl Worker {
pub(crate) fn start(
rx: UnboundedReceiver<WorkerCommand>,
@ -172,11 +195,13 @@ impl Worker {
match res {
Ok(services) => {
for item in services {
for (idx, token, service) in item {
while token.0 >= wrk.services.len() {
wrk.services.push(None);
}
wrk.services[token.0] = Some((idx, service));
for (factory, token, service) in item {
assert_eq!(token.0, wrk.services.len());
wrk.services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Available,
});
}
}
}
@ -187,52 +212,71 @@ impl Worker {
}
wrk.await
}
.boxed_local(),
);
}
fn shutdown(&mut self, force: bool) {
if force {
self.services.iter_mut().for_each(|h| {
if let Some(h) = h {
let _ = h.1.call((None, ServerMessage::ForceShutdown));
self.services.iter_mut().for_each(|srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopped;
actix_rt::spawn(
srv.service
.call((None, ServerMessage::ForceShutdown))
.map(|_| ()),
);
}
});
} else {
let timeout = self.shutdown_timeout;
self.services.iter_mut().for_each(move |h| {
if let Some(h) = h {
let _ = h.1.call((None, ServerMessage::Shutdown(timeout)));
self.services.iter_mut().for_each(move |srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopping;
actix_rt::spawn(
srv.service
.call((None, ServerMessage::Shutdown(timeout)))
.map(|_| ()),
);
}
});
}
}
fn check_readiness(
&mut self,
trace: bool,
cx: &mut Context<'_>,
) -> Result<bool, (Token, usize)> {
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
let mut ready = self.conns.available(cx);
let mut failed = None;
for (token, service) in &mut self.services.iter_mut().enumerate() {
if let Some(service) = service {
match service.1.poll_ready(cx) {
for (idx, srv) in &mut self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable
{
match srv.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
if trace {
if srv.status == WorkerServiceStatus::Unavailable {
trace!(
"Service {:?} is available",
self.factories[service.0].name(Token(token))
self.factories[srv.factory].name(Token(idx))
);
srv.status = WorkerServiceStatus::Available;
}
}
Poll::Pending => {
ready = false;
if srv.status == WorkerServiceStatus::Available {
trace!(
"Service {:?} is unavailable",
self.factories[srv.factory].name(Token(idx))
);
srv.status = WorkerServiceStatus::Unavailable;
}
}
Poll::Pending => ready = false,
Poll::Ready(Err(_)) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[service.0].name(Token(token))
self.factories[srv.factory].name(Token(idx))
);
failed = Some((Token(token), service.0));
failed = Some((Token(idx), srv.factory));
srv.status = WorkerServiceStatus::Failed;
}
}
}
@ -246,7 +290,6 @@ impl Worker {
}
enum WorkerState {
None,
Available,
Unavailable(Vec<Conn>),
Restarting(
@ -254,7 +297,11 @@ enum WorkerState {
Token,
Pin<Box<dyn Future<Output = Result<Vec<(Token, BoxedServerService)>, ()>>>>,
),
Shutdown(Delay, Delay, oneshot::Sender<bool>),
Shutdown(
Pin<Box<Delay>>,
Pin<Box<Delay>>,
Option<oneshot::Sender<bool>>,
),
}
impl Future for Worker {
@ -277,9 +324,9 @@ impl Future for Worker {
if num != 0 {
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
delay(time::Instant::now() + time::Duration::from_secs(1)),
delay(time::Instant::now() + self.shutdown_timeout),
result,
Box::pin(delay(time::Instant::now() + time::Duration::from_secs(1))),
Box::pin(delay(time::Instant::now() + self.shutdown_timeout)),
Some(result),
);
} else {
let _ = result.send(true);
@ -293,49 +340,33 @@ impl Future for Worker {
}
}
let state = mem::replace(&mut self.state, WorkerState::None);
match state {
WorkerState::Unavailable(mut conns) => {
match self.check_readiness(true, cx) {
match self.state {
WorkerState::Unavailable(ref mut conns) => {
let conn = conns.pop();
match self.check_readiness(cx) {
Ok(true) => {
self.state = WorkerState::Available;
// process requests from wait queue
while let Some(msg) = conns.pop() {
match self.check_readiness(false, cx) {
Ok(true) => {
let guard = self.conns.get();
let _ = self.services[msg.token.0]
.as_mut()
.expect("actix net bug")
.1
.call((Some(guard), ServerMessage::Connect(msg.io)));
}
Ok(false) => {
trace!("Worker is unavailable");
self.state = WorkerState::Unavailable(conns);
return self.poll(cx);
}
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
self.factories[idx].name(token)
);
self.state = WorkerState::Restarting(
idx,
token,
self.factories[idx].create(),
);
return self.poll(cx);
}
}
if let Some(conn) = conn {
let guard = self.conns.get();
let _ = self.services[conn.token.0]
.service
.call((Some(guard), ServerMessage::Connect(conn.io)));
} else {
self.state = WorkerState::Available;
self.availability.set(true);
}
self.availability.set(true);
self.poll(cx)
}
Ok(false) => {
self.state = WorkerState::Unavailable(conns);
// push connection back to queue
if let Some(conn) = conn {
match self.state {
WorkerState::Unavailable(ref mut conns) => {
conns.push(conn);
}
_ => (),
}
}
Poll::Pending
}
Err((token, idx)) => {
@ -343,22 +374,24 @@ impl Future for Worker {
"Service {:?} failed, restarting",
self.factories[idx].name(token)
);
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state =
WorkerState::Restarting(idx, token, self.factories[idx].create());
self.poll(cx)
}
}
}
WorkerState::Restarting(idx, token, mut fut) => {
match Pin::new(&mut fut).poll(cx) {
WorkerState::Restarting(idx, token, ref mut fut) => {
match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(item)) => {
for (token, service) in item {
trace!(
"Service {:?} has been restarted",
self.factories[idx].name(token)
);
self.services[token.0] = Some((idx, service));
self.services[token.0].created(service);
self.state = WorkerState::Unavailable(Vec::new());
return self.poll(cx);
}
}
Poll::Ready(Err(_)) => {
@ -368,40 +401,42 @@ impl Future for Worker {
);
}
Poll::Pending => {
self.state = WorkerState::Restarting(idx, token, fut);
// self.state = WorkerState::Restarting(idx, token, fut);
return Poll::Pending;
}
}
self.poll(cx)
}
WorkerState::Shutdown(mut t1, mut t2, tx) => {
WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => {
let num = num_connections();
if num == 0 {
let _ = tx.send(true);
let _ = tx.take().unwrap().send(true);
Arbiter::current().stop();
return Poll::Ready(());
}
// check graceful timeout
match Pin::new(&mut t2).poll(cx) {
match t2.as_mut().poll(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
let _ = tx.take().unwrap().send(false);
self.shutdown(true);
let _ = tx.send(false);
Arbiter::current().stop();
return Poll::Ready(());
}
}
// sleep for 1 second and then check again
match Pin::new(&mut t1).poll(cx) {
match t1.as_mut().poll(cx) {
Poll::Pending => (),
Poll::Ready(_) => {
t1 = delay(time::Instant::now() + time::Duration::from_secs(1));
let _ = Pin::new(&mut t1).poll(cx);
*t1 = Box::pin(delay(
time::Instant::now() + time::Duration::from_secs(1),
));
let _ = t1.as_mut().poll(cx);
}
}
self.state = WorkerState::Shutdown(t1, t2, tx);
// self.state = WorkerState::Shutdown(t1, t2, tx);
Poll::Pending
}
WorkerState::Available => {
@ -409,13 +444,11 @@ impl Future for Worker {
match Pin::new(&mut self.rx).poll_next(cx) {
// handle incoming tcp stream
Poll::Ready(Some(WorkerCommand(msg))) => {
match self.check_readiness(false, cx) {
match self.check_readiness(cx) {
Ok(true) => {
let guard = self.conns.get();
let _ = self.services[msg.token.0]
.as_mut()
.expect("actix-server bug")
.1
.service
.call((Some(guard), ServerMessage::Connect(msg.io)));
continue;
}
@ -430,6 +463,8 @@ impl Future for Worker {
self.factories[idx].name(token)
);
self.availability.set(false);
self.services[token.0].status =
WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(
idx,
token,
@ -447,7 +482,6 @@ impl Future for Worker {
}
}
}
WorkerState::None => panic!(),
}
}
}