1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-23 21:51:06 +01:00

prepare actix-rt v2.0.0 release (#262)

This commit is contained in:
Rob Ede 2021-02-03 10:25:31 +00:00 committed by GitHub
parent 66bd5bf4a2
commit 4ec358575e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 172 additions and 121 deletions

View File

@ -28,6 +28,7 @@ enum PatternElement {
}
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
enum PatternType {
Static(String),
Prefix(String),

View File

@ -3,6 +3,13 @@
## Unreleased - 2021-xx-xx
## 2.0.0 - 2021-02-02
* Remove all Arbiter-local storage methods. [#262]
* Re-export `tokio::pin`. [#262]
[#262]: https://github.com/actix/actix-net/pull/262
## 2.0.0-beta.3 - 2021-01-31
* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253]
* Return `JoinHandle` from `actix_rt::spawn`. [#253]

View File

@ -1,12 +1,12 @@
[package]
name = "actix-rt"
version = "2.0.0-beta.3"
version = "2.0.0"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
keywords = ["async", "futures", "io", "runtime"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-rt"
@ -30,3 +30,4 @@ tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }

View File

@ -0,0 +1,28 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
use std::net::SocketAddr;
async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::from("Hello World")))
}
fn main() {
actix_rt::System::with_tokio_rt(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
})
.block_on(async {
let make_service =
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
let server =
Server::bind(&SocketAddr::from(([127, 0, 0, 1], 3000))).serve(make_service);
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
})
}

View File

@ -1,7 +1,5 @@
use std::{
any::{Any, TypeId},
cell::RefCell,
collections::HashMap,
fmt,
future::Future,
pin::Pin,
@ -14,7 +12,7 @@ use futures_core::ready;
use tokio::{sync::mpsc, task::LocalSet};
use crate::{
runtime::Runtime,
runtime::{default_tokio_runtime, Runtime},
system::{System, SystemCommand},
};
@ -22,7 +20,6 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
thread_local!(
static HANDLE: RefCell<Option<ArbiterHandle>> = RefCell::new(None);
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
pub(crate) enum ArbiterCommand {
@ -97,16 +94,30 @@ pub struct Arbiter {
}
impl Arbiter {
/// Spawn new Arbiter thread and start its event loop.
/// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed);
let system_id = System::current().id();
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id);
Self::with_tokio_rt(|| {
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
})
}
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
{
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let (tx, rx) = mpsc::unbounded_channel();
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
@ -116,18 +127,17 @@ impl Arbiter {
.spawn({
let tx = tx.clone();
move || {
let rt = Runtime::new().expect("Cannot create new Arbiter's Runtime.");
let rt = Runtime::from(runtime_factory());
let hnd = ArbiterHandle::new(tx);
System::set_current(sys);
STORAGE.with(|cell| cell.borrow_mut().clear());
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
// register arbiter
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(id, hnd));
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
ready_tx.send(()).unwrap();
@ -137,7 +147,7 @@ impl Arbiter {
// deregister arbiter
let _ = System::current()
.tx()
.send(SystemCommand::DeregisterArbiter(id));
.send(SystemCommand::DeregisterArbiter(arb_id));
}
})
.unwrap_or_else(|err| {
@ -156,7 +166,6 @@ impl Arbiter {
let hnd = ArbiterHandle::new(tx);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear());
local.spawn_local(ArbiterRunner { rx });
@ -214,58 +223,6 @@ impl Arbiter {
pub fn join(self) -> thread::Result<()> {
self.thread_handle.join()
}
/// Insert item into Arbiter's thread-local storage.
///
/// Overwrites any item of the same type previously inserted.
#[deprecated = "Will be removed in stable v2."]
pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
}
/// Check if Arbiter's thread-local storage contains an item type.
#[deprecated = "Will be removed in stable v2."]
pub fn contains_item<T: 'static>() -> bool {
STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::<T>()))
}
/// Call a function with a shared reference to an item in this Arbiter's thread-local storage.
///
/// # Panics
/// Panics if item is not in Arbiter's thread-local item storage.
#[deprecated = "Will be removed in stable v2."]
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&T) -> R,
{
STORAGE.with(move |cell| {
let st = cell.borrow();
let type_id = TypeId::of::<T>();
let item = st.get(&type_id).and_then(downcast_ref).unwrap();
f(item)
})
}
/// Call a function with a mutable reference to an item in this Arbiter's thread-local storage.
///
/// # Panics
/// Panics if item is not in Arbiter's thread-local item storage.
#[deprecated = "Will be removed in stable v2."]
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&mut T) -> R,
{
STORAGE.with(move |cell| {
let mut st = cell.borrow_mut();
let type_id = TypeId::of::<T>();
let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap();
f(item)
})
}
}
/// A persistent future that processes [Arbiter] commands.
@ -296,11 +253,3 @@ impl Future for ArbiterRunner {
}
}
}
fn downcast_ref<T: 'static>(boxed: &Box<dyn Any>) -> Option<&T> {
boxed.downcast_ref()
}
fn downcast_mut<T: 'static>(boxed: &mut Box<dyn Any>) -> Option<&mut T> {
boxed.downcast_mut()
}

View File

