diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 50df1142..5143164e 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -25,7 +25,7 @@ pub(crate) struct Token(usize); impl Token { pub(crate) fn next(&mut self) -> Token { - let token = Token(self.0 + 1); + let token = Token(self.0); self.0 += 1; token } diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index ecdbde3a..5c2fa5dd 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -86,7 +86,6 @@ where let _ = f.await; drop(guard); } - .boxed_local(), ); ok(()) } else { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index b8bb67f6..c927ed92 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -2,7 +2,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; -use std::{mem, time}; +use std::time; use actix_rt::time::{delay, Delay}; use actix_rt::{spawn, Arbiter}; @@ -128,7 +128,7 @@ impl WorkerAvailability { pub(crate) struct Worker { rx: UnboundedReceiver, rx2: UnboundedReceiver, - services: Vec>, + services: Vec, availability: WorkerAvailability, conns: Counter, factories: Vec>, @@ -136,6 +136,29 @@ pub(crate) struct Worker { shutdown_timeout: time::Duration, } +struct WorkerService { + factory: usize, + status: WorkerServiceStatus, + service: BoxedServerService, +} + +impl WorkerService { + fn created(&mut self, service: BoxedServerService) { + self.service = service; + self.status = WorkerServiceStatus::Unavailable; + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +enum WorkerServiceStatus { + Available, + Unavailable, + Failed, + Restarting, + Stopping, + Stopped, +} + impl Worker { pub(crate) fn start( rx: UnboundedReceiver, @@ -172,11 +195,13 @@ impl Worker { match res { Ok(services) => { for item in services { - for (idx, token, service) in item { - while token.0 >= wrk.services.len() { - wrk.services.push(None); - } - wrk.services[token.0] = Some((idx, service)); + for (factory, token, service) in item { + assert_eq!(token.0, wrk.services.len()); + wrk.services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Available, + }); } } } @@ -187,52 +212,71 @@ impl Worker { } wrk.await } - .boxed_local(), ); } fn shutdown(&mut self, force: bool) { if force { - self.services.iter_mut().for_each(|h| { - if let Some(h) = h { - let _ = h.1.call((None, ServerMessage::ForceShutdown)); + self.services.iter_mut().for_each(|srv| { + if srv.status == WorkerServiceStatus::Available { + srv.status = WorkerServiceStatus::Stopped; + actix_rt::spawn( + srv.service + .call((None, ServerMessage::ForceShutdown)) + .map(|_| ()), + ); } }); } else { let timeout = self.shutdown_timeout; - self.services.iter_mut().for_each(move |h| { - if let Some(h) = h { - let _ = h.1.call((None, ServerMessage::Shutdown(timeout))); + self.services.iter_mut().for_each(move |srv| { + if srv.status == WorkerServiceStatus::Available { + srv.status = WorkerServiceStatus::Stopping; + actix_rt::spawn( + srv.service + .call((None, ServerMessage::Shutdown(timeout))) + .map(|_| ()), + ); } }); } } - fn check_readiness( - &mut self, - trace: bool, - cx: &mut Context<'_>, - ) -> Result { + fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { let mut ready = self.conns.available(cx); let mut failed = None; - for (token, service) in &mut self.services.iter_mut().enumerate() { - if let Some(service) = service { - match service.1.poll_ready(cx) { + for (idx, srv) in &mut self.services.iter_mut().enumerate() { + if srv.status == WorkerServiceStatus::Available + || srv.status == WorkerServiceStatus::Unavailable + { + match srv.service.poll_ready(cx) { Poll::Ready(Ok(_)) => { - if trace { + if srv.status == WorkerServiceStatus::Unavailable { trace!( "Service {:?} is available", - self.factories[service.0].name(Token(token)) + self.factories[srv.factory].name(Token(idx)) ); + srv.status = WorkerServiceStatus::Available; + } + } + Poll::Pending => { + ready = false; + + if srv.status == WorkerServiceStatus::Available { + trace!( + "Service {:?} is unavailable", + self.factories[srv.factory].name(Token(idx)) + ); + srv.status = WorkerServiceStatus::Unavailable; } } - Poll::Pending => ready = false, Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", - self.factories[service.0].name(Token(token)) + self.factories[srv.factory].name(Token(idx)) ); - failed = Some((Token(token), service.0)); + failed = Some((Token(idx), srv.factory)); + srv.status = WorkerServiceStatus::Failed; } } } @@ -246,7 +290,6 @@ impl Worker { } enum WorkerState { - None, Available, Unavailable(Vec), Restarting( @@ -254,7 +297,11 @@ enum WorkerState { Token, Pin, ()>>>>, ), - Shutdown(Delay, Delay, oneshot::Sender), + Shutdown( + Pin>, + Pin>, + Option>, + ), } impl Future for Worker { @@ -277,9 +324,9 @@ impl Future for Worker { if num != 0 { info!("Graceful worker shutdown, {} connections", num); self.state = WorkerState::Shutdown( - delay(time::Instant::now() + time::Duration::from_secs(1)), - delay(time::Instant::now() + self.shutdown_timeout), - result, + Box::pin(delay(time::Instant::now() + time::Duration::from_secs(1))), + Box::pin(delay(time::Instant::now() + self.shutdown_timeout)), + Some(result), ); } else { let _ = result.send(true); @@ -293,49 +340,33 @@ impl Future for Worker { } } - let state = mem::replace(&mut self.state, WorkerState::None); - - match state { - WorkerState::Unavailable(mut conns) => { - match self.check_readiness(true, cx) { + match self.state { + WorkerState::Unavailable(ref mut conns) => { + let conn = conns.pop(); + match self.check_readiness(cx) { Ok(true) => { - self.state = WorkerState::Available; - // process requests from wait queue - while let Some(msg) = conns.pop() { - match self.check_readiness(false, cx) { - Ok(true) => { - let guard = self.conns.get(); - let _ = self.services[msg.token.0] - .as_mut() - .expect("actix net bug") - .1 - .call((Some(guard), ServerMessage::Connect(msg.io))); - } - Ok(false) => { - trace!("Worker is unavailable"); - self.state = WorkerState::Unavailable(conns); - return self.poll(cx); - } - Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); - self.state = WorkerState::Restarting( - idx, - token, - self.factories[idx].create(), - ); - return self.poll(cx); - } - } + if let Some(conn) = conn { + let guard = self.conns.get(); + let _ = self.services[conn.token.0] + .service + .call((Some(guard), ServerMessage::Connect(conn.io))); + } else { + self.state = WorkerState::Available; + self.availability.set(true); } - self.availability.set(true); self.poll(cx) } Ok(false) => { - self.state = WorkerState::Unavailable(conns); + // push connection back to queue + if let Some(conn) = conn { + match self.state { + WorkerState::Unavailable(ref mut conns) => { + conns.push(conn); + } + _ => (), + } + } Poll::Pending } Err((token, idx)) => { @@ -343,22 +374,24 @@ impl Future for Worker { "Service {:?} failed, restarting", self.factories[idx].name(token) ); + self.services[token.0].status = WorkerServiceStatus::Restarting; self.state = WorkerState::Restarting(idx, token, self.factories[idx].create()); self.poll(cx) } } } - WorkerState::Restarting(idx, token, mut fut) => { - match Pin::new(&mut fut).poll(cx) { + WorkerState::Restarting(idx, token, ref mut fut) => { + match Pin::new(fut).poll(cx) { Poll::Ready(Ok(item)) => { for (token, service) in item { trace!( "Service {:?} has been restarted", self.factories[idx].name(token) ); - self.services[token.0] = Some((idx, service)); + self.services[token.0].created(service); self.state = WorkerState::Unavailable(Vec::new()); + return self.poll(cx); } } Poll::Ready(Err(_)) => { @@ -368,40 +401,42 @@ impl Future for Worker { ); } Poll::Pending => { - self.state = WorkerState::Restarting(idx, token, fut); + // self.state = WorkerState::Restarting(idx, token, fut); return Poll::Pending; } } self.poll(cx) } - WorkerState::Shutdown(mut t1, mut t2, tx) => { + WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { let num = num_connections(); if num == 0 { - let _ = tx.send(true); + let _ = tx.take().unwrap().send(true); Arbiter::current().stop(); return Poll::Ready(()); } // check graceful timeout - match Pin::new(&mut t2).poll(cx) { + match t2.as_mut().poll(cx) { Poll::Pending => (), Poll::Ready(_) => { + let _ = tx.take().unwrap().send(false); self.shutdown(true); - let _ = tx.send(false); Arbiter::current().stop(); return Poll::Ready(()); } } // sleep for 1 second and then check again - match Pin::new(&mut t1).poll(cx) { + match t1.as_mut().poll(cx) { Poll::Pending => (), Poll::Ready(_) => { - t1 = delay(time::Instant::now() + time::Duration::from_secs(1)); - let _ = Pin::new(&mut t1).poll(cx); + *t1 = Box::pin(delay( + time::Instant::now() + time::Duration::from_secs(1), + )); + let _ = t1.as_mut().poll(cx); } } - self.state = WorkerState::Shutdown(t1, t2, tx); + // self.state = WorkerState::Shutdown(t1, t2, tx); Poll::Pending } WorkerState::Available => { @@ -409,13 +444,11 @@ impl Future for Worker { match Pin::new(&mut self.rx).poll_next(cx) { // handle incoming tcp stream Poll::Ready(Some(WorkerCommand(msg))) => { - match self.check_readiness(false, cx) { + match self.check_readiness(cx) { Ok(true) => { let guard = self.conns.get(); let _ = self.services[msg.token.0] - .as_mut() - .expect("actix-server bug") - .1 + .service .call((Some(guard), ServerMessage::Connect(msg.io))); continue; } @@ -430,6 +463,8 @@ impl Future for Worker { self.factories[idx].name(token) ); self.availability.set(false); + self.services[token.0].status = + WorkerServiceStatus::Restarting; self.state = WorkerState::Restarting( idx, token, @@ -447,7 +482,6 @@ impl Future for Worker { } } } - WorkerState::None => panic!(), } } }