diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index fac9202e..c3074646 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -283,7 +283,6 @@ impl ServerWorker { fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { let mut ready = self.conns.available(cx); - let mut failed = None; for (idx, srv) in self.services.iter_mut().enumerate() { if srv.status == WorkerServiceStatus::Available || srv.status == WorkerServiceStatus::Unavailable @@ -314,17 +313,14 @@ impl ServerWorker { "Service {:?} readiness check returned error, restarting", self.factories[srv.factory].name(Token(idx)) ); - failed = Some((Token(idx), srv.factory)); srv.status = WorkerServiceStatus::Failed; + return Err((Token(idx), srv.factory)); } } } } - if let Some(idx) = failed { - Err(idx) - } else { - Ok(ready) - } + + Ok(ready) } } @@ -408,18 +404,19 @@ impl Future for ServerWorker { let factory_id = restart.factory_id; let token = restart.token; - let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| { - panic!( - "Can not restart {:?} service", - this.factories[factory_id].name(token) - ) - }); - - // Only interest in the first item? - let (token, service) = item + let service = ready!(restart.fut.as_mut().poll(cx)) + .unwrap_or_else(|_| { + panic!( + "Can not restart {:?} service", + this.factories[factory_id].name(token) + ) + }) .into_iter() - .next() - .expect("No BoxedServerService. Restarting can not progress"); + // Find the same token from vector. There should be only one + // So the first match would be enough. + .find(|(t, _)| *t == token) + .map(|(_, service)| service) + .expect("No BoxedServerService found"); trace!( "Service {:?} has been restarted", diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index cd61df9f..40b07e1c 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -264,3 +264,177 @@ async fn test_max_concurrent_connections() { sys.stop(); let _ = h.join().unwrap(); } + +#[actix_rt::test] +async fn test_service_restart() { + use std::task::{Context, Poll}; + use std::time::Duration; + + use actix_rt::{net::TcpStream, time::sleep}; + use actix_service::{fn_factory, Service}; + use futures_core::future::LocalBoxFuture; + use tokio::io::AsyncWriteExt; + + struct TestService(Arc); + + impl Service for TestService { + type Response = (); + type Error = (); + type Future = LocalBoxFuture<'static, Result>; + + fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { + let TestService(ref counter) = self; + let c = counter.fetch_add(1, Ordering::SeqCst); + // Force the service to restart on first readiness check. + if c > 0 { + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(())) + } + } + + fn call(&self, _: TcpStream) -> Self::Future { + Box::pin(async { Ok(()) }) + } + } + + let addr1 = unused_addr(); + let addr2 = unused_addr(); + let (tx, rx) = mpsc::channel(); + let num = Arc::new(AtomicUsize::new(0)); + let num2 = Arc::new(AtomicUsize::new(0)); + + let num_clone = num.clone(); + let num2_clone = num2.clone(); + + let h = thread::spawn(move || { + actix_rt::System::new().block_on(async { + let server = Server::build() + .backlog(1) + .disable_signals() + .configure(move |cfg| { + let num = num.clone(); + let num2 = num2.clone(); + cfg.bind("addr1", addr1) + .unwrap() + .bind("addr2", addr2) + .unwrap() + .apply(move |rt| { + let num = num.clone(); + let num2 = num2.clone(); + rt.service( + "addr1", + fn_factory(move || { + let num = num.clone(); + async move { Ok::<_, ()>(TestService(num)) } + }), + ); + rt.service( + "addr2", + fn_factory(move || { + let num2 = num2.clone(); + async move { Ok::<_, ()>(TestService(num2)) } + }), + ); + }) + }) + .unwrap() + .workers(1) + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + + for _ in 0..5 { + TcpStream::connect(addr1) + .await + .unwrap() + .shutdown() + .await + .unwrap(); + TcpStream::connect(addr2) + .await + .unwrap() + .shutdown() + .await + .unwrap(); + } + + sleep(Duration::from_secs(3)).await; + + assert!(num_clone.load(Ordering::SeqCst) > 5); + assert!(num2_clone.load(Ordering::SeqCst) > 5); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); + + let addr1 = unused_addr(); + let addr2 = unused_addr(); + let (tx, rx) = mpsc::channel(); + let num = Arc::new(AtomicUsize::new(0)); + let num2 = Arc::new(AtomicUsize::new(0)); + + let num_clone = num.clone(); + let num2_clone = num2.clone(); + + let h = thread::spawn(move || { + let num = num.clone(); + actix_rt::System::new().block_on(async { + let server = Server::build() + .backlog(1) + .disable_signals() + .bind("addr1", addr1, move || { + let num = num.clone(); + fn_factory(move || { + let num = num.clone(); + async move { Ok::<_, ()>(TestService(num)) } + }) + }) + .unwrap() + .bind("addr2", addr2, move || { + let num2 = num2.clone(); + fn_factory(move || { + let num2 = num2.clone(); + async move { Ok::<_, ()>(TestService(num2)) } + }) + }) + .unwrap() + .workers(1) + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + + for _ in 0..5 { + TcpStream::connect(addr1) + .await + .unwrap() + .shutdown() + .await + .unwrap(); + TcpStream::connect(addr2) + .await + .unwrap() + .shutdown() + .await + .unwrap(); + } + + sleep(Duration::from_secs(3)).await; + + assert!(num_clone.load(Ordering::SeqCst) > 5); + assert!(num2_clone.load(Ordering::SeqCst) > 5); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); +}