1
0
mirror of https://github.com/fafhrd91/actix-net synced 2024-11-27 19:12:56 +01:00

add support for io-uring (#374)

Co-authored-by: Rob Ede <robjtede@icloud.com>
This commit is contained in:
fakeshadow 2021-10-11 09:58:11 +08:00 committed by GitHub
parent c3d697df97
commit 6fed1c3e7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 185 additions and 51 deletions

View File

@ -1,5 +1,23 @@
[alias] [alias]
chk = "check --workspace --all-features --tests --examples --bins" chk = "check --workspace --all-features --tests --examples --bins"
lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo" lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo"
ci-test = "test --workspace --all-features --lib --tests --no-fail-fast -- --nocapture"
ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture" ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture"
# just check the library (without dev deps)
ci-check-min = "hack --workspace check --no-default-features"
ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check"
ci-check-lib-linux = "hack --workspace --feature-powerset check"
# check everything
ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples"
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"
# tests avoiding io-uring feature
ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
# test with io-uring feature
ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"

View File

@ -75,36 +75,47 @@ jobs:
command: install command: install
args: cargo-hack args: cargo-hack
- name: check minimal - name: check lib
if: >
matrix.target.os != 'ubuntu-latest'
&& matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with: { command: ci-check-lib }
command: hack - name: check lib
args: check --workspace --no-default-features if: matrix.target.os == 'ubuntu-latest'
- name: check minimal + tests
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with: { command: ci-check-lib-linux }
command: hack - name: check lib
args: check --workspace --no-default-features --tests --examples if: matrix.target.triple == 'x86_64-pc-windows-gnu'
- name: check default
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with: { command: ci-check-min }
command: check
args: --workspace --tests --examples
- name: check full - name: check full
# TODO: compile OpenSSL and run tests on MinGW # TODO: compile OpenSSL and run tests on MinGW
if: matrix.target.triple != 'x86_64-pc-windows-gnu' if: >
matrix.target.os != 'ubuntu-latest'
&& matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with: { command: ci-check }
command: check - name: check all
args: --workspace --all-features --tests --examples if: matrix.target.os == 'ubuntu-latest'
uses: actions-rs/cargo@v1
with: { command: ci-check-linux }
- name: tests - name: tests
if: matrix.target.triple != 'x86_64-pc-windows-gnu' if: >
uses: actions-rs/cargo@v1 matrix.target.os != 'ubuntu-latest'
with: { command: ci-test } && matrix.target.triple != 'x86_64-pc-windows-gnu'
run: |
cargo ci-test
cargo ci-test-rt
cargo ci-test-server
- name: tests
if: matrix.target.os == 'ubuntu-latest'
run: |
cargo ci-test
cargo ci-test-rt-linux
cargo ci-test-server-linux
- name: Generate coverage file - name: Generate coverage file
if: > if: >
@ -120,8 +131,7 @@ jobs:
&& matrix.version == 'stable' && matrix.version == 'stable'
&& github.ref == 'refs/heads/master' && github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1 uses: codecov/codecov-action@v1
with: with: { file: cobertura.xml }
file: cobertura.xml
- name: Clear the cargo caches - name: Clear the cargo caches
run: | run: |

View File

@ -1,9 +1,11 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Add `io-uring` feature for enabling async file I/O on linux. [#374]
* The `spawn` method can now resolve with non-unit outputs. [#369] * The `spawn` method can now resolve with non-unit outputs. [#369]
[#369]: https://github.com/actix/actix-net/pull/369 [#369]: https://github.com/actix/actix-net/pull/369
[#374]: https://github.com/actix/actix-net/pull/374
## 2.2.0 - 2021-03-29 ## 2.2.0 - 2021-03-29

View File

@ -21,6 +21,7 @@ path = "src/lib.rs"
[features] [features]
default = ["macros"] default = ["macros"]
macros = ["actix-macros"] macros = ["actix-macros"]
io-uring = ["tokio-uring"]
[dependencies] [dependencies]
actix-macros = { version = "0.2.0", optional = true } actix-macros = { version = "0.2.0", optional = true }
@ -28,6 +29,9 @@ actix-macros = { version = "0.2.0", optional = true }
futures-core = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false }
tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
[target.'cfg(target_os = "linux")'.dependencies]
tokio-uring = { version = "0.1", optional = true }
[dev-dependencies] [dev-dependencies]
tokio = { version = "1.2", features = ["full"] } tokio = { version = "1.2", features = ["full"] }
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] } hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }

View File

@ -9,12 +9,9 @@ use std::{
}; };
use futures_core::ready; use futures_core::ready;
use tokio::{sync::mpsc, task::LocalSet}; use tokio::sync::mpsc;
use crate::{ use crate::system::{System, SystemCommand};
runtime::{default_tokio_runtime, Runtime},
system::{System, SystemCommand},
};
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
@ -98,16 +95,19 @@ impl Arbiter {
/// ///
/// # Panics /// # Panics
/// Panics if a [System] is not registered on the current thread. /// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new() -> Arbiter { pub fn new() -> Arbiter {
Self::with_tokio_rt(|| { Self::with_tokio_rt(|| {
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") crate::runtime::default_tokio_runtime()
.expect("Cannot create new Arbiter's Runtime.")
}) })
} }
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
/// ///
/// [tokio-runtime]: tokio::runtime::Runtime /// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[doc(hidden)] #[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where where
@ -127,7 +127,7 @@ impl Arbiter {
.spawn({ .spawn({
let tx = tx.clone(); let tx = tx.clone();
move || { move || {
let rt = Runtime::from(runtime_factory()); let rt = crate::runtime::Runtime::from(runtime_factory());
let hnd = ArbiterHandle::new(tx); let hnd = ArbiterHandle::new(tx);
System::set_current(sys); System::set_current(sys);
@ -159,15 +159,67 @@ impl Arbiter {
Arbiter { tx, thread_handle } Arbiter { tx, thread_handle }
} }
/// Sets up an Arbiter runner in a new System using the provided runtime local task set. /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { ///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let (tx, rx) = mpsc::unbounded_channel();
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
let thread_handle = thread::Builder::new()
.name(name.clone())
.spawn({
let tx = tx.clone();
move || {
let hnd = ArbiterHandle::new(tx);
System::set_current(sys);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
// register arbiter
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
ready_tx.send(()).unwrap();
// run arbiter event processing loop
tokio_uring::start(ArbiterRunner { rx });
// deregister arbiter
let _ = System::current()
.tx()
.send(SystemCommand::DeregisterArbiter(arb_id));
}
})
.unwrap_or_else(|err| {
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
});
ready_rx.recv().unwrap();
Arbiter { tx, thread_handle }
}
/// Sets up an Arbiter runner in a new System using the environment's local set.
pub(crate) fn in_new_system() -> ArbiterHandle {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let hnd = ArbiterHandle::new(tx); let hnd = ArbiterHandle::new(tx);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
local.spawn_local(ArbiterRunner { rx }); crate::spawn(ArbiterRunner { rx });
hnd hnd
} }

View File

@ -32,6 +32,10 @@
//! arbiter.stop(); //! arbiter.stop();
//! arbiter.join().unwrap(); //! arbiter.join().unwrap();
//! ``` //! ```
//!
//! # `io-uring` Support
//! There is experimental support for using io-uring with this crate by enabling the
//! `io-uring` feature. For now, it is semver exempt.
#![deny(rust_2018_idioms, nonstandard_style)] #![deny(rust_2018_idioms, nonstandard_style)]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
@ -39,6 +43,9 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#[cfg(all(not(target_os = "linux"), feature = "io-uring"))]
compile_error!("io_uring is a linux only feature.");
use std::future::Future; use std::future::Future;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;

View File

@ -31,11 +31,6 @@ impl Runtime {
}) })
} }
/// Reference to local task set.
pub(crate) fn local_set(&self) -> &LocalSet {
&self.local
}
/// Offload a future onto the single-threaded runtime. /// Offload a future onto the single-threaded runtime.
/// ///
/// The returned join handle can be used to await the future's result. /// The returned join handle can be used to await the future's result.

