diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 801467f8..65951345 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -125,6 +125,8 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { + // UnboundedReceiver should always be the first field. + // It must be dropped as soon as ServerWorker dropping. rx: UnboundedReceiver, rx2: UnboundedReceiver, services: Box<[WorkerService]>, @@ -370,6 +372,7 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { + // Stop the Arbiter ServerWorker runs on on drop. Arbiter::current().stop(); } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 40b07e1c..3af072bb 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -1,7 +1,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{mpsc, Arc}; -use std::{net, thread, time}; +use std::{net, thread, time::Duration}; +use actix_rt::{net::TcpStream, time::sleep}; use actix_server::Server; use actix_service::fn_service; use actix_utils::future::ok; @@ -37,7 +38,7 @@ fn test_bind() { }); let (_, sys) = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); sys.stop(); let _ = h.join(); @@ -64,7 +65,7 @@ fn test_listen() { }); let sys = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); sys.stop(); let _ = h.join(); @@ -73,11 +74,11 @@ fn test_listen() { #[test] #[cfg(unix)] fn test_start() { + use std::io::Read; + use actix_codec::{BytesCodec, Framed}; - use actix_rt::net::TcpStream; use bytes::Bytes; use futures_util::sink::SinkExt; - use std::io::Read; let addr = unused_addr(); let (tx, rx) = mpsc::channel(); @@ -112,16 +113,16 @@ fn test_start() { // pause let _ = srv.pause(); - thread::sleep(time::Duration::from_millis(200)); + thread::sleep(Duration::from_millis(200)); let mut conn = net::TcpStream::connect(addr).unwrap(); - conn.set_read_timeout(Some(time::Duration::from_millis(100))) + conn.set_read_timeout(Some(Duration::from_millis(100))) .unwrap(); let res = conn.read_exact(&mut buf); assert!(res.is_err()); // resume let _ = srv.resume(); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok()); @@ -133,10 +134,10 @@ fn test_start() { // stop let _ = srv.stop(false); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_err()); - thread::sleep(time::Duration::from_millis(100)); + thread::sleep(Duration::from_millis(100)); sys.stop(); let _ = h.join(); } @@ -182,7 +183,7 @@ fn test_configure() { let _ = sys.run(); }); let (_, sys) = rx.recv().unwrap(); - thread::sleep(time::Duration::from_millis(500)); + thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr1).is_ok()); assert!(net::TcpStream::connect(addr2).is_ok()); @@ -200,7 +201,6 @@ async fn test_max_concurrent_connections() { // The limit test on the other hand is only for concurrent tcp stream limiting a work // thread accept. - use actix_rt::net::TcpStream; use tokio::io::AsyncWriteExt; let addr = unused_addr(); @@ -226,7 +226,7 @@ async fn test_max_concurrent_connections() { let counter = counter.clone(); async move { counter.fetch_add(1, Ordering::SeqCst); - actix_rt::time::sleep(time::Duration::from_secs(20)).await; + sleep(Duration::from_secs(20)).await; counter.fetch_sub(1, Ordering::SeqCst); Ok::<(), ()>(()) } @@ -249,7 +249,7 @@ async fn test_max_concurrent_connections() { conns.push(conn); } - actix_rt::time::sleep(time::Duration::from_secs(5)).await; + sleep(Duration::from_secs(5)).await; // counter would remain at 3 even with 12 successful connection. // and 9 of them remain in backlog. @@ -268,9 +268,7 @@ async fn test_max_concurrent_connections() { #[actix_rt::test] async fn test_service_restart() { use std::task::{Context, Poll}; - use std::time::Duration; - use actix_rt::{net::TcpStream, time::sleep}; use actix_service::{fn_factory, Service}; use futures_core::future::LocalBoxFuture; use tokio::io::AsyncWriteExt; @@ -438,3 +436,143 @@ async fn test_service_restart() { let _ = server.stop(false); let _ = h.join().unwrap(); } + +#[actix_rt::test] +async fn worker_restart() { + use actix_service::{Service, ServiceFactory}; + use futures_core::future::LocalBoxFuture; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + struct TestServiceFactory(Arc); + + impl ServiceFactory for TestServiceFactory { + type Response = (); + type Error = (); + type Config = (); + type Service = TestService; + type InitError = (); + type Future = LocalBoxFuture<'static, Result>; + + fn new_service(&self, _: Self::Config) -> Self::Future { + let counter = self.0.fetch_add(1, Ordering::Relaxed); + + Box::pin(async move { Ok(TestService(counter)) }) + } + } + + struct TestService(usize); + + impl Service for TestService { + type Response = (); + type Error = (); + type Future = LocalBoxFuture<'static, Result>; + + actix_service::always_ready!(); + + fn call(&self, stream: TcpStream) -> Self::Future { + let counter = self.0; + + let mut stream = stream.into_std().unwrap(); + use std::io::Write; + let str = counter.to_string(); + let buf = str.as_bytes(); + + let mut written = 0; + + while written < buf.len() { + if let Ok(n) = stream.write(&buf[written..]) { + written += n; + } + } + stream.flush().unwrap(); + stream.shutdown(net::Shutdown::Write).unwrap(); + + // force worker 2 to restart service once. + if counter == 2 { + panic!("panic on purpose") + } else { + Box::pin(async { Ok(()) }) + } + } + } + + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let counter = Arc::new(AtomicUsize::new(1)); + let h = thread::spawn(move || { + let counter = counter.clone(); + actix_rt::System::new().block_on(async { + let server = Server::build() + .disable_signals() + .bind("addr", addr, move || TestServiceFactory(counter.clone())) + .unwrap() + .workers(2) + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + + sleep(Duration::from_secs(3)).await; + + let mut buf = [0; 8]; + + // worker 1 would not restart and return it's id consistently. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // worker 2 dead after return response. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("2", id); + stream.shutdown().await.unwrap(); + + // request to worker 1 + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarting and work goes to worker 1. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarted but worker 1 was still the next to accept connection. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 accept connection again but it's id is 3. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("3", id); + stream.shutdown().await.unwrap(); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); +}