diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 91096cd6..66e77c2f 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [features] default = [] -io-uring = ["tokio-uring"] +io-uring = ["tokio-uring", "actix-rt/io-uring"] [dependencies] actix-rt = { version = "2.4.0", default-features = false } @@ -26,6 +26,7 @@ actix-service = "2.0.0" actix-utils = "3.0.0" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } log = "0.4" mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" diff --git a/actix-server/src/join_all.rs b/actix-server/src/join_all.rs index ae68871c..bdef62ef 100644 --- a/actix-server/src/join_all.rs +++ b/actix-server/src/join_all.rs @@ -4,7 +4,7 @@ use std::{ task::{Context, Poll}, }; -use futures_core::future::{BoxFuture, LocalBoxFuture}; +use futures_core::future::BoxFuture; // a poor man's join future. joined future is only used when starting/stopping the server. // pin_project and pinned futures are overkill for this task. @@ -61,63 +61,6 @@ impl Future for JoinAll { } } -pub(crate) fn join_all_local( - fut: Vec + 'static>, -) -> JoinAllLocal { - let fut = fut - .into_iter() - .map(|f| JoinLocalFuture::LocalFuture(Box::pin(f))) - .collect(); - - JoinAllLocal { fut } -} - -// a poor man's join future. joined future is only used when starting/stopping the server. -// pin_project and pinned futures are overkill for this task. -pub(crate) struct JoinAllLocal { - fut: Vec>, -} - -enum JoinLocalFuture { - LocalFuture(LocalBoxFuture<'static, T>), - Result(Option), -} - -impl Unpin for JoinAllLocal {} - -impl Future for JoinAllLocal { - type Output = Vec; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut ready = true; - - let this = self.get_mut(); - for fut in this.fut.iter_mut() { - if let JoinLocalFuture::LocalFuture(f) = fut { - match f.as_mut().poll(cx) { - Poll::Ready(t) => { - *fut = JoinLocalFuture::Result(Some(t)); - } - Poll::Pending => ready = false, - } - } - } - - if ready { - let mut res = Vec::new(); - for fut in this.fut.iter_mut() { - if let JoinLocalFuture::Result(f) = fut { - res.push(f.take().unwrap()); - } - } - - Poll::Ready(res) - } else { - Poll::Pending - } - } -} - #[cfg(test)] mod test { use super::*; @@ -132,13 +75,4 @@ mod test { assert_eq!(Err(3), res.next().unwrap()); assert_eq!(Ok(9), res.next().unwrap()); } - - #[actix_rt::test] - async fn test_join_all_local() { - let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; - let mut res = join_all_local(futs).await.into_iter(); - assert_eq!(Ok(1), res.next().unwrap()); - assert_eq!(Err(3), res.next().unwrap()); - assert_eq!(Ok(9), res.next().unwrap()); - } } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index f1edcb23..9611062a 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -132,12 +132,12 @@ impl Server { .collect(); // Give log information on what runtime will be used. - let is_tokio = tokio::runtime::Handle::try_current().is_ok(); let is_actix = actix_rt::System::try_current().is_some(); + let is_tokio = tokio::runtime::Handle::try_current().is_ok(); - match (is_tokio, is_actix) { - (true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"), - (_, true) => info!("Actix runtime found. Starting in Actix runtime"), + match (is_actix, is_tokio) { + (false, true) => info!("Tokio runtime found. Starting in existing Tokio runtime"), + (true, _) => info!("Actix runtime found. Starting in Actix runtime"), (_, _) => info!( "Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime" ), diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 7cf0d0a6..a7914372 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -147,3 +147,16 @@ impl Drop for TestServerRuntime { self.stop() } } + +#[cfg(test)] +mod tests { + use actix_service::fn_service; + + use super::*; + + #[tokio::test] + async fn plain_tokio_runtime() { + let srv = TestServer::with(|| fn_service(|_sock| async move { Ok::<_, ()>(()) })); + assert!(srv.connect().is_ok()); + } +} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 02f68294..0822ab7c 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -24,7 +24,6 @@ use tokio::sync::{ }; use crate::{ - join_all::join_all_local, service::{BoxedServerService, InternalServiceFactory}, socket::MioStream, waker_queue::{WakerInterest, WakerQueue}, @@ -202,8 +201,8 @@ impl WorkerHandleServer { pub(crate) struct ServerWorker { // UnboundedReceiver should always be the first field. // It must be dropped as soon as ServerWorker dropping. - rx: UnboundedReceiver, - rx2: UnboundedReceiver, + conn_rx: UnboundedReceiver, + stop_rx: UnboundedReceiver, counter: WorkerCounter, services: Box<[WorkerService]>, factories: Box<[Box]>, @@ -212,7 +211,7 @@ pub(crate) struct ServerWorker { } struct WorkerService { - factory: usize, + factory_idx: usize, status: WorkerServiceStatus, service: BoxedServerService, } @@ -234,6 +233,12 @@ enum WorkerServiceStatus { Stopped, } +impl Default for WorkerServiceStatus { + fn default() -> Self { + Self::Unavailable + } +} + /// Config for worker behavior passed down from server builder. #[derive(Debug, Clone, Copy)] pub(crate) struct ServerWorkerConfig { @@ -277,111 +282,196 @@ impl ServerWorker { ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { trace!("starting server worker {}", idx); - let (tx1, rx) = unbounded_channel(); - let (tx2, rx2) = unbounded_channel(); + let (tx1, conn_rx) = unbounded_channel(); + let (tx2, stop_rx) = unbounded_channel(); let counter = Counter::new(config.max_concurrent_connections); - - let counter_clone = counter.clone(); + let pair = handle_pair(idx, tx1, tx2, counter.clone()); // get actix system context if it is set - let sys = System::try_current(); + let actix_system = System::try_current(); + + // get tokio runtime handle if it is set + let tokio_handle = tokio::runtime::Handle::try_current().ok(); // service factories initialization channel - let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1); + let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel::>(1); + + // outline of following code: + // + // if system exists + // if uring enabled + // start arbiter using uring method + // else + // start arbiter with regular tokio + // else + // if uring enabled + // start uring in spawned thread + // else + // start regular tokio in spawned thread // every worker runs in it's own thread and tokio runtime. // use a custom tokio runtime builder to change the settings of runtime. - std::thread::Builder::new() - .name(format!("actix-server worker {}", idx)) - .spawn(move || { - // forward existing actix system context - if let Some(sys) = sys { - System::set_current(sys); - } - let worker_fut = async move { - let fut = factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { fut.await.map(|(t, s)| (idx, t, s)) } - }) - .collect::>(); + match (actix_system, tokio_handle) { + (None, None) => { + panic!("No runtime detected. Start a Tokio (or Actix) runtime."); + } - // a second spawn to run !Send future tasks. - spawn(async move { - let res = join_all_local(fut) - .await - .into_iter() - .collect::, _>>(); + // no actix system + (None, Some(rt_handle)) => { + std::thread::Builder::new() + .name(format!("actix-server worker {}", idx)) + .spawn(move || { + let (worker_stopped_tx, worker_stopped_rx) = oneshot::channel(); - let services = match res { - Ok(res) => res - .into_iter() - .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token, services.len()); - services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - services - }) - .into_boxed_slice(), + // local set for running service init futures and worker services + let ls = tokio::task::LocalSet::new(); - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::try_current().as_ref().map(ArbiterHandle::stop); + // init services using existing Tokio runtime (so probably on main thread) + let services = rt_handle.block_on(ls.run_until(async { + let mut services = Vec::new(); + + for (idx, factory) in factories.iter().enumerate() { + match factory.create().await { + Ok((token, svc)) => services.push((idx, token, svc)), + + Err(err) => { + error!("Can not start worker: {:?}", err); + return Err(io::Error::new( + io::ErrorKind::Other, + format!("can not start server service {}", idx), + )); + } + } + } + + Ok(services) + })); + + let services = match services { + Ok(services) => { + factory_tx.send(Ok(())).unwrap(); + services + } + Err(err) => { + factory_tx.send(Err(err)).unwrap(); return; } }; - factory_tx.send(()).unwrap(); + let worker_services = wrap_worker_services(services); - // a third spawn to make sure ServerWorker runs as non boxed future. - spawn(ServerWorker { - rx, - rx2, - services, - counter: WorkerCounter::new(idx, waker_queue, counter_clone), - factories: factories.into_boxed_slice(), - state: Default::default(), - shutdown_timeout: config.shutdown_timeout, - }) - .await - .expect("task 3 panic"); + let worker_fut = async move { + // spawn to make sure ServerWorker runs as non boxed future. + spawn(async move { + ServerWorker { + conn_rx, + stop_rx, + services: worker_services.into_boxed_slice(), + counter: WorkerCounter::new(idx, waker_queue, counter), + factories: factories.into_boxed_slice(), + state: WorkerState::default(), + shutdown_timeout: config.shutdown_timeout, + } + .await; + + // wake up outermost task waiting for shutdown + worker_stopped_tx.send(()).unwrap(); + }); + + worker_stopped_rx.await.unwrap(); + }; + + #[cfg(all(target_os = "linux", feature = "io-uring"))] + { + // TODO: pass max blocking thread config when tokio-uring enable configuration + // on building runtime. + let _ = config.max_blocking_threads; + tokio_uring::start(worker_fut); + } + + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap(); + + rt.block_on(ls.run_until(worker_fut)); + } }) - .await - .expect("task 2 panic"); - }; + .expect("cannot spawn server worker thread"); + } + // with actix system + (Some(_sys), _) => { #[cfg(all(target_os = "linux", feature = "io-uring"))] - { + let arbiter = { // TODO: pass max blocking thread config when tokio-uring enable configuration // on building runtime. let _ = config.max_blocking_threads; - tokio_uring::start(worker_fut) - } + Arbiter::new() + }; #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap(); + let arbiter = { + Arbiter::with_tokio_rt(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap() + }) + }; - rt.block_on(tokio::task::LocalSet::new().run_until(worker_fut)) - } - }) - .expect("worker thread error/panic"); + arbiter.spawn(async move { + // spawn_local to run !Send future tasks. + spawn(async move { + let mut services = Vec::new(); + + for (idx, factory) in factories.iter().enumerate() { + match factory.create().await { + Ok((token, svc)) => services.push((idx, token, svc)), + + Err(err) => { + error!("Can not start worker: {:?}", err); + Arbiter::current().stop(); + factory_tx + .send(Err(io::Error::new( + io::ErrorKind::Other, + format!("can not start server service {}", idx), + ))) + .unwrap(); + return; + } + } + } + + factory_tx.send(Ok(())).unwrap(); + + let worker_services = wrap_worker_services(services); + + // spawn to make sure ServerWorker runs as non boxed future. + spawn(ServerWorker { + conn_rx, + stop_rx, + services: worker_services.into_boxed_slice(), + counter: WorkerCounter::new(idx, waker_queue, counter), + factories: factories.into_boxed_slice(), + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }); + }); + }); + } + }; // wait for service factories initialization - factory_rx.recv().unwrap(); + factory_rx.recv().unwrap()?; - Ok(handle_pair(idx, tx1, tx2, counter)) + Ok(pair) } fn restart_service(&mut self, idx: usize, factory_id: usize) { @@ -419,7 +509,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Unavailable { trace!( "Service {:?} is available", - self.factories[srv.factory].name(idx) + self.factories[srv.factory_idx].name(idx) ); srv.status = WorkerServiceStatus::Available; } @@ -430,7 +520,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Available { trace!( "Service {:?} is unavailable", - self.factories[srv.factory].name(idx) + self.factories[srv.factory_idx].name(idx) ); srv.status = WorkerServiceStatus::Unavailable; } @@ -438,10 +528,10 @@ impl ServerWorker { Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(idx) + self.factories[srv.factory_idx].name(idx) ); srv.status = WorkerServiceStatus::Failed; - return Err((idx, srv.factory)); + return Err((idx, srv.factory_idx)); } } } @@ -484,7 +574,6 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { - trace!("stopping ServerWorker Arbiter"); Arbiter::try_current().as_ref().map(ArbiterHandle::stop); } } @@ -496,7 +585,8 @@ impl Future for ServerWorker { let this = self.as_mut().get_mut(); // `StopWorker` message handler - if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) + if let Poll::Ready(Some(Stop { graceful, tx })) = + Pin::new(&mut this.stop_rx).poll_recv(cx) { let num = this.counter.total(); if num == 0 { @@ -559,7 +649,7 @@ impl Future for ServerWorker { } WorkerState::Shutdown(ref mut shutdown) => { // drop all pending connections in rx channel. - while let Poll::Ready(Some(conn)) = Pin::new(&mut this.rx).poll_recv(cx) { + while let Poll::Ready(Some(conn)) = Pin::new(&mut this.conn_rx).poll_recv(cx) { // WorkerCounterGuard is needed as Accept thread has incremented counter. // It's guard's job to decrement the counter together with drop of Conn. let guard = this.counter.guard(); @@ -606,7 +696,7 @@ impl Future for ServerWorker { } // handle incoming io stream - match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { + match ready!(Pin::new(&mut this.conn_rx).poll_recv(cx)) { Some(msg) => { let guard = this.counter.guard(); let _ = this.services[msg.token].service.call((guard, msg.io)); @@ -617,3 +707,19 @@ impl Future for ServerWorker { } } } + +fn wrap_worker_services( + services: Vec<(usize, usize, BoxedServerService)>, +) -> Vec { + services + .into_iter() + .fold(Vec::new(), |mut services, (idx, token, service)| { + assert_eq!(token, services.len()); + services.push(WorkerService { + factory_idx: idx, + service, + status: WorkerServiceStatus::Unavailable, + }); + services + }) +} diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 0a8cd2ae..9a14e78a 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -33,28 +33,63 @@ fn test_bind() { })? .run(); - let _ = tx.send((srv.handle(), actix_rt::System::current())); + let _ = tx.send(srv.handle()); srv.await }) }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); + + thread::sleep(Duration::from_millis(500)); + assert!(net::TcpStream::connect(addr).is_ok()); + + let _ = srv.stop(true); + h.join().unwrap().unwrap(); +} + +#[test] +fn plain_tokio_runtime() { + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let h = thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + let srv = Server::build() + .workers(1) + .disable_signals() + .bind("test", addr, move || { + fn_service(|_| async { Ok::<_, ()>(()) }) + })? + .run(); + + tx.send(srv.handle()).unwrap(); + + srv.await + }) + }); + + let srv = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); let _ = srv.stop(true); - sys.stop(); h.join().unwrap().unwrap(); } #[test] fn test_listen() { let addr = unused_addr(); + let lst = net::TcpListener::bind(addr).unwrap(); + let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let lst = net::TcpListener::bind(addr)?; actix_rt::System::new().block_on(async { let srv = Server::build() .disable_signals() @@ -64,19 +99,18 @@ fn test_listen() { })? .run(); - let _ = tx.send((srv.handle(), actix_rt::System::current())); + let _ = tx.send(srv.handle()); srv.await }) }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); let _ = srv.stop(true); - sys.stop(); h.join().unwrap().unwrap(); } @@ -283,12 +317,12 @@ async fn test_service_restart() { .workers(1) .run(); - let _ = tx.send((srv.handle(), actix_rt::System::current())); + let _ = tx.send(srv.handle()); srv.await }) }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); for _ in 0..5 { TcpStream::connect(addr1) @@ -311,7 +345,6 @@ async fn test_service_restart() { assert!(num2_clone.load(Ordering::SeqCst) > 5); let _ = srv.stop(false); - sys.stop(); h.join().unwrap().unwrap(); } @@ -388,13 +421,13 @@ async fn worker_restart() { .workers(2) .run(); - let _ = tx.send((srv.handle(), actix_rt::System::current())); + let _ = tx.send(srv.handle()); srv.await }) }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); sleep(Duration::from_secs(3)).await; @@ -452,6 +485,5 @@ async fn worker_restart() { stream.shutdown().await.unwrap(); let _ = srv.stop(false); - sys.stop(); h.join().unwrap().unwrap(); } diff --git a/actix-service/src/macros.rs b/actix-service/src/macros.rs index 6cf3ef08..503cf116 100644 --- a/actix-service/src/macros.rs +++ b/actix-service/src/macros.rs @@ -1,7 +1,7 @@ /// An implementation of [`poll_ready`]() that always signals readiness. /// /// This should only be used for basic leaf services that have no concept of un-readiness. -/// For wrapper or other serivice types, use [`forward_ready!`] for simple cases or write a bespoke +/// For wrapper or other service types, use [`forward_ready!`] for simple cases or write a bespoke /// `poll_ready` implementation. /// /// [`poll_ready`]: crate::Service::poll_ready diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index a94706a2..ed858378 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -23,3 +23,4 @@ local-waker = "0.1" [dev-dependencies] actix-rt = "2.0.0" futures-util = { version = "0.3.7", default-features = false } +static_assertions = "1.1" diff --git a/actix-utils/src/future/ready.rs b/actix-utils/src/future/ready.rs index 4a01ada3..678d6304 100644 --- a/actix-utils/src/future/ready.rs +++ b/actix-utils/src/future/ready.rs @@ -103,10 +103,16 @@ pub fn err(err: E) -> Ready> { #[cfg(test)] mod tests { + use std::rc::Rc; + use futures_util::task::noop_waker; + use static_assertions::{assert_impl_all, assert_not_impl_all}; use super::*; + assert_impl_all!(Ready<()>: Send, Sync, Clone); + assert_not_impl_all!(Ready>: Send, Sync); + #[test] #[should_panic] fn multiple_poll_panics() {