mirror of
https://github.com/fafhrd91/actix-net
synced 2024-11-27 23:42:56 +01:00
Fix bug where worker service restart could skip failing services and not being able to restart multiple services (#318)
This commit is contained in:
parent
995efcf427
commit
5961eb892e
@ -283,7 +283,6 @@ impl ServerWorker {
|
|||||||
|
|
||||||
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
|
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
|
||||||
let mut ready = self.conns.available(cx);
|
let mut ready = self.conns.available(cx);
|
||||||
let mut failed = None;
|
|
||||||
for (idx, srv) in self.services.iter_mut().enumerate() {
|
for (idx, srv) in self.services.iter_mut().enumerate() {
|
||||||
if srv.status == WorkerServiceStatus::Available
|
if srv.status == WorkerServiceStatus::Available
|
||||||
|| srv.status == WorkerServiceStatus::Unavailable
|
|| srv.status == WorkerServiceStatus::Unavailable
|
||||||
@ -314,17 +313,14 @@ impl ServerWorker {
|
|||||||
"Service {:?} readiness check returned error, restarting",
|
"Service {:?} readiness check returned error, restarting",
|
||||||
self.factories[srv.factory].name(Token(idx))
|
self.factories[srv.factory].name(Token(idx))
|
||||||
);
|
);
|
||||||
failed = Some((Token(idx), srv.factory));
|
|
||||||
srv.status = WorkerServiceStatus::Failed;
|
srv.status = WorkerServiceStatus::Failed;
|
||||||
|
return Err((Token(idx), srv.factory));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(idx) = failed {
|
|
||||||
Err(idx)
|
Ok(ready)
|
||||||
} else {
|
|
||||||
Ok(ready)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -408,18 +404,19 @@ impl Future for ServerWorker {
|
|||||||
let factory_id = restart.factory_id;
|
let factory_id = restart.factory_id;
|
||||||
let token = restart.token;
|
let token = restart.token;
|
||||||
|
|
||||||
let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| {
|
let service = ready!(restart.fut.as_mut().poll(cx))
|
||||||
panic!(
|
.unwrap_or_else(|_| {
|
||||||
"Can not restart {:?} service",
|
panic!(
|
||||||
this.factories[factory_id].name(token)
|
"Can not restart {:?} service",
|
||||||
)
|
this.factories[factory_id].name(token)
|
||||||
});
|
)
|
||||||
|
})
|
||||||
// Only interest in the first item?
|
|
||||||
let (token, service) = item
|
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.next()
|
// Find the same token from vector. There should be only one
|
||||||
.expect("No BoxedServerService. Restarting can not progress");
|
// So the first match would be enough.
|
||||||
|
.find(|(t, _)| *t == token)
|
||||||
|
.map(|(_, service)| service)
|
||||||
|
.expect("No BoxedServerService found");
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
"Service {:?} has been restarted",
|
"Service {:?} has been restarted",
|
||||||
|
@ -264,3 +264,177 @@ async fn test_max_concurrent_connections() {
|
|||||||
sys.stop();
|
sys.stop();
|
||||||
let _ = h.join().unwrap();
|
let _ = h.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_service_restart() {
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use actix_rt::{net::TcpStream, time::sleep};
|
||||||
|
use actix_service::{fn_factory, Service};
|
||||||
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
struct TestService(Arc<AtomicUsize>);
|
||||||
|
|
||||||
|
impl Service<TcpStream> for TestService {
|
||||||
|
type Response = ();
|
||||||
|
type Error = ();
|
||||||
|
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||||
|
|
||||||
|
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
let TestService(ref counter) = self;
|
||||||
|
let c = counter.fetch_add(1, Ordering::SeqCst);
|
||||||
|
// Force the service to restart on first readiness check.
|
||||||
|
if c > 0 {
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
} else {
|
||||||
|
Poll::Ready(Err(()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn call(&self, _: TcpStream) -> Self::Future {
|
||||||
|
Box::pin(async { Ok(()) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let addr1 = unused_addr();
|
||||||
|
let addr2 = unused_addr();
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
let num = Arc::new(AtomicUsize::new(0));
|
||||||
|
let num2 = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
|
let num_clone = num.clone();
|
||||||
|
let num2_clone = num2.clone();
|
||||||
|
|
||||||
|
let h = thread::spawn(move || {
|
||||||
|
actix_rt::System::new().block_on(async {
|
||||||
|
let server = Server::build()
|
||||||
|
.backlog(1)
|
||||||
|
.disable_signals()
|
||||||
|
.configure(move |cfg| {
|
||||||
|
let num = num.clone();
|
||||||
|
let num2 = num2.clone();
|
||||||
|
cfg.bind("addr1", addr1)
|
||||||
|
.unwrap()
|
||||||
|
.bind("addr2", addr2)
|
||||||
|
.unwrap()
|
||||||
|
.apply(move |rt| {
|
||||||
|
let num = num.clone();
|
||||||
|
let num2 = num2.clone();
|
||||||
|
rt.service(
|
||||||
|
"addr1",
|
||||||
|
fn_factory(move || {
|
||||||
|
let num = num.clone();
|
||||||
|
async move { Ok::<_, ()>(TestService(num)) }
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
rt.service(
|
||||||
|
"addr2",
|
||||||
|
fn_factory(move || {
|
||||||
|
let num2 = num2.clone();
|
||||||
|
async move { Ok::<_, ()>(TestService(num2)) }
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
.workers(1)
|
||||||
|
.run();
|
||||||
|
|
||||||
|
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
||||||
|
server.await
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let (server, sys) = rx.recv().unwrap();
|
||||||
|
|
||||||
|
for _ in 0..5 {
|
||||||
|
TcpStream::connect(addr1)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.shutdown()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
TcpStream::connect(addr2)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.shutdown()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
assert!(num_clone.load(Ordering::SeqCst) > 5);
|
||||||
|
assert!(num2_clone.load(Ordering::SeqCst) > 5);
|
||||||
|
|
||||||
|
sys.stop();
|
||||||
|
let _ = server.stop(false);
|
||||||
|
let _ = h.join().unwrap();
|
||||||
|
|
||||||
|
let addr1 = unused_addr();
|
||||||
|
let addr2 = unused_addr();
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
let num = Arc::new(AtomicUsize::new(0));
|
||||||
|
let num2 = Arc::new(AtomicUsize::new(0));
|
||||||
|
|
||||||
|
let num_clone = num.clone();
|
||||||
|
let num2_clone = num2.clone();
|
||||||
|
|
||||||
|
let h = thread::spawn(move || {
|
||||||
|
let num = num.clone();
|
||||||
|
actix_rt::System::new().block_on(async {
|
||||||
|
let server = Server::build()
|
||||||
|
.backlog(1)
|
||||||
|
.disable_signals()
|
||||||
|
.bind("addr1", addr1, move || {
|
||||||
|
let num = num.clone();
|
||||||
|
fn_factory(move || {
|
||||||
|
let num = num.clone();
|
||||||
|
async move { Ok::<_, ()>(TestService(num)) }
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
.bind("addr2", addr2, move || {
|
||||||
|
let num2 = num2.clone();
|
||||||
|
fn_factory(move || {
|
||||||
|
let num2 = num2.clone();
|
||||||
|
async move { Ok::<_, ()>(TestService(num2)) }
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.unwrap()
|
||||||
|
.workers(1)
|
||||||
|
.run();
|
||||||
|
|
||||||
|
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
||||||
|
server.await
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let (server, sys) = rx.recv().unwrap();
|
||||||
|
|
||||||
|
for _ in 0..5 {
|
||||||
|
TcpStream::connect(addr1)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.shutdown()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
TcpStream::connect(addr2)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.shutdown()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
assert!(num_clone.load(Ordering::SeqCst) > 5);
|
||||||
|
assert!(num2_clone.load(Ordering::SeqCst) > 5);
|
||||||
|
|
||||||
|
sys.stop();
|
||||||
|
let _ = server.stop(false);
|
||||||
|
let _ = h.join().unwrap();
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user