mirror of
https://github.com/fafhrd91/actix-net
synced 2024-11-27 21:22:57 +01:00
add test for restart worker thread (#328)
This commit is contained in:
parent
3c1f57706a
commit
8e98d9168c
@ -125,6 +125,8 @@ impl WorkerAvailability {
|
|||||||
///
|
///
|
||||||
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
|
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
|
||||||
pub(crate) struct ServerWorker {
|
pub(crate) struct ServerWorker {
|
||||||
|
// UnboundedReceiver<Conn> should always be the first field.
|
||||||
|
// It must be dropped as soon as ServerWorker dropping.
|
||||||
rx: UnboundedReceiver<Conn>,
|
rx: UnboundedReceiver<Conn>,
|
||||||
rx2: UnboundedReceiver<Stop>,
|
rx2: UnboundedReceiver<Stop>,
|
||||||
services: Box<[WorkerService]>,
|
services: Box<[WorkerService]>,
|
||||||
@ -370,6 +372,7 @@ impl Default for WorkerState {
|
|||||||
|
|
||||||
impl Drop for ServerWorker {
|
impl Drop for ServerWorker {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
|
// Stop the Arbiter ServerWorker runs on on drop.
|
||||||
Arbiter::current().stop();
|
Arbiter::current().stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::sync::{mpsc, Arc};
|
use std::sync::{mpsc, Arc};
|
||||||
use std::{net, thread, time};
|
use std::{net, thread, time::Duration};
|
||||||
|
|
||||||
|
use actix_rt::{net::TcpStream, time::sleep};
|
||||||
use actix_server::Server;
|
use actix_server::Server;
|
||||||
use actix_service::fn_service;
|
use actix_service::fn_service;
|
||||||
use actix_utils::future::ok;
|
use actix_utils::future::ok;
|
||||||
@ -37,7 +38,7 @@ fn test_bind() {
|
|||||||
});
|
});
|
||||||
let (_, sys) = rx.recv().unwrap();
|
let (_, sys) = rx.recv().unwrap();
|
||||||
|
|
||||||
thread::sleep(time::Duration::from_millis(500));
|
thread::sleep(Duration::from_millis(500));
|
||||||
assert!(net::TcpStream::connect(addr).is_ok());
|
assert!(net::TcpStream::connect(addr).is_ok());
|
||||||
sys.stop();
|
sys.stop();
|
||||||
let _ = h.join();
|
let _ = h.join();
|
||||||
@ -64,7 +65,7 @@ fn test_listen() {
|
|||||||
});
|
});
|
||||||
let sys = rx.recv().unwrap();
|
let sys = rx.recv().unwrap();
|
||||||
|
|
||||||
thread::sleep(time::Duration::from_millis(500));
|
thread::sleep(Duration::from_millis(500));
|
||||||
assert!(net::TcpStream::connect(addr).is_ok());
|
assert!(net::TcpStream::connect(addr).is_ok());
|
||||||
sys.stop();
|
sys.stop();
|
||||||
let _ = h.join();
|
let _ = h.join();
|
||||||
@ -73,11 +74,11 @@ fn test_listen() {
|
|||||||
#[test]
|
#[test]
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
fn test_start() {
|
fn test_start() {
|
||||||
|
use std::io::Read;
|
||||||
|
|
||||||
use actix_codec::{BytesCodec, Framed};
|
use actix_codec::{BytesCodec, Framed};
|
||||||
use actix_rt::net::TcpStream;
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_util::sink::SinkExt;
|
use futures_util::sink::SinkExt;
|
||||||
use std::io::Read;
|
|
||||||
|
|
||||||
let addr = unused_addr();
|
let addr = unused_addr();
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
@ -112,16 +113,16 @@ fn test_start() {
|
|||||||
|
|
||||||
// pause
|
// pause
|
||||||
let _ = srv.pause();
|
let _ = srv.pause();
|
||||||
thread::sleep(time::Duration::from_millis(200));
|
thread::sleep(Duration::from_millis(200));
|
||||||
let mut conn = net::TcpStream::connect(addr).unwrap();
|
let mut conn = net::TcpStream::connect(addr).unwrap();
|
||||||
conn.set_read_timeout(Some(time::Duration::from_millis(100)))
|
conn.set_read_timeout(Some(Duration::from_millis(100)))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let res = conn.read_exact(&mut buf);
|
let res = conn.read_exact(&mut buf);
|
||||||
assert!(res.is_err());
|
assert!(res.is_err());
|
||||||
|
|
||||||
// resume
|
// resume
|
||||||
let _ = srv.resume();
|
let _ = srv.resume();
|
||||||
thread::sleep(time::Duration::from_millis(100));
|
thread::sleep(Duration::from_millis(100));
|
||||||
assert!(net::TcpStream::connect(addr).is_ok());
|
assert!(net::TcpStream::connect(addr).is_ok());
|
||||||
assert!(net::TcpStream::connect(addr).is_ok());
|
assert!(net::TcpStream::connect(addr).is_ok());
|
||||||
assert!(net::TcpStream::connect(addr).is_ok());
|
assert!(net::TcpStream::connect(addr).is_ok());
|
||||||
@ -133,10 +134,10 @@ fn test_start() {
|
|||||||
|
|
||||||
// stop
|
// stop
|
||||||
let _ = srv.stop(false);
|
let _ = srv.stop(false);
|
||||||
thread::sleep(time::Duration::from_millis(100));
|
thread::sleep(Duration::from_millis(100));
|
||||||
assert!(net::TcpStream::connect(addr).is_err());
|
assert!(net::TcpStream::connect(addr).is_err());
|
||||||
|
|
||||||
thread::sleep(time::Duration::from_millis(100));
|
thread::sleep(Duration::from_millis(100));
|
||||||
sys.stop();
|
sys.stop();
|
||||||
let _ = h.join();
|
let _ = h.join();
|
||||||
}
|
}
|
||||||
@ -182,7 +183,7 @@ fn test_configure() {
|
|||||||
let _ = sys.run();
|
let _ = sys.run();
|
||||||
});
|
});
|
||||||
let (_, sys) = rx.recv().unwrap();
|
let (_, sys) = rx.recv().unwrap();
|
||||||
thread::sleep(time::Duration::from_millis(500));
|
thread::sleep(Duration::from_millis(500));
|
||||||
|
|
||||||
assert!(net::TcpStream::connect(addr1).is_ok());
|
assert!(net::TcpStream::connect(addr1).is_ok());
|
||||||
assert!(net::TcpStream::connect(addr2).is_ok());
|
assert!(net::TcpStream::connect(addr2).is_ok());
|
||||||
@ -200,7 +201,6 @@ async fn test_max_concurrent_connections() {
|
|||||||
// The limit test on the other hand is only for concurrent tcp stream limiting a work
|
// The limit test on the other hand is only for concurrent tcp stream limiting a work
|
||||||
// thread accept.
|
// thread accept.
|
||||||
|
|
||||||
use actix_rt::net::TcpStream;
|
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
let addr = unused_addr();
|
let addr = unused_addr();
|
||||||
@ -226,7 +226,7 @@ async fn test_max_concurrent_connections() {
|
|||||||
let counter = counter.clone();
|
let counter = counter.clone();
|
||||||
async move {
|
async move {
|
||||||
counter.fetch_add(1, Ordering::SeqCst);
|
counter.fetch_add(1, Ordering::SeqCst);
|
||||||
actix_rt::time::sleep(time::Duration::from_secs(20)).await;
|
sleep(Duration::from_secs(20)).await;
|
||||||
counter.fetch_sub(1, Ordering::SeqCst);
|
counter.fetch_sub(1, Ordering::SeqCst);
|
||||||
Ok::<(), ()>(())
|
Ok::<(), ()>(())
|
||||||
}
|
}
|
||||||
@ -249,7 +249,7 @@ async fn test_max_concurrent_connections() {
|
|||||||
conns.push(conn);
|
conns.push(conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
actix_rt::time::sleep(time::Duration::from_secs(5)).await;
|
sleep(Duration::from_secs(5)).await;
|
||||||
|
|
||||||
// counter would remain at 3 even with 12 successful connection.
|
// counter would remain at 3 even with 12 successful connection.
|
||||||
// and 9 of them remain in backlog.
|
// and 9 of them remain in backlog.
|
||||||
@ -268,9 +268,7 @@ async fn test_max_concurrent_connections() {
|
|||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_service_restart() {
|
async fn test_service_restart() {
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use actix_rt::{net::TcpStream, time::sleep};
|
|
||||||
use actix_service::{fn_factory, Service};
|
use actix_service::{fn_factory, Service};
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
@ -438,3 +436,143 @@ async fn test_service_restart() {
|
|||||||
let _ = server.stop(false);
|
let _ = server.stop(false);
|
||||||
let _ = h.join().unwrap();
|
let _ = h.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn worker_restart() {
|
||||||
|
use actix_service::{Service, ServiceFactory};
|
||||||
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
|
struct TestServiceFactory(Arc<AtomicUsize>);
|
||||||
|
|
||||||
|
impl ServiceFactory<TcpStream> for TestServiceFactory {
|
||||||
|
type Response = ();
|
||||||
|
type Error = ();
|
||||||
|
type Config = ();
|
||||||
|
type Service = TestService;
|
||||||
|
type InitError = ();
|
||||||
|
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||||
|
|
||||||
|
fn new_service(&self, _: Self::Config) -> Self::Future {
|
||||||
|
let counter = self.0.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
Box::pin(async move { Ok(TestService(counter)) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TestService(usize);
|
||||||
|
|
||||||
|
impl Service<TcpStream> for TestService {
|
||||||
|
type Response = ();
|
||||||
|
type Error = ();
|
||||||
|
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
actix_service::always_ready!();
|
||||||
|
|
||||||
|
fn call(&self, stream: TcpStream) -> Self::Future {
|
||||||
|
let counter = self.0;
|
||||||
|
|
||||||
|
let mut stream = stream.into_std().unwrap();
|
||||||
|
use std::io::Write;
|
||||||
|
let str = counter.to_string();
|
||||||
|
let buf = str.as_bytes();
|
||||||
|
|
||||||
|
let mut written = 0;
|
||||||
|
|
||||||
|
while written < buf.len() {
|
||||||
|
if let Ok(n) = stream.write(&buf[written..]) {
|
||||||
|
written += n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stream.flush().unwrap();
|
||||||
|
stream.shutdown(net::Shutdown::Write).unwrap();
|
||||||
|
|
||||||
|
// force worker 2 to restart service once.
|
||||||
|
if counter == 2 {
|
||||||
|
panic!("panic on purpose")
|
||||||
|
} else {
|
||||||
|
Box::pin(async { Ok(()) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let addr = unused_addr();
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
|
let counter = Arc::new(AtomicUsize::new(1));
|
||||||
|
let h = thread::spawn(move || {
|
||||||
|
let counter = counter.clone();
|
||||||
|
actix_rt::System::new().block_on(async {
|
||||||
|
let server = Server::build()
|
||||||
|
.disable_signals()
|
||||||
|
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
|
||||||
|
.unwrap()
|
||||||
|
.workers(2)
|
||||||
|
.run();
|
||||||
|
|
||||||
|
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
||||||
|
server.await
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let (server, sys) = rx.recv().unwrap();
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
let mut buf = [0; 8];
|
||||||
|
|
||||||
|
// worker 1 would not restart and return it's id consistently.
|
||||||
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||||
|
let n = stream.read(&mut buf).await.unwrap();
|
||||||
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
||||||
|
assert_eq!("1", id);
|
||||||
|
stream.shutdown().await.unwrap();
|
||||||
|
|
||||||
|
// worker 2 dead after return response.
|
||||||
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||||
|
let n = stream.read(&mut buf).await.unwrap();
|
||||||
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
||||||
|
assert_eq!("2", id);
|
||||||
|
stream.shutdown().await.unwrap();
|
||||||
|
|
||||||
|
// request to worker 1
|
||||||
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||||
|
let n = stream.read(&mut buf).await.unwrap();
|
||||||
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
||||||
|
assert_eq!("1", id);
|
||||||
|
stream.shutdown().await.unwrap();
|
||||||
|
|
||||||
|
// TODO: Remove sleep if it can pass CI.
|
||||||
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
// worker 2 restarting and work goes to worker 1.
|
||||||
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||||
|
let n = stream.read(&mut buf).await.unwrap();
|
||||||
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
||||||
|
assert_eq!("1", id);
|
||||||
|
stream.shutdown().await.unwrap();
|
||||||
|
|
||||||
|
// TODO: Remove sleep if it can pass CI.
|
||||||
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
// worker 2 restarted but worker 1 was still the next to accept connection.
|
||||||
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||||
|
let n = stream.read(&mut buf).await.unwrap();
|
||||||
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
||||||
|
assert_eq!("1", id);
|
||||||
|
stream.shutdown().await.unwrap();
|
||||||
|
|
||||||
|
// TODO: Remove sleep if it can pass CI.
|
||||||
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
// worker 2 accept connection again but it's id is 3.
|
||||||
|
let mut stream = TcpStream::connect(addr).await.unwrap();
|
||||||
|
let n = stream.read(&mut buf).await.unwrap();
|
||||||
|
let id = String::from_utf8_lossy(&buf[0..n]);
|
||||||
|
assert_eq!("3", id);
|
||||||
|
stream.shutdown().await.unwrap();
|
||||||
|
|
||||||
|
sys.stop();
|
||||||
|
let _ = server.stop(false);
|
||||||
|
let _ = h.join().unwrap();
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user