View File

@ -54,7 +54,7 @@ impl System {
let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::from(runtime_factory()); let rt = Runtime::from(runtime_factory());
let sys_arbiter = Arbiter::in_new_system(rt.local_set()); let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone()); let system = System::construct(sys_tx, sys_arbiter.clone());
system system

View File

@ -1,10 +1,6 @@
use std::{ use std::{
future::Future, future::Future,
sync::{ sync::mpsc::channel,
atomic::{AtomicBool, Ordering},
mpsc::channel,
Arc,
},
thread, thread,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -221,8 +217,8 @@ fn system_stop_stops_arbiters() {
System::current().stop(); System::current().stop();
sys.run().unwrap(); sys.run().unwrap();
// account for slightly slow thread de-spawns (only observed on windows) // account for slightly slow thread de-spawns
thread::sleep(Duration::from_millis(100)); thread::sleep(Duration::from_millis(500));
// arbiter should be dead and return false // arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {})); assert!(!Arbiter::current().spawn_fn(|| {}));
@ -231,6 +227,7 @@ fn system_stop_stops_arbiters() {
arb.join().unwrap(); arb.join().unwrap();
} }
#[cfg(not(feature = "io-uring"))]
#[test] #[test]
fn new_system_with_tokio() { fn new_system_with_tokio() {
let (tx, rx) = channel(); let (tx, rx) = channel();
@ -263,8 +260,14 @@ fn new_system_with_tokio() {
assert_eq!(rx.recv().unwrap(), 42); assert_eq!(rx.recv().unwrap(), 42);
} }
#[cfg(not(feature = "io-uring"))]
#[test] #[test]
fn new_arbiter_with_tokio() { fn new_arbiter_with_tokio() {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
let _ = System::new(); let _ = System::new();
let arb = Arbiter::with_tokio_rt(|| { let arb = Arbiter::with_tokio_rt(|| {
@ -323,3 +326,32 @@ fn spawn_local() {
h(actix_rt::spawn(async { 1 })); h(actix_rt::spawn(async { 1 }));
}) })
} }
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[test]
fn tokio_uring_arbiter() {
let system = System::new();
let (tx, rx) = std::sync::mpsc::channel();
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
f.sync_all().await.unwrap();
f.close().await.unwrap();
std::fs::remove_file("test.txt").unwrap();
});
handle.await.unwrap();
tx.send(true).unwrap();
});
assert!(rx.recv().unwrap());
drop(system);
}

