From 6fed1c3e7dfdda27b8809d6a1fd1e83d260cbf39 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 11 Oct 2021 09:58:11 +0800 Subject: [PATCH] add support for io-uring (#374) Co-authored-by: Rob Ede --- .cargo/config.toml | 20 ++++++++++- .github/workflows/ci.yml | 58 +++++++++++++++++------------- actix-rt/CHANGES.md | 2 ++ actix-rt/Cargo.toml | 4 +++ actix-rt/src/arbiter.rs | 72 ++++++++++++++++++++++++++++++++------ actix-rt/src/lib.rs | 7 ++++ actix-rt/src/runtime.rs | 5 --- actix-rt/src/system.rs | 2 +- actix-rt/tests/tests.rs | 46 ++++++++++++++++++++---- actix-server/CHANGES.md | 2 ++ actix-server/Cargo.toml | 1 + actix-server/src/worker.rs | 16 +++++++-- actix-tls/src/lib.rs | 1 + 13 files changed, 185 insertions(+), 51 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 16d75ced..0e5de486 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,23 @@ [alias] chk = "check --workspace --all-features --tests --examples --bins" lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo" -ci-test = "test --workspace --all-features --lib --tests --no-fail-fast -- --nocapture" + ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture" + +# just check the library (without dev deps) +ci-check-min = "hack --workspace check --no-default-features" +ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check" +ci-check-lib-linux = "hack --workspace --feature-powerset check" + +# check everything +ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples" +ci-check-linux = "hack --workspace --feature-powerset check --tests --examples" + +# tests avoiding io-uring feature +ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture" +ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture" +ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture" + +# test with io-uring feature +ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture" +ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a0f62910..45841fb8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,36 +75,47 @@ jobs: command: install args: cargo-hack - - name: check minimal + - name: check lib + if: > + matrix.target.os != 'ubuntu-latest' + && matrix.target.triple != 'x86_64-pc-windows-gnu' uses: actions-rs/cargo@v1 - with: - command: hack - args: check --workspace --no-default-features - - - name: check minimal + tests + with: { command: ci-check-lib } + - name: check lib + if: matrix.target.os == 'ubuntu-latest' uses: actions-rs/cargo@v1 - with: - command: hack - args: check --workspace --no-default-features --tests --examples - - - name: check default + with: { command: ci-check-lib-linux } + - name: check lib + if: matrix.target.triple == 'x86_64-pc-windows-gnu' uses: actions-rs/cargo@v1 - with: - command: check - args: --workspace --tests --examples - + with: { command: ci-check-min } + - name: check full # TODO: compile OpenSSL and run tests on MinGW - if: matrix.target.triple != 'x86_64-pc-windows-gnu' + if: > + matrix.target.os != 'ubuntu-latest' + && matrix.target.triple != 'x86_64-pc-windows-gnu' uses: actions-rs/cargo@v1 - with: - command: check - args: --workspace --all-features --tests --examples + with: { command: ci-check } + - name: check all + if: matrix.target.os == 'ubuntu-latest' + uses: actions-rs/cargo@v1 + with: { command: ci-check-linux } - name: tests - if: matrix.target.triple != 'x86_64-pc-windows-gnu' - uses: actions-rs/cargo@v1 - with: { command: ci-test } + if: > + matrix.target.os != 'ubuntu-latest' + && matrix.target.triple != 'x86_64-pc-windows-gnu' + run: | + cargo ci-test + cargo ci-test-rt + cargo ci-test-server + - name: tests + if: matrix.target.os == 'ubuntu-latest' + run: | + cargo ci-test + cargo ci-test-rt-linux + cargo ci-test-server-linux - name: Generate coverage file if: > @@ -120,8 +131,7 @@ jobs: && matrix.version == 'stable' && github.ref == 'refs/heads/master' uses: codecov/codecov-action@v1 - with: - file: cobertura.xml + with: { file: cobertura.xml } - name: Clear the cargo caches run: | diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 42879e12..373640d3 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,9 +1,11 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `io-uring` feature for enabling async file I/O on linux. [#374] * The `spawn` method can now resolve with non-unit outputs. [#369] [#369]: https://github.com/actix/actix-net/pull/369 +[#374]: https://github.com/actix/actix-net/pull/374 ## 2.2.0 - 2021-03-29 diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index f4a90d2c..b466bb76 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -21,6 +21,7 @@ path = "src/lib.rs" [features] default = ["macros"] macros = ["actix-macros"] +io-uring = ["tokio-uring"] [dependencies] actix-macros = { version = "0.2.0", optional = true } @@ -28,6 +29,9 @@ actix-macros = { version = "0.2.0", optional = true } futures-core = { version = "0.3", default-features = false } tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } +[target.'cfg(target_os = "linux")'.dependencies] +tokio-uring = { version = "0.1", optional = true } + [dev-dependencies] tokio = { version = "1.2", features = ["full"] } hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 9ff1419d..97084f05 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -9,12 +9,9 @@ use std::{ }; use futures_core::ready; -use tokio::{sync::mpsc, task::LocalSet}; +use tokio::sync::mpsc; -use crate::{ - runtime::{default_tokio_runtime, Runtime}, - system::{System, SystemCommand}, -}; +use crate::system::{System, SystemCommand}; pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); @@ -98,16 +95,19 @@ impl Arbiter { /// /// # Panics /// Panics if a [System] is not registered on the current thread. + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { Self::with_tokio_rt(|| { - default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") + crate::runtime::default_tokio_runtime() + .expect("Cannot create new Arbiter's Runtime.") }) } /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// /// [tokio-runtime]: tokio::runtime::Runtime + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] #[doc(hidden)] pub fn with_tokio_rt(runtime_factory: F) -> Arbiter where @@ -127,7 +127,7 @@ impl Arbiter { .spawn({ let tx = tx.clone(); move || { - let rt = Runtime::from(runtime_factory()); + let rt = crate::runtime::Runtime::from(runtime_factory()); let hnd = ArbiterHandle::new(tx); System::set_current(sys); @@ -159,15 +159,67 @@ impl Arbiter { Arbiter { tx, thread_handle } } - /// Sets up an Arbiter runner in a new System using the provided runtime local task set. - pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { + /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[cfg(all(target_os = "linux", feature = "io-uring"))] + #[allow(clippy::new_without_default)] + pub fn new() -> Arbiter { + let sys = System::current(); + let system_id = sys.id(); + let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); + + let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); + let (tx, rx) = mpsc::unbounded_channel(); + + let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); + + let thread_handle = thread::Builder::new() + .name(name.clone()) + .spawn({ + let tx = tx.clone(); + move || { + let hnd = ArbiterHandle::new(tx); + + System::set_current(sys); + + HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + + // register arbiter + let _ = System::current() + .tx() + .send(SystemCommand::RegisterArbiter(arb_id, hnd)); + + ready_tx.send(()).unwrap(); + + // run arbiter event processing loop + tokio_uring::start(ArbiterRunner { rx }); + + // deregister arbiter + let _ = System::current() + .tx() + .send(SystemCommand::DeregisterArbiter(arb_id)); + } + }) + .unwrap_or_else(|err| { + panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) + }); + + ready_rx.recv().unwrap(); + + Arbiter { tx, thread_handle } + } + + /// Sets up an Arbiter runner in a new System using the environment's local set. + pub(crate) fn in_new_system() -> ArbiterHandle { let (tx, rx) = mpsc::unbounded_channel(); let hnd = ArbiterHandle::new(tx); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - local.spawn_local(ArbiterRunner { rx }); + crate::spawn(ArbiterRunner { rx }); hnd } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 95afcac9..e078dd06 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -32,6 +32,10 @@ //! arbiter.stop(); //! arbiter.join().unwrap(); //! ``` +//! +//! # `io-uring` Support +//! There is experimental support for using io-uring with this crate by enabling the +//! `io-uring` feature. For now, it is semver exempt. #![deny(rust_2018_idioms, nonstandard_style)] #![allow(clippy::type_complexity)] @@ -39,6 +43,9 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] +#[cfg(all(not(target_os = "linux"), feature = "io-uring"))] +compile_error!("io_uring is a linux only feature."); + use std::future::Future; use tokio::task::JoinHandle; diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 1adbf6c0..25937003 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -31,11 +31,6 @@ impl Runtime { }) } - /// Reference to local task set. - pub(crate) fn local_set(&self) -> &LocalSet { - &self.local - } - /// Offload a future onto the single-threaded runtime. /// /// The returned join handle can be used to await the future's result. diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 3bc8a6e3..4f262ede 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -54,7 +54,7 @@ impl System { let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let rt = Runtime::from(runtime_factory()); - let sys_arbiter = Arbiter::in_new_system(rt.local_set()); + let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() }); let system = System::construct(sys_tx, sys_arbiter.clone()); system diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index e66696bf..5fe1e894 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,10 +1,6 @@ use std::{ future::Future, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::channel, - Arc, - }, + sync::mpsc::channel, thread, time::{Duration, Instant}, }; @@ -221,8 +217,8 @@ fn system_stop_stops_arbiters() { System::current().stop(); sys.run().unwrap(); - // account for slightly slow thread de-spawns (only observed on windows) - thread::sleep(Duration::from_millis(100)); + // account for slightly slow thread de-spawns + thread::sleep(Duration::from_millis(500)); // arbiter should be dead and return false assert!(!Arbiter::current().spawn_fn(|| {})); @@ -231,6 +227,7 @@ fn system_stop_stops_arbiters() { arb.join().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn new_system_with_tokio() { let (tx, rx) = channel(); @@ -263,8 +260,14 @@ fn new_system_with_tokio() { assert_eq!(rx.recv().unwrap(), 42); } +#[cfg(not(feature = "io-uring"))] #[test] fn new_arbiter_with_tokio() { + use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }; + let _ = System::new(); let arb = Arbiter::with_tokio_rt(|| { @@ -323,3 +326,32 @@ fn spawn_local() { h(actix_rt::spawn(async { 1 })); }) } + +#[cfg(all(target_os = "linux", feature = "io-uring"))] +#[test] +fn tokio_uring_arbiter() { + let system = System::new(); + let (tx, rx) = std::sync::mpsc::channel(); + + Arbiter::new().spawn(async move { + let handle = actix_rt::spawn(async move { + let f = tokio_uring::fs::File::create("test.txt").await.unwrap(); + let buf = b"Hello World!"; + + let (res, _) = f.write_at(&buf[..], 0).await; + assert!(res.is_ok()); + + f.sync_all().await.unwrap(); + f.close().await.unwrap(); + + std::fs::remove_file("test.txt").unwrap(); + }); + + handle.await.unwrap(); + tx.send(true).unwrap(); + }); + + assert!(rx.recv().unwrap()); + + drop(system); +} diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 86cde4f0..69f5b08c 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -3,11 +3,13 @@ ## Unreleased - 2021-xx-xx * Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349] * Remove `ServerBuilder::configure` [#349] +* Add `io-uring` feature for enabling async file I/O on linux. [#374] * Server no long listens to SIGHUP signal. It actually did not take any action when receiving SIGHUP, the only thing SIGHUP did was to stop the Server from receiving any future signal, because the `Signals` future stops on the first signal received [#389] +[#374]: https://github.com/actix/actix-net/pull/374 [#349]: https://github.com/actix/actix-net/pull/349 [#389]: https://github.com/actix/actix-net/pull/389 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 58471cf9..89e1d4e2 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -18,6 +18,7 @@ path = "src/lib.rs" [features] default = [] +io-uring = ["actix-rt/io-uring"] [dependencies] actix-rt = { version = "2.0.0", default-features = false } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index a974522a..21f98027 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -280,14 +280,24 @@ impl ServerWorker { let counter_clone = counter.clone(); // every worker runs in it's own arbiter. // use a custom tokio runtime builder to change the settings of runtime. - Arbiter::with_tokio_rt(move || { + #[cfg(all(target_os = "linux", feature = "io-uring"))] + let arbiter = { + // TODO: pass max blocking thread config when tokio-uring enable configuration + // on building runtime. + let _ = config.max_blocking_threads; + Arbiter::new() + }; + + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + let arbiter = Arbiter::with_tokio_rt(move || { tokio::runtime::Builder::new_current_thread() .enable_all() .max_blocking_threads(config.max_blocking_threads) .build() .unwrap() - }) - .spawn(async move { + }); + + arbiter.spawn(async move { let fut = factories .iter() .enumerate() diff --git a/actix-tls/src/lib.rs b/actix-tls/src/lib.rs index 83e18d58..dbda8834 100644 --- a/actix-tls/src/lib.rs +++ b/actix-tls/src/lib.rs @@ -5,6 +5,7 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #[cfg(feature = "openssl")] +#[allow(unused_extern_crates)] extern crate tls_openssl as openssl; #[cfg(feature = "accept")]