diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index a3cb5272..44e34b5d 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -1,5 +1,7 @@ use std::{ + any::{Any, TypeId}, cell::RefCell, + collections::HashMap, fmt, future::Future, pin::Pin, @@ -20,6 +22,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); thread_local!( static HANDLE: RefCell> = RefCell::new(None); + static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); pub(crate) enum ArbiterCommand { @@ -118,6 +121,7 @@ impl Arbiter { System::set_current(sys); + STORAGE.with(|cell| cell.borrow_mut().clear()); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); // register arbiter @@ -152,6 +156,7 @@ impl Arbiter { let hnd = ArbiterHandle::new(tx); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + STORAGE.with(|cell| cell.borrow_mut().clear()); local.spawn_local(ArbiterRunner { rx }); @@ -209,6 +214,54 @@ impl Arbiter { pub fn join(self) -> thread::Result<()> { self.thread_handle.join() } + + /// Insert item into Arbiter's thread-local storage. + /// + /// Overwrites any item of the same type previously inserted. + pub fn set_item(item: T) { + STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); + } + + /// Check if Arbiter's thread-local storage contains an item type. + pub fn contains_item() -> bool { + STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::())) + } + + /// Call a function with a shared reference to an item in this Arbiter's thread-local storage. + /// + /// # Panics + /// Panics if item is not in Arbiter's thread-local item storage. + pub fn get_item(mut f: F) -> R + where + F: FnMut(&T) -> R, + { + STORAGE.with(move |cell| { + let st = cell.borrow(); + + let type_id = TypeId::of::(); + let item = st.get(&type_id).and_then(downcast_ref).unwrap(); + + f(item) + }) + } + + /// Call a function with a mutable reference to an item in this Arbiter's thread-local storage. + /// + /// # Panics + /// Panics if item is not in Arbiter's thread-local item storage. + pub fn get_mut_item(mut f: F) -> R + where + F: FnMut(&mut T) -> R, + { + STORAGE.with(move |cell| { + let mut st = cell.borrow_mut(); + + let type_id = TypeId::of::(); + let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap(); + + f(item) + }) + } } /// A persistent future that processes [Arbiter] commands. @@ -239,3 +292,11 @@ impl Future for ArbiterRunner { } } } + +fn downcast_ref(boxed: &Box) -> Option<&T> { + boxed.downcast_ref() +} + +fn downcast_mut(boxed: &mut Box) -> Option<&mut T> { + boxed.downcast_mut() +} diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index f54e9909..b2607a91 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -140,6 +140,35 @@ fn arbiter_drop_no_panic_fut() { arbiter.join().unwrap(); } +#[test] +fn arbiter_item_storage() { + let _ = System::new(); + + let arbiter = Arbiter::new(); + + assert!(!Arbiter::contains_item::()); + Arbiter::set_item(42u32); + assert!(Arbiter::contains_item::()); + + Arbiter::get_item(|&item: &u32| assert_eq!(item, 42)); + Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); + + let thread = thread::spawn(move || { + Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); + }) + .join(); + assert!(thread.is_err()); + + let thread = thread::spawn(move || { + Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); + }) + .join(); + assert!(thread.is_err()); + + arbiter.stop(); + arbiter.join().unwrap(); +} + #[test] #[should_panic] fn no_system_current_panic() {