@ -12,7 +12,7 @@
//!
//! The disadvantage is that idle threads will not steal work from very busy, stuck or otherwise
//! backlogged threads. Tasks that are disproportionately expensive should be offloaded to the
//! blocking thread-pool using [`task::spawn_blocking`].
//! blocking task thread-pool using [`task::spawn_blocking`].
//!
//! # Examples
//! ```
@ -56,6 +56,8 @@ pub use self::arbiter::{Arbiter, ArbiterHandle};
pub use self::runtime::Runtime;
pub use self::system::{System, SystemRunner};
pub use tokio::pin;
pub mod signal {
//! Asynchronous signal handling (Tokio re-exports).

View File

@ -2,7 +2,7 @@ use std::{future::Future, io};
use tokio::task::{JoinHandle, LocalSet};
/// A single-threaded runtime based on Tokio's "current thread" runtime.
/// A Tokio-based runtime proxy.
///
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
/// on submitted futures.
@ -12,14 +12,18 @@ pub struct Runtime {
rt: tokio::runtime::Runtime,
}
pub(crate) fn default_tokio_runtime() -> io::Result<tokio::runtime::Runtime> {
tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
}
impl Runtime {
/// Returns a new runtime initialized with default configuration values.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> io::Result<Runtime> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()?;
pub fn new() -> io::Result<Self> {
let rt = default_tokio_runtime()?;
Ok(Runtime {
rt,
@ -81,3 +85,12 @@ impl Runtime {
self.local.block_on(&self.rt, f)
}
}
impl From<tokio::runtime::Runtime> for Runtime {
fn from(rt: tokio::runtime::Runtime) -> Self {
Self {
local: LocalSet::new(),
rt,
}
}
}

View File

@ -11,7 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};
use crate::{arbiter::ArbiterHandle, Arbiter, Runtime};
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -36,10 +36,24 @@ impl System {
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
Self::with_tokio_rt(|| {
default_tokio_runtime()
.expect("Default Actix (Tokio) runtime could not be created.")
})
}
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
let rt = Runtime::from(runtime_factory());
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
let system = System::construct(sys_tx, sys_arbiter.clone());

View File

@ -1,5 +1,9 @@
use std::{
sync::mpsc::channel,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
Arc,
},
thread,
time::{Duration, Instant},
};
@ -140,36 +144,6 @@ fn arbiter_drop_no_panic_fut() {
arbiter.join().unwrap();
}
#[test]
#[allow(deprecated)]
fn arbiter_item_storage() {
let _ = System::new();
let arbiter = Arbiter::new();
assert!(!Arbiter::contains_item::<u32>());
Arbiter::set_item(42u32);
assert!(Arbiter::contains_item::<u32>());
Arbiter::get_item(|&item: &u32| assert_eq!(item, 42));
Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42));
let thread = thread::spawn(move || {
Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread"));
})
.join();
assert!(thread.is_err());
let thread = thread::spawn(move || {
Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread"));
})
.join();
assert!(thread.is_err());
arbiter.stop();
arbiter.join().unwrap();
}
#[test]
#[should_panic]
fn no_system_current_panic() {
@ -224,9 +198,71 @@ fn system_stop_stops_arbiters() {
System::current().stop();
sys.run().unwrap();
// account for slightly slow thread de-spawns (only observed on windows)
thread::sleep(Duration::from_millis(100));
// arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {}));
assert!(!arb.spawn_fn(|| {}));
arb.join().unwrap();
}
#[test]
fn new_system_with_tokio() {
let (tx, rx) = channel();
let res = System::with_tokio_rt(move || {
tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_keep_alive(Duration::from_millis(1000))
.worker_threads(2)
.max_blocking_threads(2)
.on_thread_start(|| {})
.on_thread_stop(|| {})
.build()
.unwrap()
})
.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
tokio::task::spawn(async move {
tx.send(42).unwrap();
})
.await
.unwrap();
123usize
});
assert_eq!(res, 123);
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn new_arbiter_with_tokio() {
let _ = System::new();
let arb = Arbiter::with_tokio_rt(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
});
let counter = Arc::new(AtomicBool::new(true));
let counter1 = counter.clone();
let did_spawn = arb.spawn(async move {
actix_rt::time::sleep(Duration::from_millis(1)).await;
counter1.store(false, Ordering::SeqCst);
Arbiter::current().stop();
});
assert!(did_spawn);
arb.join().unwrap();
assert_eq!(false, counter.load(Ordering::SeqCst));
}

View File

@ -24,7 +24,7 @@ default = []
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.0.0-beta.3", default-features = false }
actix-rt = { version = "2.0.0", default-features = false }
actix-service = "2.0.0-beta.3"
actix-utils = "3.0.0-beta.1"

View File

@ -24,5 +24,5 @@ futures-core = { version = "0.3.7", default-features = false }
pin-project-lite = "0.2"
[dev-dependencies]
actix-rt = "2.0.0-beta.3"
actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false }

View File

@ -41,7 +41,7 @@ uri = ["http"]
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.0.0-beta.3", default-features = false }
actix-rt = { version = "2.0.0", default-features = false }
actix-service = "2.0.0-beta.3"
actix-utils = "3.0.0-beta.1"
@ -67,7 +67,7 @@ tls-native-tls = { package = "native-tls", version = "0.2", optional = true }
tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies]
actix-rt = "2.0.0-beta.3"
actix-rt = "2.0.0"
actix-server = "2.0.0-beta.2"
bytes = "1"
env_logger = "0.8"

View File

@ -23,5 +23,5 @@ tracing = "0.1"
tracing-futures = "0.2"
[dev_dependencies]
actix-rt = "2.0.0-beta.3"
actix-rt = "2.0.0"
slab = "0.4"

View File

@ -17,7 +17,7 @@ path = "src/lib.rs"
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.0.0-beta.3", default-features = false }
actix-rt = { version = "2.0.0", default-features = false }
actix-service = "2.0.0-beta.3"
futures-core = { version = "0.3.7", default-features = false }