View File

@ -3,11 +3,13 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349] * Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349]
* Remove `ServerBuilder::configure` [#349] * Remove `ServerBuilder::configure` [#349]
* Add `io-uring` feature for enabling async file I/O on linux. [#374]
* Server no long listens to SIGHUP signal. * Server no long listens to SIGHUP signal.
It actually did not take any action when receiving SIGHUP, the only thing SIGHUP did was to stop It actually did not take any action when receiving SIGHUP, the only thing SIGHUP did was to stop
the Server from receiving any future signal, because the `Signals` future stops on the first the Server from receiving any future signal, because the `Signals` future stops on the first
signal received [#389] signal received [#389]
[#374]: https://github.com/actix/actix-net/pull/374
[#349]: https://github.com/actix/actix-net/pull/349 [#349]: https://github.com/actix/actix-net/pull/349
[#389]: https://github.com/actix/actix-net/pull/389 [#389]: https://github.com/actix/actix-net/pull/389

View File

@ -18,6 +18,7 @@ path = "src/lib.rs"
[features] [features]
default = [] default = []
io-uring = ["actix-rt/io-uring"]
[dependencies] [dependencies]
actix-rt = { version = "2.0.0", default-features = false } actix-rt = { version = "2.0.0", default-features = false }

View File

@ -280,14 +280,24 @@ impl ServerWorker {
let counter_clone = counter.clone(); let counter_clone = counter.clone();
// every worker runs in it's own arbiter. // every worker runs in it's own arbiter.
// use a custom tokio runtime builder to change the settings of runtime. // use a custom tokio runtime builder to change the settings of runtime.
Arbiter::with_tokio_rt(move || { #[cfg(all(target_os = "linux", feature = "io-uring"))]
let arbiter = {
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
Arbiter::new()
};
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
let arbiter = Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread() tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.max_blocking_threads(config.max_blocking_threads) .max_blocking_threads(config.max_blocking_threads)
.build() .build()
.unwrap() .unwrap()
}) });
.spawn(async move {
arbiter.spawn(async move {
let fut = factories let fut = factories
.iter() .iter()
.enumerate() .enumerate()

View File

@ -5,6 +5,7 @@
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
#[allow(unused_extern_crates)]
extern crate tls_openssl as openssl; extern crate tls_openssl as openssl;
#[cfg(feature = "accept")] #[cfg(feature = "accept")]