1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 15:42:57 +01:00

Fix server arbiter support (#417)

This commit is contained in:
Rob Ede 2021-11-14 19:45:15 +00:00 committed by GitHub
parent ed987eef06
commit 38caa8f088
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 266 additions and 173 deletions

View File

@ -18,7 +18,7 @@ path = "src/lib.rs"
[features]
default = []
io-uring = ["tokio-uring"]
io-uring = ["tokio-uring", "actix-rt/io-uring"]
[dependencies]
actix-rt = { version = "2.4.0", default-features = false }
@ -26,6 +26,7 @@ actix-service = "2.0.0"
actix-utils = "3.0.0"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13"

View File

@ -4,7 +4,7 @@ use std::{
task::{Context, Poll},
};
use futures_core::future::{BoxFuture, LocalBoxFuture};
use futures_core::future::BoxFuture;
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
@ -61,63 +61,6 @@ impl<T> Future for JoinAll<T> {
}
}
pub(crate) fn join_all_local<T>(
fut: Vec<impl Future<Output = T> + 'static>,
) -> JoinAllLocal<T> {
let fut = fut
.into_iter()
.map(|f| JoinLocalFuture::LocalFuture(Box::pin(f)))
.collect();
JoinAllLocal { fut }
}
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
pub(crate) struct JoinAllLocal<T> {
fut: Vec<JoinLocalFuture<T>>,
}
enum JoinLocalFuture<T> {
LocalFuture(LocalBoxFuture<'static, T>),
Result(Option<T>),
}
impl<T> Unpin for JoinAllLocal<T> {}
impl<T> Future for JoinAllLocal<T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut ready = true;
let this = self.get_mut();
for fut in this.fut.iter_mut() {
if let JoinLocalFuture::LocalFuture(f) = fut {
match f.as_mut().poll(cx) {
Poll::Ready(t) => {
*fut = JoinLocalFuture::Result(Some(t));
}
Poll::Pending => ready = false,
}
}
}
if ready {
let mut res = Vec::new();
for fut in this.fut.iter_mut() {
if let JoinLocalFuture::Result(f) = fut {
res.push(f.take().unwrap());
}
}
Poll::Ready(res)
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod test {
use super::*;
@ -132,13 +75,4 @@ mod test {
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
#[actix_rt::test]
async fn test_join_all_local() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
let mut res = join_all_local(futs).await.into_iter();
assert_eq!(Ok(1), res.next().unwrap());
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
}

View File

@ -132,12 +132,12 @@ impl Server {
.collect();
// Give log information on what runtime will be used.
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
let is_actix = actix_rt::System::try_current().is_some();
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
match (is_tokio, is_actix) {
(true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"),
(_, true) => info!("Actix runtime found. Starting in Actix runtime"),
match (is_actix, is_tokio) {
(false, true) => info!("Tokio runtime found. Starting in existing Tokio runtime"),
(true, _) => info!("Actix runtime found. Starting in Actix runtime"),
(_, _) => info!(
"Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime"
),

View File

@ -147,3 +147,16 @@ impl Drop for TestServerRuntime {
self.stop()
}
}
#[cfg(test)]
mod tests {
use actix_service::fn_service;
use super::*;
#[tokio::test]
async fn plain_tokio_runtime() {
let srv = TestServer::with(|| fn_service(|_sock| async move { Ok::<_, ()>(()) }));
assert!(srv.connect().is_ok());
}
}

View File

@ -24,7 +24,6 @@ use tokio::sync::{
};
use crate::{
join_all::join_all_local,
service::{BoxedServerService, InternalServiceFactory},
socket::MioStream,
waker_queue::{WakerInterest, WakerQueue},
@ -202,8 +201,8 @@ impl WorkerHandleServer {
pub(crate) struct ServerWorker {
// UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker dropping.
rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>,
conn_rx: UnboundedReceiver<Conn>,
stop_rx: UnboundedReceiver<Stop>,
counter: WorkerCounter,
services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>,
@ -212,7 +211,7 @@ pub(crate) struct ServerWorker {
}
struct WorkerService {
factory: usize,
factory_idx: usize,
status: WorkerServiceStatus,
service: BoxedServerService,
}
@ -234,6 +233,12 @@ enum WorkerServiceStatus {
Stopped,
}
impl Default for WorkerServiceStatus {
fn default() -> Self {
Self::Unavailable
}
}
/// Config for worker behavior passed down from server builder.
#[derive(Debug, Clone, Copy)]
pub(crate) struct ServerWorkerConfig {
@ -277,84 +282,105 @@ impl ServerWorker {
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
trace!("starting server worker {}", idx);
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let (tx1, conn_rx) = unbounded_channel();
let (tx2, stop_rx) = unbounded_channel();
let counter = Counter::new(config.max_concurrent_connections);
let counter_clone = counter.clone();
let pair = handle_pair(idx, tx1, tx2, counter.clone());
// get actix system context if it is set
let sys = System::try_current();
let actix_system = System::try_current();
// get tokio runtime handle if it is set
let tokio_handle = tokio::runtime::Handle::try_current().ok();
// service factories initialization channel
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1);
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel::<io::Result<()>>(1);
// outline of following code:
//
// if system exists
// if uring enabled
// start arbiter using uring method
// else
// start arbiter with regular tokio
// else
// if uring enabled
// start uring in spawned thread
// else
// start regular tokio in spawned thread
// every worker runs in it's own thread and tokio runtime.
// use a custom tokio runtime builder to change the settings of runtime.
match (actix_system, tokio_handle) {
(None, None) => {
panic!("No runtime detected. Start a Tokio (or Actix) runtime.");
}
// no actix system
(None, Some(rt_handle)) => {
std::thread::Builder::new()
.name(format!("actix-server worker {}", idx))
.spawn(move || {
// forward existing actix system context
if let Some(sys) = sys {
System::set_current(sys);
let (worker_stopped_tx, worker_stopped_rx) = oneshot::channel();
// local set for running service init futures and worker services
let ls = tokio::task::LocalSet::new();
// init services using existing Tokio runtime (so probably on main thread)
let services = rt_handle.block_on(ls.run_until(async {
let mut services = Vec::new();
for (idx, factory) in factories.iter().enumerate() {
match factory.create().await {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("Can not start worker: {:?}", err);
return Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
));
}
}
}
let worker_fut = async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
})
.collect::<Vec<_>>();
Ok(services)
}));
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all_local(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
let services = match services {
Ok(services) => {
factory_tx.send(Ok(())).unwrap();
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
}
Err(err) => {
factory_tx.send(Err(err)).unwrap();
return;
}
};
factory_tx.send(()).unwrap();
let worker_services = wrap_worker_services(services);
// a third spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker {
rx,
rx2,
services,
counter: WorkerCounter::new(idx, waker_queue, counter_clone),
let worker_fut = async move {
// spawn to make sure ServerWorker runs as non boxed future.
spawn(async move {
ServerWorker {
conn_rx,
stop_rx,
services: worker_services.into_boxed_slice(),
counter: WorkerCounter::new(idx, waker_queue, counter),
factories: factories.into_boxed_slice(),
state: Default::default(),
state: WorkerState::default(),
shutdown_timeout: config.shutdown_timeout,
})
.await
.expect("task 3 panic");
})
.await
.expect("task 2 panic");
}
.await;
// wake up outermost task waiting for shutdown
worker_stopped_tx.send(()).unwrap();
});
worker_stopped_rx.await.unwrap();
};
#[cfg(all(target_os = "linux", feature = "io-uring"))]
@ -362,7 +388,7 @@ impl ServerWorker {
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
tokio_uring::start(worker_fut)
tokio_uring::start(worker_fut);
}
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
@ -373,15 +399,79 @@ impl ServerWorker {
.build()
.unwrap();
rt.block_on(tokio::task::LocalSet::new().run_until(worker_fut))
rt.block_on(ls.run_until(worker_fut));
}
})
.expect("worker thread error/panic");
.expect("cannot spawn server worker thread");
}
// with actix system
(Some(_sys), _) => {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
let arbiter = {
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
Arbiter::new()
};
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
let arbiter = {
Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap()
})
};
arbiter.spawn(async move {
// spawn_local to run !Send future tasks.
spawn(async move {
let mut services = Vec::new();
for (idx, factory) in factories.iter().enumerate() {
match factory.create().await {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("Can not start worker: {:?}", err);
Arbiter::current().stop();
factory_tx
.send(Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
)))
.unwrap();
return;
}
}
}
factory_tx.send(Ok(())).unwrap();
let worker_services = wrap_worker_services(services);
// spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker {
conn_rx,
stop_rx,
services: worker_services.into_boxed_slice(),
counter: WorkerCounter::new(idx, waker_queue, counter),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
});
});
});
}
};
// wait for service factories initialization
factory_rx.recv().unwrap();
factory_rx.recv().unwrap()?;
Ok(handle_pair(idx, tx1, tx2, counter))
Ok(pair)
}
fn restart_service(&mut self, idx: usize, factory_id: usize) {
@ -419,7 +509,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Unavailable {
trace!(
"Service {:?} is available",
self.factories[srv.factory].name(idx)
self.factories[srv.factory_idx].name(idx)
);
srv.status = WorkerServiceStatus::Available;
}
@ -430,7 +520,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Available {
trace!(
"Service {:?} is unavailable",
self.factories[srv.factory].name(idx)
self.factories[srv.factory_idx].name(idx)
);
srv.status = WorkerServiceStatus::Unavailable;
}
@ -438,10 +528,10 @@ impl ServerWorker {
Poll::Ready(Err(_)) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(idx)
self.factories[srv.factory_idx].name(idx)
);
srv.status = WorkerServiceStatus::Failed;
return Err((idx, srv.factory));
return Err((idx, srv.factory_idx));
}
}
}
@ -484,7 +574,6 @@ impl Default for WorkerState {
impl Drop for ServerWorker {
fn drop(&mut self) {
trace!("stopping ServerWorker Arbiter");
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
}
}
@ -496,7 +585,8 @@ impl Future for ServerWorker {
let this = self.as_mut().get_mut();
// `StopWorker` message handler
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
if let Poll::Ready(Some(Stop { graceful, tx })) =
Pin::new(&mut this.stop_rx).poll_recv(cx)
{
let num = this.counter.total();
if num == 0 {
@ -559,7 +649,7 @@ impl Future for ServerWorker {
}
WorkerState::Shutdown(ref mut shutdown) => {
// drop all pending connections in rx channel.
while let Poll::Ready(Some(conn)) = Pin::new(&mut this.rx).poll_recv(cx) {
while let Poll::Ready(Some(conn)) = Pin::new(&mut this.conn_rx).poll_recv(cx) {
// WorkerCounterGuard is needed as Accept thread has incremented counter.
// It's guard's job to decrement the counter together with drop of Conn.
let guard = this.counter.guard();
@ -606,7 +696,7 @@ impl Future for ServerWorker {
}
// handle incoming io stream
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
match ready!(Pin::new(&mut this.conn_rx).poll_recv(cx)) {
Some(msg) => {
let guard = this.counter.guard();
let _ = this.services[msg.token].service.call((guard, msg.io));
@ -617,3 +707,19 @@ impl Future for ServerWorker {
}
}
}
fn wrap_worker_services(
services: Vec<(usize, usize, BoxedServerService)>,
) -> Vec<WorkerService> {
services
.into_iter()
.fold(Vec::new(), |mut services, (idx, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory_idx: idx,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
}

View File

@ -33,28 +33,63 @@ fn test_bind() {
})?
.run();
let _ = tx.send((srv.handle(), actix_rt::System::current()));
let _ = tx.send(srv.handle());
srv.await
})
});
let (srv, sys) = rx.recv().unwrap();
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
#[test]
fn plain_tokio_runtime() {
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
tx.send(srv.handle()).unwrap();
srv.await
})
});
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
sys.stop();
h.join().unwrap().unwrap();
}
#[test]
fn test_listen() {
let addr = unused_addr();
let lst = net::TcpListener::bind(addr).unwrap();
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let lst = net::TcpListener::bind(addr)?;
actix_rt::System::new().block_on(async {
let srv = Server::build()
.disable_signals()
@ -64,19 +99,18 @@ fn test_listen() {
})?
.run();
let _ = tx.send((srv.handle(), actix_rt::System::current()));
let _ = tx.send(srv.handle());
srv.await
})
});
let (srv, sys) = rx.recv().unwrap();
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
sys.stop();
h.join().unwrap().unwrap();
}
@ -283,12 +317,12 @@ async fn test_service_restart() {
.workers(1)
.run();
let _ = tx.send((srv.handle(), actix_rt::System::current()));
let _ = tx.send(srv.handle());
srv.await
})
});
let (srv, sys) = rx.recv().unwrap();
let srv = rx.recv().unwrap();
for _ in 0..5 {
TcpStream::connect(addr1)
@ -311,7 +345,6 @@ async fn test_service_restart() {
assert!(num2_clone.load(Ordering::SeqCst) > 5);
let _ = srv.stop(false);
sys.stop();
h.join().unwrap().unwrap();
}
@ -388,13 +421,13 @@ async fn worker_restart() {
.workers(2)
.run();
let _ = tx.send((srv.handle(), actix_rt::System::current()));
let _ = tx.send(srv.handle());
srv.await
})
});
let (srv, sys) = rx.recv().unwrap();
let srv = rx.recv().unwrap();
sleep(Duration::from_secs(3)).await;
@ -452,6 +485,5 @@ async fn worker_restart() {
stream.shutdown().await.unwrap();
let _ = srv.stop(false);
sys.stop();
h.join().unwrap().unwrap();
}

View File

@ -1,7 +1,7 @@
/// An implementation of [`poll_ready`]() that always signals readiness.
///
/// This should only be used for basic leaf services that have no concept of un-readiness.
/// For wrapper or other serivice types, use [`forward_ready!`] for simple cases or write a bespoke
/// For wrapper or other service types, use [`forward_ready!`] for simple cases or write a bespoke
/// `poll_ready` implementation.
///
/// [`poll_ready`]: crate::Service::poll_ready

View File

@ -23,3 +23,4 @@ local-waker = "0.1"
[dev-dependencies]
actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false }
static_assertions = "1.1"

View File

@ -103,10 +103,16 @@ pub fn err<T, E>(err: E) -> Ready<Result<T, E>> {
#[cfg(test)]
mod tests {
use std::rc::Rc;
use futures_util::task::noop_waker;
use static_assertions::{assert_impl_all, assert_not_impl_all};
use super::*;
assert_impl_all!(Ready<()>: Send, Sync, Clone);
assert_not_impl_all!(Ready<Rc<()>>: Send, Sync);
#[test]
#[should_panic]
fn multiple_poll_panics() {