From 4ec358575ed9b1a9636c8b83d349f6a5eeaddaa2 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Feb 2021 10:25:31 +0000 Subject: [PATCH] prepare actix-rt v2.0.0 release (#262) --- actix-router/src/resource.rs | 1 + actix-rt/CHANGES.md | 7 +++ actix-rt/Cargo.toml | 5 +- actix-rt/examples/hyper.rs | 28 +++++++++++ actix-rt/src/arbiter.rs | 95 ++++++++-------------------------- actix-rt/src/lib.rs | 4 +- actix-rt/src/runtime.rs | 25 ++++++--- actix-rt/src/system.rs | 18 ++++++- actix-rt/tests/tests.rs | 98 ++++++++++++++++++++++++------------ actix-server/Cargo.toml | 2 +- actix-service/Cargo.toml | 2 +- actix-tls/Cargo.toml | 4 +- actix-tracing/Cargo.toml | 2 +- actix-utils/Cargo.toml | 2 +- 14 files changed, 172 insertions(+), 121 deletions(-) create mode 100644 actix-rt/examples/hyper.rs diff --git a/actix-router/src/resource.rs b/actix-router/src/resource.rs index 3808bc25..8dbef26c 100644 --- a/actix-router/src/resource.rs +++ b/actix-router/src/resource.rs @@ -28,6 +28,7 @@ enum PatternElement { } #[derive(Clone, Debug)] +#[allow(clippy::large_enum_variant)] enum PatternType { Static(String), Prefix(String), diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index d3b74137..15052613 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -3,6 +3,13 @@ ## Unreleased - 2021-xx-xx +## 2.0.0 - 2021-02-02 +* Remove all Arbiter-local storage methods. [#262] +* Re-export `tokio::pin`. [#262] + +[#262]: https://github.com/actix/actix-net/pull/262 + + ## 2.0.0-beta.3 - 2021-01-31 * Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] * Return `JoinHandle` from `actix_rt::spawn`. [#253] diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 79513580..f8f3984d 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "actix-rt" -version = "2.0.0-beta.3" +version = "2.0.0" authors = [ "Nikolay Kim ", "Rob Ede ", ] description = "Tokio-based single-threaded async runtime for the Actix ecosystem" -keywords = ["network", "framework", "async", "futures"] +keywords = ["async", "futures", "io", "runtime"] homepage = "https://actix.rs" repository = "https://github.com/actix/actix-net.git" documentation = "https://docs.rs/actix-rt" @@ -30,3 +30,4 @@ tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync [dev-dependencies] tokio = { version = "1", features = ["full"] } +hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] } diff --git a/actix-rt/examples/hyper.rs b/actix-rt/examples/hyper.rs new file mode 100644 index 00000000..8bad1b33 --- /dev/null +++ b/actix-rt/examples/hyper.rs @@ -0,0 +1,28 @@ +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response, Server}; +use std::convert::Infallible; +use std::net::SocketAddr; + +async fn handle(_req: Request) -> Result, Infallible> { + Ok(Response::new(Body::from("Hello World"))) +} + +fn main() { + actix_rt::System::with_tokio_rt(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + }) + .block_on(async { + let make_service = + make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) }); + + let server = + Server::bind(&SocketAddr::from(([127, 0, 0, 1], 3000))).serve(make_service); + + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } + }) +} diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 72e2d3e3..7eae662a 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -1,7 +1,5 @@ use std::{ - any::{Any, TypeId}, cell::RefCell, - collections::HashMap, fmt, future::Future, pin::Pin, @@ -14,7 +12,7 @@ use futures_core::ready; use tokio::{sync::mpsc, task::LocalSet}; use crate::{ - runtime::Runtime, + runtime::{default_tokio_runtime, Runtime}, system::{System, SystemCommand}, }; @@ -22,7 +20,6 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); thread_local!( static HANDLE: RefCell> = RefCell::new(None); - static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); pub(crate) enum ArbiterCommand { @@ -97,16 +94,30 @@ pub struct Arbiter { } impl Arbiter { - /// Spawn new Arbiter thread and start its event loop. + /// Spawn a new Arbiter thread and start its event loop. /// /// # Panics /// Panics if a [System] is not registered on the current thread. #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { - let id = COUNT.fetch_add(1, Ordering::Relaxed); - let system_id = System::current().id(); - let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id); + Self::with_tokio_rt(|| { + default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") + }) + } + + /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. + /// + /// [tokio-runtime]: tokio::runtime::Runtime + #[doc(hidden)] + pub fn with_tokio_rt(runtime_factory: F) -> Arbiter + where + F: Fn() -> tokio::runtime::Runtime + Send + 'static, + { let sys = System::current(); + let system_id = sys.id(); + let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); + + let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); let (tx, rx) = mpsc::unbounded_channel(); let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); @@ -116,18 +127,17 @@ impl Arbiter { .spawn({ let tx = tx.clone(); move || { - let rt = Runtime::new().expect("Cannot create new Arbiter's Runtime."); + let rt = Runtime::from(runtime_factory()); let hnd = ArbiterHandle::new(tx); System::set_current(sys); - STORAGE.with(|cell| cell.borrow_mut().clear()); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); // register arbiter let _ = System::current() .tx() - .send(SystemCommand::RegisterArbiter(id, hnd)); + .send(SystemCommand::RegisterArbiter(arb_id, hnd)); ready_tx.send(()).unwrap(); @@ -137,7 +147,7 @@ impl Arbiter { // deregister arbiter let _ = System::current() .tx() - .send(SystemCommand::DeregisterArbiter(id)); + .send(SystemCommand::DeregisterArbiter(arb_id)); } }) .unwrap_or_else(|err| { @@ -156,7 +166,6 @@ impl Arbiter { let hnd = ArbiterHandle::new(tx); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - STORAGE.with(|cell| cell.borrow_mut().clear()); local.spawn_local(ArbiterRunner { rx }); @@ -214,58 +223,6 @@ impl Arbiter { pub fn join(self) -> thread::Result<()> { self.thread_handle.join() } - - /// Insert item into Arbiter's thread-local storage. - /// - /// Overwrites any item of the same type previously inserted. - #[deprecated = "Will be removed in stable v2."] - pub fn set_item(item: T) { - STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); - } - - /// Check if Arbiter's thread-local storage contains an item type. - #[deprecated = "Will be removed in stable v2."] - pub fn contains_item() -> bool { - STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::())) - } - - /// Call a function with a shared reference to an item in this Arbiter's thread-local storage. - /// - /// # Panics - /// Panics if item is not in Arbiter's thread-local item storage. - #[deprecated = "Will be removed in stable v2."] - pub fn get_item(mut f: F) -> R - where - F: FnMut(&T) -> R, - { - STORAGE.with(move |cell| { - let st = cell.borrow(); - - let type_id = TypeId::of::(); - let item = st.get(&type_id).and_then(downcast_ref).unwrap(); - - f(item) - }) - } - - /// Call a function with a mutable reference to an item in this Arbiter's thread-local storage. - /// - /// # Panics - /// Panics if item is not in Arbiter's thread-local item storage. - #[deprecated = "Will be removed in stable v2."] - pub fn get_mut_item(mut f: F) -> R - where - F: FnMut(&mut T) -> R, - { - STORAGE.with(move |cell| { - let mut st = cell.borrow_mut(); - - let type_id = TypeId::of::(); - let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap(); - - f(item) - }) - } } /// A persistent future that processes [Arbiter] commands. @@ -296,11 +253,3 @@ impl Future for ArbiterRunner { } } } - -fn downcast_ref(boxed: &Box) -> Option<&T> { - boxed.downcast_ref() -} - -fn downcast_mut(boxed: &mut Box) -> Option<&mut T> { - boxed.downcast_mut() -} diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 9c846203..831958aa 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -12,7 +12,7 @@ //! //! The disadvantage is that idle threads will not steal work from very busy, stuck or otherwise //! backlogged threads. Tasks that are disproportionately expensive should be offloaded to the -//! blocking thread-pool using [`task::spawn_blocking`]. +//! blocking task thread-pool using [`task::spawn_blocking`]. //! //! # Examples //! ``` @@ -56,6 +56,8 @@ pub use self::arbiter::{Arbiter, ArbiterHandle}; pub use self::runtime::Runtime; pub use self::system::{System, SystemRunner}; +pub use tokio::pin; + pub mod signal { //! Asynchronous signal handling (Tokio re-exports). diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index a20dfe7e..1adbf6c0 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -2,7 +2,7 @@ use std::{future::Future, io}; use tokio::task::{JoinHandle, LocalSet}; -/// A single-threaded runtime based on Tokio's "current thread" runtime. +/// A Tokio-based runtime proxy. /// /// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound /// on submitted futures. @@ -12,14 +12,18 @@ pub struct Runtime { rt: tokio::runtime::Runtime, } +pub(crate) fn default_tokio_runtime() -> io::Result { + tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() +} + impl Runtime { /// Returns a new runtime initialized with default configuration values. #[allow(clippy::new_ret_no_self)] - pub fn new() -> io::Result { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build()?; + pub fn new() -> io::Result { + let rt = default_tokio_runtime()?; Ok(Runtime { rt, @@ -81,3 +85,12 @@ impl Runtime { self.local.block_on(&self.rt, f) } } + +impl From for Runtime { + fn from(rt: tokio::runtime::Runtime) -> Self { + Self { + local: LocalSet::new(), + rt, + } + } +} diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 58fe3cab..b7f134cb 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -11,7 +11,7 @@ use std::{ use futures_core::ready; use tokio::sync::{mpsc, oneshot}; -use crate::{arbiter::ArbiterHandle, Arbiter, Runtime}; +use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -36,10 +36,24 @@ impl System { /// Panics if underlying Tokio runtime can not be created. #[allow(clippy::new_ret_no_self)] pub fn new() -> SystemRunner { + Self::with_tokio_rt(|| { + default_tokio_runtime() + .expect("Default Actix (Tokio) runtime could not be created.") + }) + } + + /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure. + /// + /// [tokio-runtime]: tokio::runtime::Runtime + #[doc(hidden)] + pub fn with_tokio_rt(runtime_factory: F) -> SystemRunner + where + F: Fn() -> tokio::runtime::Runtime, + { let (stop_tx, stop_rx) = oneshot::channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel(); - let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created."); + let rt = Runtime::from(runtime_factory()); let sys_arbiter = Arbiter::in_new_system(rt.local_set()); let system = System::construct(sys_tx, sys_arbiter.clone()); diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 56ac2017..56b5e8a6 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,5 +1,9 @@ use std::{ - sync::mpsc::channel, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel, + Arc, + }, thread, time::{Duration, Instant}, }; @@ -140,36 +144,6 @@ fn arbiter_drop_no_panic_fut() { arbiter.join().unwrap(); } -#[test] -#[allow(deprecated)] -fn arbiter_item_storage() { - let _ = System::new(); - - let arbiter = Arbiter::new(); - - assert!(!Arbiter::contains_item::()); - Arbiter::set_item(42u32); - assert!(Arbiter::contains_item::()); - - Arbiter::get_item(|&item: &u32| assert_eq!(item, 42)); - Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); - - let thread = thread::spawn(move || { - Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); - }) - .join(); - assert!(thread.is_err()); - - let thread = thread::spawn(move || { - Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); - }) - .join(); - assert!(thread.is_err()); - - arbiter.stop(); - arbiter.join().unwrap(); -} - #[test] #[should_panic] fn no_system_current_panic() { @@ -224,9 +198,71 @@ fn system_stop_stops_arbiters() { System::current().stop(); sys.run().unwrap(); + // account for slightly slow thread de-spawns (only observed on windows) + thread::sleep(Duration::from_millis(100)); + // arbiter should be dead and return false assert!(!Arbiter::current().spawn_fn(|| {})); assert!(!arb.spawn_fn(|| {})); arb.join().unwrap(); } + +#[test] +fn new_system_with_tokio() { + let (tx, rx) = channel(); + + let res = System::with_tokio_rt(move || { + tokio::runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .thread_keep_alive(Duration::from_millis(1000)) + .worker_threads(2) + .max_blocking_threads(2) + .on_thread_start(|| {}) + .on_thread_stop(|| {}) + .build() + .unwrap() + }) + .block_on(async { + actix_rt::time::sleep(Duration::from_millis(1)).await; + + tokio::task::spawn(async move { + tx.send(42).unwrap(); + }) + .await + .unwrap(); + + 123usize + }); + + assert_eq!(res, 123); + assert_eq!(rx.recv().unwrap(), 42); +} + +#[test] +fn new_arbiter_with_tokio() { + let _ = System::new(); + + let arb = Arbiter::with_tokio_rt(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + }); + + let counter = Arc::new(AtomicBool::new(true)); + + let counter1 = counter.clone(); + let did_spawn = arb.spawn(async move { + actix_rt::time::sleep(Duration::from_millis(1)).await; + counter1.store(false, Ordering::SeqCst); + Arbiter::current().stop(); + }); + + assert!(did_spawn); + + arb.join().unwrap(); + + assert_eq!(false, counter.load(Ordering::SeqCst)); +} diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index a5f112b4..845dc03e 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -24,7 +24,7 @@ default = [] [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = { version = "2.0.0-beta.3", default-features = false } +actix-rt = { version = "2.0.0", default-features = false } actix-service = "2.0.0-beta.3" actix-utils = "3.0.0-beta.1" diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index f83939f2..ce18ba66 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -24,5 +24,5 @@ futures-core = { version = "0.3.7", default-features = false } pin-project-lite = "0.2" [dev-dependencies] -actix-rt = "2.0.0-beta.3" +actix-rt = "2.0.0" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 8c13a2fd..acdd0419 100755 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -41,7 +41,7 @@ uri = ["http"] [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = { version = "2.0.0-beta.3", default-features = false } +actix-rt = { version = "2.0.0", default-features = false } actix-service = "2.0.0-beta.3" actix-utils = "3.0.0-beta.1" @@ -67,7 +67,7 @@ tls-native-tls = { package = "native-tls", version = "0.2", optional = true } tokio-native-tls = { version = "0.3", optional = true } [dev-dependencies] -actix-rt = "2.0.0-beta.3" +actix-rt = "2.0.0" actix-server = "2.0.0-beta.2" bytes = "1" env_logger = "0.8" diff --git a/actix-tracing/Cargo.toml b/actix-tracing/Cargo.toml index 24888da3..23d785cb 100644 --- a/actix-tracing/Cargo.toml +++ b/actix-tracing/Cargo.toml @@ -23,5 +23,5 @@ tracing = "0.1" tracing-futures = "0.2" [dev_dependencies] -actix-rt = "2.0.0-beta.3" +actix-rt = "2.0.0" slab = "0.4" diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 468cb8d7..8b19937f 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = { version = "2.0.0-beta.3", default-features = false } +actix-rt = { version = "2.0.0", default-features = false } actix-service = "2.0.0-beta.3" futures-core = { version = "0.3.7", default-features = false }