1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-16 19:28:59 +02:00

Compare commits

..

24 Commits

Author SHA1 Message Date
Rob Ede
a9251474c1 Merge remote-tracking branch 'origin/fallible-services' into fallible-services 2021-11-01 03:21:03 +00:00
Rob Ede
5097b12b7c remove and_then_send 2021-11-01 03:19:32 +00:00
Rob Ede
3c6f586b89 doc tweaks 2021-11-01 02:19:20 +00:00
Rob Ede
f7985c585a Update actix-server/src/builder.rs
Co-authored-by: Ali MJ Al-Nasrawy <alimjalnasrawy@gmail.com>
2021-11-01 00:36:53 +00:00
Rob Ede
e49fedbfe7 doc 2021-10-28 20:59:44 +01:00
Rob Ede
75a877b631 fmt 2021-10-28 20:57:36 +01:00
Rob Ede
336e98e950 fix doc test 2021-10-28 20:57:01 +01:00
Rob Ede
448626d543 fix tls examples 2021-10-25 18:42:23 +01:00
Rob Ede
9b9869f1dd fix startup fail example 2021-10-25 18:13:08 +01:00
Rob Ede
4c0eaca581 convert Server::bind to accept a normal service factory 2021-10-25 18:03:52 +01:00
Rob Ede
81421c2ba9 rename maxconn => max_concurrent_connections 2021-10-22 21:01:50 +01:00
Rob Ede
305d0e9d8a rename server to serverhandle 2021-10-22 18:34:11 +01:00
Rob Ede
1c8fcaebbc tweak server logging 2021-10-22 18:17:26 +01:00
fakeshadow
a1d15f2e08 minimal support of System type with io-uring (#395) 2021-10-21 11:04:51 +01:00
Rob Ede
70ea5322ab prepare actix-tls 3.0.0-beta.7 release (#401) 2021-10-20 17:12:11 +01:00
Rob Ede
303666278a prepare actix-tls release 3.0.0-beta.6 2021-10-19 16:51:40 +01:00
Edward Shen
669e868370 Use tokio-rustls 0.23 (#396)
Co-authored-by: Rob Ede <robjtede@icloud.com>
2021-10-19 16:48:23 +01:00
Rob Ede
47f278b17a fix test macro in presence of other imports named test (#399) 2021-10-19 16:13:13 +01:00
Rob Ede
ca77d8d835 split -server and -tls msrv and raise to 1.52 (#398) 2021-10-19 14:53:42 +01:00
Rob Ede
00775884f8 prepare actix-macros release 0.2.2 2021-10-14 11:08:02 +01:00
Rob Ede
4ff8a2cf68 make runtime macros more IDE friendly (#391) 2021-10-14 10:54:39 +01:00
Rob Ede
5c555a9408 prepare actix-rt release 2.3.0 2021-10-11 22:55:23 +01:00
Rob Ede
ca435b2575 prepare actix-server release 2.0.0-beta.6 2021-10-11 05:14:34 +01:00
Rob Ede
9fa8d7fc5a avoid dependency on older tokios 2021-10-11 05:12:57 +01:00
55 changed files with 1091 additions and 480 deletions

View File

@@ -1,23 +1,25 @@
[alias]
chk = "check --workspace --all-features --tests --examples --bins"
lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo"
ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture"
# just check the library (without dev deps)
ci-check-min = "hack --workspace check --no-default-features"
ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check"
ci-check-lib = "hack --workspace --feature-powerset --exclude-features=io-uring check"
ci-check-lib-linux = "hack --workspace --feature-powerset check"
# check everything
ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples"
ci-check = "hack --workspace --feature-powerset --exclude-features=io-uring check --tests --examples"
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"
# tests avoiding io-uring feature
ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features=io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features=io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
# test with io-uring feature
ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
# test lower msrv
ci-test-lower-msrv = "hack --workspace --exclude=actix-server --exclude=actix-tls --feature-powerset test --lib --tests --no-fail-fast -- --nocapture"

View File

@@ -18,7 +18,7 @@ jobs:
- { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu }
- { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc }
version:
- 1.46.0 # MSRV
- 1.52.0 # MSRV for -server and -tls
- stable
- nightly
@@ -64,8 +64,7 @@ jobs:
# - name: Generate Cargo.lock
# uses: actions-rs/cargo@v1
# with:
# command: generate-lockfile
# with: { command: generate-lockfile }
# - name: Cache Dependencies
# uses: Swatinem/rust-cache@v1.2.0
@@ -113,30 +112,69 @@ jobs:
- name: tests
if: matrix.target.os == 'ubuntu-latest'
run: |
cargo ci-test
cargo ci-test-rt-linux
cargo ci-test-server-linux
- name: Generate coverage file
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --verbose
- name: Upload to Codecov
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1
with: { file: cobertura.xml }
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-rt-linux && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-server-linux"
- name: Clear the cargo caches
run: |
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
cargo-cache
build_and_test_lower_msrv:
name: Linux / 1.46 (lower MSRV)
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install 1.46.0 # MSRV for all but -server and -tls
uses: actions-rs/toolchain@v1
with:
toolchain: 1.46.0-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Install cargo-hack
uses: actions-rs/cargo@v1
with:
command: install
args: cargo-hack
- name: tests
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=1.46 cargo ci-test-lower-msrv"
- name: Clear the cargo caches
run: |
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
cargo-cache
coverage:
name: coverage
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust (nightly)
uses: actions-rs/toolchain@v1
with:
toolchain: stable-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
with: { command: generate-lockfile }
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1.3.0
- name: Generate coverage file
if: github.ref == 'refs/heads/master'
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --verbose
- name: Upload to Codecov
if: github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1
with: { file: cobertura.xml }
rustdoc:
name: rustdoc

View File

@@ -20,5 +20,5 @@ futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false }
log = "0.4"
pin-project-lite = "0.2"
tokio = "1"
tokio = "1.5.1"
tokio-util = { version = "0.6", features = ["codec", "io"] }

View File

@@ -178,7 +178,7 @@ impl<T, U> Framed<T, U> {
U: Decoder,
{
loop {
let mut this = self.as_mut().project();
let this = self.as_mut().project();
// Repeatedly call `decode` or `decode_eof` as long as it is "readable". Readable is
// defined as not having returned `None`. If the upstream has returned EOF, and the
// decoder is no longer readable, it can be assumed that the decoder will never become
@@ -186,7 +186,7 @@ impl<T, U> Framed<T, U> {
if this.flags.contains(Flags::READABLE) {
if this.flags.contains(Flags::EOF) {
match this.codec.decode_eof(&mut this.read_buf) {
match this.codec.decode_eof(this.read_buf) {
Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))),
Ok(None) => return Poll::Ready(None),
Err(e) => return Poll::Ready(Some(Err(e))),
@@ -195,7 +195,7 @@ impl<T, U> Framed<T, U> {
log::trace!("attempting to decode a frame");
match this.codec.decode(&mut this.read_buf) {
match this.codec.decode(this.read_buf) {
Ok(Some(frame)) => {
log::trace!("frame decoded from buffer");
return Poll::Ready(Some(Ok(frame)));

View File

@@ -3,6 +3,19 @@
## Unreleased - 2021-xx-xx
## 0.2.3 - 2021-10-19
* Fix test macro in presence of other imports named "test". [#399]
[#399]: https://github.com/actix/actix-net/pull/399
## 0.2.2 - 2021-10-14
* Improve error recovery potential when macro input is invalid. [#391]
* Allow custom `System`s on test macro. [#391]
[#391]: https://github.com/actix/actix-net/pull/391
## 0.2.1 - 2021-02-02
* Add optional argument `system` to `main` macro which can be used to specify the path to `actix_rt::System` (useful for re-exports). [#363]

View File

@@ -1,12 +1,13 @@
[package]
name = "actix-macros"
version = "0.2.1"
version = "0.2.3"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Ibraheem Ahmed <ibrah1440@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Macros for Actix system and runtime"
repository = "https://github.com/actix/actix-net"
repository = "https://github.com/actix/actix-net.git"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -22,4 +23,5 @@ syn = { version = "^1", features = ["full"] }
actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false }
rustversion = "1"
trybuild = "1"

View File

@@ -28,7 +28,12 @@ use quote::quote;
#[proc_macro_attribute]
#[cfg(not(test))] // Work around for rust-lang/rust#62127
pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
let mut input = syn::parse_macro_input!(item as syn::ItemFn);
let mut input = match syn::parse::<syn::ItemFn>(item.clone()) {
Ok(input) => input,
// on parse err, make IDEs happy; see fn docs
Err(err) => return input_and_compile_error(item, err),
};
let args = syn::parse_macro_input!(args as syn::AttributeArgs);
let attrs = &input.attrs;
@@ -101,8 +106,15 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
/// }
/// ```
#[proc_macro_attribute]
pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
let mut input = syn::parse_macro_input!(item as syn::ItemFn);
pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
let mut input = match syn::parse::<syn::ItemFn>(item.clone()) {
Ok(input) => input,
// on parse err, make IDEs happy; see fn docs
Err(err) => return input_and_compile_error(item, err),
};
let args = syn::parse_macro_input!(args as syn::AttributeArgs);
let attrs = &input.attrs;
let vis = &input.vis;
let sig = &mut input.sig;
@@ -127,18 +139,64 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
sig.asyncness = None;
let missing_test_attr = if has_test_attr {
quote!()
quote! {}
} else {
quote!(#[test])
quote! { #[::core::prelude::v1::test] }
};
let mut system = syn::parse_str::<syn::Path>("::actix_rt::System").unwrap();
for arg in &args {
match arg {
syn::NestedMeta::Meta(syn::Meta::NameValue(syn::MetaNameValue {
lit: syn::Lit::Str(lit),
path,
..
})) => match path
.get_ident()
.map(|i| i.to_string().to_lowercase())
.as_deref()
{
Some("system") => match lit.parse() {
Ok(path) => system = path,
Err(_) => {
return syn::Error::new_spanned(lit, "Expected path")
.to_compile_error()
.into();
}
},
_ => {
return syn::Error::new_spanned(arg, "Unknown attribute specified")
.to_compile_error()
.into();
}
},
_ => {
return syn::Error::new_spanned(arg, "Unknown attribute specified")
.to_compile_error()
.into();
}
}
}
(quote! {
#missing_test_attr
#(#attrs)*
#vis #sig {
actix_rt::System::new()
.block_on(async { #body })
<#system>::new().block_on(async { #body })
}
})
.into()
}
/// Converts the error to a token stream and appends it to the original input.
///
/// Returning the original input in addition to the error is good for IDEs which can gracefully
/// recover and show more precise errors within the macro body.
///
/// See <https://github.com/rust-analyzer/rust-analyzer/issues/10468> for more info.
fn input_and_compile_error(mut item: TokenStream, err: syn::Error) -> TokenStream {
let compile_err = TokenStream::from(err.to_compile_error());
item.extend(compile_err);
item
}

View File

@@ -1,3 +1,4 @@
#[rustversion::stable(1.46)] // MSRV
#[test]
fn compile_macros() {
let t = trybuild::TestCases::new();
@@ -11,4 +12,7 @@ fn compile_macros() {
t.pass("tests/trybuild/test-01-basic.rs");
t.pass("tests/trybuild/test-02-keep-attrs.rs");
t.compile_fail("tests/trybuild/test-03-only-async.rs");
t.pass("tests/trybuild/test-04-system-path.rs");
t.compile_fail("tests/trybuild/test-05-system-expect-path.rs");
t.compile_fail("tests/trybuild/test-06-unknown-attr.rs");
}

View File

@@ -0,0 +1,10 @@
mod system {
pub use actix_rt::System as MySystem;
}
#[actix_rt::test(system = "system::MySystem")]
async fn my_test() {
futures_util::future::ready(()).await
}
fn main() {}

View File

@@ -0,0 +1,4 @@
#[actix_rt::test(system = "!@#*&")]
async fn my_test() {}
fn main() {}

View File

@@ -0,0 +1,5 @@
error: Expected path
--> $DIR/test-05-system-expect-path.rs:1:27
|
1 | #[actix_rt::test(system = "!@#*&")]
| ^^^^^^^

View File

@@ -0,0 +1,7 @@
#[actix_rt::test(foo = "bar")]
async fn my_test_1() {}
#[actix_rt::test(bar::baz)]
async fn my_test_2() {}
fn main() {}

View File

@@ -0,0 +1,11 @@
error: Unknown attribute specified
--> $DIR/test-06-unknown-attr.rs:1:18
|
1 | #[actix_rt::test(foo = "bar")]
| ^^^^^^^^^^^
error: Unknown attribute specified
--> $DIR/test-06-unknown-attr.rs:4:18
|
4 | #[actix_rt::test(bar::baz)]
| ^^^^^^^^

View File

@@ -1,8 +1,11 @@
# Changes
## Unreleased - 2021-xx-xx
* Add `io-uring` feature for enabling async file I/O on linux. [#374]
## 2.3.0 - 2021-10-11
* The `spawn` method can now resolve with non-unit outputs. [#369]
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]
[#369]: https://github.com/actix/actix-net/pull/369
[#374]: https://github.com/actix/actix-net/pull/374

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-rt"
version = "2.2.0"
version = "2.3.0"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
@@ -8,8 +8,7 @@ authors = [
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
keywords = ["async", "futures", "io", "runtime"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net"
documentation = "https://docs.rs/actix-rt"
repository = "https://github.com/actix/actix-net.git"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -24,14 +23,14 @@ macros = ["actix-macros"]
io-uring = ["tokio-uring"]
[dependencies]
actix-macros = { version = "0.2.0", optional = true }
actix-macros = { version = "0.2.3", optional = true }
futures-core = { version = "0.3", default-features = false }
tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
tokio = { version = "1.5.1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
[target.'cfg(target_os = "linux")'.dependencies]
tokio-uring = { version = "0.1", optional = true }
[dev-dependencies]
tokio = { version = "1.2", features = ["full"] }
tokio = { version = "1.5.1", features = ["full"] }
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }

View File

@@ -3,11 +3,11 @@
> Tokio-based single-threaded async runtime for the Actix ecosystem.
[![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.2.0)](https://docs.rs/actix-rt/2.2.0)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.3.0)](https://docs.rs/actix-rt/2.3.0)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg)
<br />
[![dependency status](https://deps.rs/crate/actix-rt/2.2.0/status.svg)](https://deps.rs/crate/actix-rt/2.2.0)
[![dependency status](https://deps.rs/crate/actix-rt/2.3.0/status.svg)](https://deps.rs/crate/actix-rt/2.3.0)
![Download](https://img.shields.io/crates/d/actix-rt.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb)

View File

@@ -15,7 +15,7 @@
//! blocking task thread-pool using [`task::spawn_blocking`].
//!
//! # Examples
//! ```
//! ```no_run
//! use std::sync::mpsc;
//! use actix_rt::{Arbiter, System};
//!

View File

@@ -11,7 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
use crate::{arbiter::ArbiterHandle, Arbiter};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@@ -29,6 +29,7 @@ pub struct System {
arbiter_handle: ArbiterHandle,
}
#[cfg(not(feature = "io-uring"))]
impl System {
/// Create a new system.
///
@@ -37,7 +38,7 @@ impl System {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
Self::with_tokio_rt(|| {
default_tokio_runtime()
crate::runtime::default_tokio_runtime()
.expect("Default Actix (Tokio) runtime could not be created.")
})
}
@@ -53,7 +54,7 @@ impl System {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::from(runtime_factory());
let rt = crate::runtime::Runtime::from(runtime_factory());
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone());
@@ -72,7 +73,32 @@ impl System {
system,
}
}
}
#[cfg(feature = "io-uring")]
impl System {
/// Create a new system.
///
/// # Panics
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
SystemRunner
}
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
unimplemented!("System::with_tokio_rt is not implemented yet")
}
}
impl System {
/// Constructs new system and registers it on the current thread.
pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
@@ -149,15 +175,18 @@ impl System {
}
}
#[cfg(not(feature = "io-uring"))]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
rt: Runtime,
rt: crate::runtime::Runtime,
stop_rx: oneshot::Receiver<i32>,
#[allow(dead_code)]
system: System,
}
#[cfg(not(feature = "io-uring"))]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
@@ -187,6 +216,45 @@ impl SystemRunner {
}
}
#[cfg(feature = "io-uring")]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner;
#[cfg(feature = "io-uring")]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
unimplemented!("SystemRunner::run is not implemented yet")
}
/// Runs the provided future, blocking the current thread until the future completes.
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
tokio_uring::start(async move {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let sys_arbiter = Arbiter::in_new_system();
let system = System::construct(sys_tx, sys_arbiter.clone());
system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
tokio_uring::spawn(sys_ctrl);
let res = fut.await;
drop(stop_rx);
res
})
}
}
#[derive(Debug)]
pub(crate) enum SystemCommand {
Exit(i32),

View File

@@ -0,0 +1,17 @@
//! Checks that test macro does not cause problems in the presence of imports named "test" that
//! could be either a module with test items or the "test with runtime" macro itself.
//!
//! Before actix/actix-net#399 was implemented, this macro was running twice. The first run output
//! `#[test]` and it got run again and since it was in scope.
//!
//! Prevented by using the fully-qualified test marker (`#[::core::prelude::v1::test]`).
#![cfg(feature = "macros")]
use actix_rt::time as test;
#[actix_rt::test]
async fn test_naming_conflict() {
use test as time;
time::sleep(std::time::Duration::from_millis(2)).await;
}

View File

@@ -1,12 +1,15 @@
use std::{
future::Future,
sync::mpsc::channel,
thread,
time::{Duration, Instant},
};
use actix_rt::{task::JoinError, Arbiter, System};
use tokio::sync::oneshot;
#[cfg(not(feature = "io-uring"))]
use {
std::{sync::mpsc::channel, thread},
tokio::sync::oneshot,
};
#[test]
fn await_for_timer() {
@@ -103,6 +106,10 @@ fn wait_for_spawns() {
assert!(rt.block_on(handle).is_err());
}
// Temporary disabled tests for io-uring feature.
// They should be enabled when possible.
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_spawn_fn_runs() {
let _ = System::new();
@@ -119,6 +126,7 @@ fn arbiter_spawn_fn_runs() {
arbiter.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_handle_spawn_fn_runs() {
let sys = System::new();
@@ -141,6 +149,7 @@ fn arbiter_handle_spawn_fn_runs() {
sys.run().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fn() {
let _ = System::new();
@@ -152,6 +161,7 @@ fn arbiter_drop_no_panic_fn() {
arbiter.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fut() {
let _ = System::new();
@@ -163,18 +173,7 @@ fn arbiter_drop_no_panic_fut() {
arbiter.join().unwrap();
}
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
@@ -205,6 +204,7 @@ fn system_arbiter_spawn() {
thread.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn system_stop_stops_arbiters() {
let sys = System::new();
@@ -293,6 +293,18 @@ fn new_arbiter_with_tokio() {
assert!(!counter.load(Ordering::SeqCst));
}
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
@@ -330,28 +342,27 @@ fn spawn_local() {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[test]
fn tokio_uring_arbiter() {
let system = System::new();
let (tx, rx) = std::sync::mpsc::channel();
System::new().block_on(async {
let (tx, rx) = std::sync::mpsc::channel();
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
f.sync_all().await.unwrap();
f.close().await.unwrap();
f.sync_all().await.unwrap();
f.close().await.unwrap();
std::fs::remove_file("test.txt").unwrap();
std::fs::remove_file("test.txt").unwrap();
});
handle.await.unwrap();
tx.send(true).unwrap();
});
handle.await.unwrap();
tx.send(true).unwrap();
});
assert!(rx.recv().unwrap());
drop(system);
assert!(rx.recv().unwrap());
})
}

View File

@@ -1,13 +1,22 @@
# Changes
## Unreleased - 2021-xx-xx
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349]
* Rename `Server` to `ServerHandle`. [#403]
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#403]
* Remove wrapper `service::ServiceFactory` trait. [#403]
* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#403]
* Minimum supported Rust version (MSRV) is now 1.52.
[#403]: https://github.com/actix/actix-net/pull/403
## 2.0.0-beta.6 - 2021-10-11
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]
* Server no long listens to `SIGHUP` signal. Previously, the received was not used but did block
subsequent exit signals from working. [#389]
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to
this change. [#349]
* Remove `ServerBuilder::configure` [#349]
* Add `io-uring` feature for enabling async file I/O on linux. [#374]
* Server no long listens to SIGHUP signal.
It actually did not take any action when receiving SIGHUP, the only thing SIGHUP did was to stop
the Server from receiving any future signal, because the `Signals` future stops on the first
signal received [#389]
[#374]: https://github.com/actix/actix-net/pull/374
[#349]: https://github.com/actix/actix-net/pull/349
@@ -15,9 +24,9 @@
## 2.0.0-beta.5 - 2021-04-20
* Server shutdown would notify all workers to exit regardless if shutdown is graceful.
This would make all worker shutdown immediately in force shutdown case. [#333]
* Server shutdown notifies all workers to exit regardless if shutdown is graceful. This causes all
workers to shutdown immediately in force shutdown case. [#333]
[#333]: https://github.com/actix/actix-net/pull/333

View File

@@ -1,13 +1,13 @@
[package]
name = "actix-server"
version = "2.0.0-beta.5"
version = "2.0.0-beta.6"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>",
]
description = "General purpose TCP server built for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
repository = "https://github.com/actix/actix-net"
repository = "https://github.com/actix/actix-net.git"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -29,13 +29,13 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc
log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13"
tokio = { version = "1.2", features = ["sync"] }
tokio = { version = "1.5.1", features = ["sync"] }
[dev-dependencies]
actix-codec = "0.4.0-beta.1"
actix-codec = "0.4.0"
actix-rt = "2.0.0"
bytes = "1"
env_logger = "0.8"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1", features = ["io-util"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }

View File

@@ -0,0 +1,33 @@
use std::io;
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::{fn_factory, fn_service};
use log::info;
#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info"))
.init();
let addr = ("127.0.0.1", 8080);
info!("starting server on port: {}", &addr.0);
Server::build()
.bind(
"startup-fail",
addr,
fn_factory(|| async move {
if 1 > 2 {
Ok(fn_service(move |mut _stream: TcpStream| async move {
Ok::<u32, u32>(0)
}))
} else {
Err(42)
}
}),
)?
.workers(2)
.run()
.await
}

View File

@@ -19,9 +19,8 @@ use std::{
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _};
use actix_service::{fn_factory, fn_service, ServiceExt as _};
use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -39,52 +38,65 @@ async fn main() -> io::Result<()> {
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs
// to return a service *factory*; so it can be created once per worker.
Server::build()
.bind("echo", addr, move || {
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);
fn_service(move |mut stream: TcpStream| {
.bind("echo", addr, {
fn_factory::<_, (), _, _, _, _>(move || {
let count = Arc::clone(&count);
async move {
let num = count.fetch_add(1, Ordering::SeqCst);
let num = num + 1;
let count = Arc::clone(&count);
let count2 = Arc::clone(&count);
let mut size = 0;
let mut buf = BytesMut::new();
let svc = fn_service(move |mut stream: TcpStream| {
let count = Arc::clone(&count);
loop {
match stream.read_buf(&mut buf).await {
// end of stream; bail from loop
Ok(0) => break,
let num = count.fetch_add(1, Ordering::SeqCst) + 1;
// more bytes to process
Ok(bytes_read) => {
info!("[{}] read {} bytes", num, bytes_read);
stream.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
info!(
"[{}] accepting connection from: {}",
num,
stream.peer_addr().unwrap()
);
async move {
let mut size = 0;
let mut buf = BytesMut::new();
loop {
match stream.read_buf(&mut buf).await {
// end of stream; bail from loop
Ok(0) => break,
// more bytes to process
Ok(bytes_read) => {
info!("[{}] read {} bytes", num, bytes_read);
stream.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
}
// stream error; bail from loop with error
Err(err) => {
error!("Stream Error: {:?}", err);
return Err(());
}
}
}
// stream error; bail from loop with error
Err(err) => {
error!("Stream Error: {:?}", err);
return Err(());
}
// send data down service pipeline
Ok((buf.freeze(), size))
}
}
})
.map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| {
let num = count2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
async move { Ok(size) }
});
// send data down service pipeline
Ok((buf.freeze(), size))
Ok::<_, ()>(svc.clone())
}
})
.map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| {
let num = num2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
ok(size)
})
})?
.workers(1)
.workers(2)
.run()
.await
}

View File

@@ -5,10 +5,10 @@ use actix_rt::{
time::{sleep, Instant},
System,
};
use log::{error, info};
use log::{debug, error, info};
use mio::{Interest, Poll, Token as MioToken};
use crate::server::Server;
use crate::server::ServerHandle;
use crate::socket::MioListener;
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept};
@@ -30,13 +30,13 @@ struct ServerSocketInfo {
///
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
pub(crate) struct AcceptLoop {
srv: Option<Server>,
srv: Option<ServerHandle>,
poll: Option<Poll>,
waker: WakerQueue,
}
impl AcceptLoop {
pub fn new(srv: Server) -> Self {
pub fn new(srv: ServerHandle) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
@@ -74,21 +74,16 @@ struct Accept {
poll: Poll,
waker: WakerQueue,
handles: Vec<WorkerHandleAccept>,
srv: Server,
srv: ServerHandle,
next: usize,
avail: Availability,
paused: bool,
}
/// Array of u128 with every bit as marker for a worker handle's availability.
#[derive(Debug, Default)]
struct Availability([u128; 4]);
impl Default for Availability {
fn default() -> Self {
Self([0; 4])
}
}
impl Availability {
/// Check if any worker handle is available
#[inline(always)]
@@ -158,7 +153,7 @@ impl Accept {
poll: Poll,
waker: WakerQueue,
socks: Vec<(usize, MioListener)>,
srv: Server,
srv: ServerHandle,
handles: Vec<WorkerHandleAccept>,
) {
// Accept runs in its own thread and would want to spawn additional futures to current
@@ -181,7 +176,7 @@ impl Accept {
waker: WakerQueue,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
srv: Server,
srv: ServerHandle,
) -> (Accept, Vec<ServerSocketInfo>) {
let sockets = socks
.into_iter()
@@ -234,7 +229,7 @@ impl Accept {
WAKER_TOKEN => {
let exit = self.handle_waker(sockets);
if exit {
info!("Accept is stopped.");
info!("Accept thread stopped");
return;
}
}
@@ -370,14 +365,14 @@ impl Accept {
fn register_logged(&self, info: &mut ServerSocketInfo) {
match self.register(info) {
Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
Ok(_) => debug!("Resume accepting connections on {}", info.lst.local_addr()),
Err(e) => error!("Can not register server socket {}", e),
}
}
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.poll.registry().deregister(&mut info.lst) {
Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
Ok(_) => debug!("Paused accepting connections on {}", info.lst.local_addr()),
Err(e) => {
error!("Can not deregister server socket {}", e)
}

View File

@@ -1,4 +1,5 @@
use std::{
fmt,
future::Future,
io, mem,
pin::Pin,
@@ -7,21 +8,25 @@ use std::{
};
use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use actix_service::ServiceFactory;
use log::{error, info};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
oneshot,
};
use crate::accept::AcceptLoop;
use crate::join_all;
use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
use crate::{
accept::AcceptLoop,
join_all,
server::{ServerCommand, ServerHandle},
service::{ServerServiceFactory, StreamNewService},
signals::{Signal, Signals},
socket::{
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
},
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer},
};
/// Server builder
pub struct ServerBuilder {
@@ -29,13 +34,13 @@ pub struct ServerBuilder {
token: usize,
backlog: u32,
handles: Vec<(usize, WorkerHandleServer)>,
services: Vec<Box<dyn InternalServiceFactory>>,
services: Vec<Box<dyn ServerServiceFactory>>,
sockets: Vec<(usize, String, MioListener)>,
accept: AcceptLoop,
exit: bool,
no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>,
server: Server,
server: ServerHandle,
notify: Vec<oneshot::Sender<()>>,
worker_config: ServerWorkerConfig,
}
@@ -50,7 +55,7 @@ impl ServerBuilder {
/// Create new Server builder instance
pub fn new() -> ServerBuilder {
let (tx, rx) = unbounded_channel();
let server = Server::new(tx);
let server = ServerHandle::new(tx);
ServerBuilder {
threads: num_cpus::get(),
@@ -114,15 +119,21 @@ impl ServerBuilder {
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
/// All socket listeners will stop accepting connections when this limit is reached for
/// each worker.
///
/// By default max connections is set to a 25k per worker.
pub fn maxconn(mut self, num: usize) -> Self {
/// By default max connections is set to a 25,600 per worker.
pub fn max_concurrent_connections(mut self, num: usize) -> Self {
self.worker_config.max_concurrent_connections(num);
self
}
#[doc(hidden)]
#[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
pub fn maxconn(self, num: usize) -> Self {
self.max_concurrent_connections(num)
}
/// Stop Actix system.
pub fn system_exit(mut self) -> Self {
self.exit = true;
@@ -147,91 +158,61 @@ impl ServerBuilder {
self
}
/// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
/// Bind server to socket addresses.
///
/// Binds to all network interface addresses that resolve from the `addr` argument.
/// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct
/// interfaces at the same time by passing a list of socket addresses.
///
/// This fails only if all addresses fail to bind.
pub fn bind<F, U, InitErr>(
mut self,
name: impl AsRef<str>,
addr: U,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
U: ToSocketAddrs,
{
let sockets = bind_addr(addr, self.backlog)?;
for lst in sockets {
let token = self.next_token();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
lst.local_addr()?,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
}
Ok(self)
}
/// Add new unix domain service to the server.
#[cfg(unix)]
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
// The path must not exist when we try to bind.
// Try to remove it to avoid bind error.
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
// NotFound is expected and not an issue. Anything else is.
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e);
}
}
let lst = crate::socket::StdUnixListener::bind(addr)?;
self.listen_uds(name, lst, factory)
}
/// Add new unix domain service to the server.
/// Useful when running as a systemd service and
/// a socket FD can be acquired using the systemd crate.
#[cfg(unix)]
pub fn listen_uds<F, N: AsRef<str>>(
/// Bind server to existing TCP listener.
///
/// Useful when running as a systemd service and a socket FD can be passed to the process.
pub fn listen<F, InitErr>(
mut self,
name: N,
lst: crate::socket::StdUnixListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
{
use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?;
let token = self.next_token();
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
/// Add new service to the server.
pub fn listen<F, N: AsRef<str>>(
mut self,
name: N,
name: impl AsRef<str>,
lst: StdTcpListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
{
lst.set_nonblocking(true)?;
let addr = lst.local_addr()?;
let addr = lst.local_addr()?;
let token = self.next_token();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
@@ -246,11 +227,18 @@ impl ServerBuilder {
}
/// Starts processing incoming connections and return server controller.
pub fn run(mut self) -> Server {
pub fn run(mut self) -> ServerHandle {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
info!("Starting {} workers", self.threads);
for (_, name, lst) in &self.sockets {
info!(
r#"Starting service: "{}", workers: {}, listening on: {}"#,
name,
self.threads,
lst.local_addr()
);
}
// start workers
let handles = (0..self.threads)
@@ -264,9 +252,6 @@ impl ServerBuilder {
.collect();
// start accept thread
for sock in &self.sockets {
info!("Starting \"{}\" service on {}", sock.1, sock.2);
}
self.accept.start(
mem::take(&mut self.sockets)
.into_iter()
@@ -280,7 +265,7 @@ impl ServerBuilder {
Signals::start(self.server.clone());
}
// start http server actor
// start http server
let server = self.server.clone();
rt::spawn(self);
server
@@ -312,23 +297,25 @@ impl ServerBuilder {
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig {
Signal::Int => {
info!("SIGINT received, exiting");
info!("SIGINT received; starting forced shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received, stopping");
info!("SIGTERM received; starting graceful shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
Signal::Quit => {
info!("SIGQUIT received, exiting");
info!("SIGQUIT received; starting forced shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
@@ -359,12 +346,14 @@ impl ServerBuilder {
rt::spawn(async move {
if graceful {
// wait for all workers to shut down
let _ = join_all(stop).await;
}
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
@@ -386,7 +375,7 @@ impl ServerBuilder {
}
if found {
error!("Worker has died {:?}, restarting", idx);
error!("Worker {} has died; restarting", idx);
let mut new_idx = self.handles.len();
'found: loop {
@@ -415,6 +404,74 @@ impl ServerBuilder {
}
}
/// Unix Domain Socket (UDS) support.
#[cfg(unix)]
impl ServerBuilder {
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, InitErr>(
self,
name: impl AsRef<str>,
addr: U,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
+ Send
+ Clone
+ 'static,
U: AsRef<std::path::Path>,
InitErr: fmt::Debug + Send + 'static,
{
// The path must not exist when we try to bind.
// Try to remove it to avoid bind error.
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
// NotFound is expected and not an issue. Anything else is.
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e);
}
}
let lst = crate::socket::StdUnixListener::bind(addr)?;
self.listen_uds(name, lst, factory)
}
/// Add new unix domain service to the server.
///
/// Useful when running as a systemd service and a socket FD can be passed to the process.
pub fn listen_uds<F, InitErr>(
mut self,
name: impl AsRef<str>,
lst: crate::socket::StdUnixListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
+ Send
+ Clone
+ 'static,
InitErr: fmt::Debug + Send + 'static,
{
use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?;
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let token = self.next_token();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
}
impl Future for ServerBuilder {
type Output = ();
@@ -433,29 +490,28 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
backlog: u32,
) -> io::Result<Vec<MioTcpListener>> {
let mut err = None;
let mut succ = false;
let mut success = false;
let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr, backlog) {
Ok(lst) => {
succ = true;
success = true;
sockets.push(lst);
}
Err(e) => err = Some(e),
}
}
if !succ {
if let Some(e) = err.take() {
Err(e)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
}
} else {
if success {
Ok(sockets)
} else if let Some(err) = err.take() {
Err(err)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to socket address",
))
}
}

View File

@@ -15,8 +15,7 @@ mod waker_queue;
mod worker;
pub use self::builder::ServerBuilder;
pub use self::server::Server;
pub use self::service::ServiceFactory;
pub use self::server::{Server, ServerHandle};
pub use self::test_server::TestServer;
#[doc(hidden)]

View File

@@ -6,8 +6,18 @@ use std::task::{Context, Poll};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use crate::builder::ServerBuilder;
use crate::signals::Signal;
use crate::{signals::Signal, ServerBuilder};
#[derive(Debug)]
#[non_exhaustive]
pub struct Server;
impl Server {
/// Start server building process.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
}
#[derive(Debug)]
pub(crate) enum ServerCommand {
@@ -15,8 +25,8 @@ pub(crate) enum ServerCommand {
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Signal(Signal),
/// Whether to try and shut down gracefully
Stop {
/// True if shut down should be graceful.
graceful: bool,
completion: Option<oneshot::Sender<()>>,
},
@@ -24,20 +34,22 @@ pub(crate) enum ServerCommand {
Notify(oneshot::Sender<()>),
}
/// Server handle.
///
/// # Shutdown Signals
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown.
///
/// A graceful shutdown will wait for all workers to stop first.
#[derive(Debug)]
pub struct Server(
pub struct ServerHandle(
UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>,
);
impl Server {
impl ServerHandle {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
Server(tx, None)
}
/// Start server building process
pub fn build() -> ServerBuilder {
ServerBuilder::default()
ServerHandle(tx, None)
}
pub(crate) fn signal(&self, sig: Signal) {
@@ -84,13 +96,13 @@ impl Server {
}
}
impl Clone for Server {
impl Clone for ServerHandle {
fn clone(&self) -> Self {
Self(self.0.clone(), None)
}
}
impl Future for Server {
impl Future for ServerHandle {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@@ -1,26 +1,26 @@
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::task::{Context, Poll};
use std::{
fmt,
marker::PhantomData,
net::SocketAddr,
task::{Context, Poll},
};
use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use actix_service::{Service, ServiceFactory};
use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture;
use log::error;
use crate::socket::{FromStream, MioStream};
use crate::worker::WorkerCounterGuard;
use crate::{
socket::{FromStream, MioStream},
worker::WorkerCounterGuard,
};
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: BaseServiceFactory<Stream, Config = ()>;
fn create(&self) -> Self::Factory;
}
pub(crate) trait InternalServiceFactory: Send {
pub(crate) trait ServerServiceFactory: Send {
fn name(&self, token: usize) -> &str;
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn clone_factory(&self) -> Box<dyn ServerServiceFactory>;
/// Initialize Mio stream handler service and return it with its service token.
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
}
@@ -56,7 +56,7 @@ where
{
type Response = ();
type Error = ();
type Future = Ready<Result<(), ()>>;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx).map_err(|_| ())
@@ -72,25 +72,26 @@ where
});
Ok(())
}
Err(e) => {
error!("Can not convert to an async tcp stream: {}", e);
Err(err) => {
error!("Can not convert Mio stream to an async TCP stream: {}", err);
Err(())
}
})
}
}
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
pub(crate) struct StreamNewService<F, Io, InitErr> {
name: String,
inner: F,
token: usize,
addr: SocketAddr,
_t: PhantomData<Io>,
_t: PhantomData<(Io, InitErr)>,
}
impl<F, Io> StreamNewService<F, Io>
impl<F, Io, InitErr> StreamNewService<F, Io, InitErr>
where
F: ServiceFactory<Io>,
F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
Io: FromStream + Send + 'static,
{
pub(crate) fn create(
@@ -98,7 +99,7 @@ where
token: usize,
inner: F,
addr: SocketAddr,
) -> Box<dyn InternalServiceFactory> {
) -> Box<dyn ServerServiceFactory> {
Box::new(Self {
name,
token,
@@ -109,16 +110,17 @@ where
}
}
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
impl<F, Io, InitErr> ServerServiceFactory for StreamNewService<F, Io, InitErr>
where
F: ServiceFactory<Io>,
F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
Io: FromStream + Send + 'static,
{
fn name(&self, _: usize) -> &str {
&self.name
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
fn clone_factory(&self) -> Box<dyn ServerServiceFactory> {
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
@@ -130,28 +132,18 @@ where
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
let token = self.token;
let fut = self.inner.create().new_service(());
let fut = self.inner.new_service(());
Box::pin(async move {
match fut.await {
Ok(inner) => {
let service = Box::new(StreamService::new(inner)) as _;
Ok(svc) => {
let service = Box::new(StreamService::new(svc)) as _;
Ok((token, service))
}
Err(_) => Err(()),
Err(err) => {
error!("{:?}", err);
Err(())
}
}
})
}
}
impl<F, T, I> ServiceFactory<I> for F
where
F: Fn() -> T + Send + Clone + 'static,
T: BaseServiceFactory<I, Config = ()>,
I: FromStream,
{
type Factory = T;
fn create(&self) -> T {
(self)()
}
}

View File

@@ -2,30 +2,36 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::server::Server;
use crate::server::ServerHandle;
/// Different types of process signals
/// Types of process signals.
#[allow(dead_code)]
#[derive(PartialEq, Clone, Copy, Debug)]
pub(crate) enum Signal {
/// SIGINT
/// `SIGINT`
Int,
/// SIGTERM
/// `SIGTERM`
Term,
/// SIGQUIT
/// `SIGQUIT`
Quit,
}
/// Process signal listener.
pub(crate) struct Signals {
srv: Server,
srv: ServerHandle,
#[cfg(not(unix))]
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
#[cfg(unix)]
signals: Vec<(Signal, actix_rt::signal::unix::Signal)>,
}
impl Signals {
pub(crate) fn start(srv: Server) {
/// Spawns a signal listening future that is able to send commands to the `Server`.
pub(crate) fn start(srv: ServerHandle) {
#[cfg(not(unix))]
{
actix_rt::spawn(Signals {
@@ -33,6 +39,7 @@ impl Signals {
signals: Box::pin(actix_rt::signal::ctrl_c()),
});
}
#[cfg(unix)]
{
use actix_rt::signal::unix;
@@ -76,6 +83,7 @@ impl Future for Signals {
}
Poll::Pending => Poll::Pending,
}
#[cfg(unix)]
{
for (sig, fut) in self.signals.iter_mut() {

View File

@@ -1,25 +1,24 @@
use std::sync::mpsc;
use std::{net, thread};
use std::{fmt, net, sync::mpsc, thread};
use actix_rt::{net::TcpStream, System};
use actix_service::ServiceFactory;
use crate::{Server, ServerBuilder, ServiceFactory};
use crate::{Server, ServerBuilder};
/// The `TestServer` type.
/// A testing server.
///
/// `TestServer` is very simple test server that simplify process of writing
/// integration tests for actix-net applications.
/// `TestServer` is very simple test server that simplify process of writing integration tests for
/// network applications.
///
/// # Examples
///
/// ```
/// use actix_service::fn_service;
/// use actix_server::TestServer;
/// use actix_service::fn_service;
///
/// #[actix_rt::main]
/// async fn main() {
/// let srv = TestServer::with(|| fn_service(
/// |sock| async move {
/// let srv = TestServer::with(fn_service(|sock|
/// async move {
/// println!("New connection: {:?}", sock);
/// Ok::<_, ()>(())
/// }
@@ -28,9 +27,10 @@ use crate::{Server, ServerBuilder, ServiceFactory};
/// println!("SOCKET: {:?}", srv.connect());
/// }
/// ```
#[non_exhaustive]
pub struct TestServer;
/// Test server runtime
/// Test server runtime.
pub struct TestServerRuntime {
addr: net::SocketAddr,
host: String,
@@ -39,7 +39,7 @@ pub struct TestServerRuntime {
}
impl TestServer {
/// Start new server with server builder
/// Start new server using server builder.
pub fn start<F>(mut factory: F) -> TestServerRuntime
where
F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static,
@@ -64,8 +64,12 @@ impl TestServer {
}
}
/// Start new test server with application factory
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
/// Start new test server with default settings using application factory.
pub fn with<F, InitErr>(factory: F) -> TestServerRuntime
where
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
{
let (tx, rx) = mpsc::channel();
// run server in separate thread
@@ -99,7 +103,7 @@ impl TestServer {
}
}
/// Get first available unused local address
/// Get first available unused local address.
pub fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = mio::net::TcpSocket::new_v4().unwrap();
@@ -111,27 +115,27 @@ impl TestServer {
}
impl TestServerRuntime {
/// Test server host
/// Test server host.
pub fn host(&self) -> &str {
&self.host
}
/// Test server port
/// Test server port.
pub fn port(&self) -> u16 {
self.port
}
/// Get test server address
/// Get test server address.
pub fn addr(&self) -> net::SocketAddr {
self.addr
}
/// Stop http server
/// Stop server.
fn stop(&mut self) {
self.system.stop();
}
/// Connect to server, return tokio TcpStream
/// Connect to server, returning a Tokio `TcpStream`.
pub fn connect(&self) -> std::io::Result<TcpStream> {
TcpStream::from_std(net::TcpStream::connect(self.addr)?)
}

View File

@@ -24,10 +24,12 @@ use tokio::sync::{
};
use crate::join_all;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::service::{BoxedServerService, ServerServiceFactory};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
const DEFAULT_SHUTDOWN_DURATION: Duration = Duration::from_secs(30);
/// Stop worker message. Returns `true` on successful graceful shutdown.
/// and `false` if some connections still alive when shutdown execute.
pub(crate) struct Stop {
@@ -181,6 +183,7 @@ impl WorkerHandleAccept {
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
#[derive(Debug)]
pub(crate) struct WorkerHandleServer {
#[allow(dead_code)]
idx: usize,
tx: UnboundedSender<Stop>,
}
@@ -195,7 +198,7 @@ impl WorkerHandleServer {
/// Service worker.
///
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
/// Worker accepts socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker {
// UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker dropping.
@@ -203,7 +206,7 @@ pub(crate) struct ServerWorker {
rx2: UnboundedReceiver<Stop>,
counter: WorkerCounter,
services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>,
factories: Box<[Box<dyn ServerServiceFactory>]>,
state: WorkerState,
shutdown_timeout: Duration,
}
@@ -243,10 +246,11 @@ impl Default for ServerWorkerConfig {
fn default() -> Self {
// 512 is the default max blocking thread count of tokio runtime.
let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
Self {
shutdown_timeout: Duration::from_secs(30),
shutdown_timeout: DEFAULT_SHUTDOWN_DURATION,
max_blocking_threads,
max_concurrent_connections: 25600,
max_concurrent_connections: 25_600,
}
}
}
@@ -268,7 +272,7 @@ impl ServerWorkerConfig {
impl ServerWorker {
pub(crate) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
factories: Vec<Box<dyn ServerServiceFactory>>,
waker_queue: WakerQueue,
config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) {
@@ -313,6 +317,7 @@ impl ServerWorker {
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
@@ -326,8 +331,9 @@ impl ServerWorker {
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Err(err) => {
error!("Can not start worker: {:?}", err);
Arbiter::current().stop();
return;
}
@@ -429,13 +435,15 @@ struct Restart {
fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
}
// Shutdown keep states necessary for server shutdown:
// Sleep for interval check the shutdown progress.
// Instant for the start time of shutdown.
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
/// State necessary for server shutdown.
struct Shutdown {
// Interval for checking the shutdown progress.
timer: Pin<Box<Sleep>>,
/// Start time of shutdown.
start_from: Instant,
/// Notify of the shutdown outcome (force/grace) to stop caller.
tx: oneshot::Sender<bool>,
}
@@ -463,11 +471,11 @@ impl Future for ServerWorker {
{
let num = this.counter.total();
if num == 0 {
info!("Shutting down worker, 0 connections");
info!("Shutting down idle worker");
let _ = tx.send(true);
return Poll::Ready(());
} else if graceful {
info!("Graceful worker shutdown, {} connections", num);
info!("Graceful worker shutdown; finishing {} connections", num);
this.shutdown(false);
this.state = WorkerState::Shutdown(Shutdown {
@@ -476,7 +484,7 @@ impl Future for ServerWorker {
tx,
});
} else {
info!("Force shutdown worker, {} connections", num);
info!("Force shutdown worker, closing {} connections", num);
this.shutdown(true);
let _ = tx.send(false);
@@ -521,23 +529,25 @@ impl Future for ServerWorker {
self.poll(cx)
}
WorkerState::Shutdown(ref mut shutdown) => {
// Wait for 1 second.
// wait for 1 second
ready!(shutdown.timer.as_mut().poll(cx));
if this.counter.total() == 0 {
// Graceful shutdown.
// graceful shutdown
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(true);
}
Poll::Ready(())
} else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
// Timeout forceful shutdown.
// timeout forceful shutdown
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(false);
}
Poll::Ready(())
} else {
// Reset timer and wait for 1 second.
// reset timer and wait for 1 second
let time = Instant::now() + Duration::from_secs(1);
shutdown.timer.as_mut().reset(time);
shutdown.timer.as_mut().poll(cx)

View File

@@ -5,8 +5,6 @@ use std::{net, thread, time::Duration};
use actix_rt::{net::TcpStream, time::sleep};
use actix_server::Server;
use actix_service::fn_service;
use actix_utils::future::ok;
use futures_util::future::lazy;
fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
@@ -23,25 +21,26 @@ fn test_bind() {
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| {
Server::build()
actix_rt::System::new().block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.run()
}));
.bind("test", addr, fn_service(|_| async { Ok::<_, ()>(()) }))?
.run();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
let _ = tx.send((srv.clone(), actix_rt::System::current()));
srv.await
})
});
let (_, sys) = rx.recv().unwrap();
let (srv, sys) = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
sys.stop();
let _ = h.join();
h.join().unwrap().unwrap();
}
#[test]
@@ -50,25 +49,28 @@ fn test_listen() {
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let sys = actix_rt::System::new();
let lst = net::TcpListener::bind(addr).unwrap();
sys.block_on(async {
Server::build()
let lst = net::TcpListener::bind(addr)?;
actix_rt::System::new().block_on(async {
let srv = Server::build()
.disable_signals()
.workers(1)
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.listen("test", lst, fn_service(|_| async { Ok::<_, ()>(()) }))?
.run();
let _ = tx.send(actix_rt::System::current());
});
let _ = sys.run();
let _ = tx.send((srv.clone(), actix_rt::System::current()));
srv.await
})
});
let sys = rx.recv().unwrap();
let (srv, sys) = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
sys.stop();
let _ = h.join();
h.join().unwrap().unwrap();
}
#[test]
@@ -84,24 +86,25 @@ fn test_start() {
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| {
Server::build()
actix_rt::System::new().block_on(async {
let srv = Server::build()
.backlog(100)
.disable_signals()
.bind("test", addr, move || {
.bind(
"test",
addr,
fn_service(|io: TcpStream| async move {
let mut f = Framed::new(io, BytesCodec);
f.send(Bytes::from_static(b"test")).await.unwrap();
Ok::<_, ()>(())
})
})
.unwrap()
.run()
}));
}),
)?
.run();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
let _ = tx.send((srv.clone(), actix_rt::System::current()));
srv.await
})
});
let (srv, sys) = rx.recv().unwrap();
@@ -134,12 +137,11 @@ fn test_start() {
// stop
let _ = srv.stop(false);
thread::sleep(Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err());
thread::sleep(Duration::from_millis(100));
sys.stop();
let _ = h.join();
h.join().unwrap().unwrap();
thread::sleep(Duration::from_secs(1));
assert!(net::TcpStream::connect(addr).is_err());
}
#[actix_rt::test]
@@ -166,10 +168,10 @@ async fn test_max_concurrent_connections() {
// Set a relative higher backlog.
.backlog(12)
// max connection for a worker is 3.
.maxconn(max_conn)
.max_concurrent_connections(max_conn)
.workers(1)
.disable_signals()
.bind("test", addr, move || {
.bind("test", addr, {
let counter = counter.clone();
fn_service(move |_io: TcpStream| {
let counter = counter.clone();
@@ -209,9 +211,8 @@ async fn test_max_concurrent_connections() {
}
srv.stop(false).await;
sys.stop();
let _ = h.join().unwrap();
h.join().unwrap().unwrap();
}
#[actix_rt::test]
@@ -260,22 +261,20 @@ async fn test_service_restart() {
let server = Server::build()
.backlog(1)
.disable_signals()
.bind("addr1", addr1, move || {
.bind("addr1", addr1, {
let num = num.clone();
fn_factory(move || {
let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) }
})
})
.unwrap()
.bind("addr2", addr2, move || {
})?
.bind("addr2", addr2, {
let num2 = num2.clone();
fn_factory(move || {
let num2 = num2.clone();
async move { Ok::<_, ()>(TestService(num2)) }
})
})
.unwrap()
})?
.workers(1)
.run();
@@ -306,9 +305,9 @@ async fn test_service_restart() {
assert!(num_clone.load(Ordering::SeqCst) > 5);
assert!(num2_clone.load(Ordering::SeqCst) > 5);
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
sys.stop();
h.join().unwrap().unwrap();
}
#[ignore]
@@ -318,6 +317,7 @@ async fn worker_restart() {
use futures_core::future::LocalBoxFuture;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[derive(Debug, Clone)]
struct TestServiceFactory(Arc<AtomicUsize>);
impl ServiceFactory<TcpStream> for TestServiceFactory {
@@ -380,12 +380,12 @@ async fn worker_restart() {
actix_rt::System::new().block_on(async {
let server = Server::build()
.disable_signals()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
.unwrap()
.bind("addr", addr, TestServiceFactory(counter.clone()))?
.workers(2)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
})
});
@@ -447,7 +447,7 @@ async fn worker_restart() {
assert_eq!("3", id);
stream.shutdown().await.unwrap();
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
sys.stop();
h.join().unwrap().unwrap();
}

View File

@@ -1,6 +1,9 @@
# Changes
## Unreleased - 2021-xx-xx
* `fn_factory[_with_config]` types now impl `Send` even when config, service, request types do not. [#403]
[#403]: https://github.com/actix/actix-net/pull/403
## 2.0.1 - 2021-10-11

View File

@@ -0,0 +1,33 @@
use std::{future::Future, sync::mpsc, time::Duration};
async fn oracle<F, Fut>(f: F) -> (u32, u32)
where
F: FnOnce() -> Fut + Clone + Send + 'static,
Fut: Future<Output = u32> + 'static,
{
let f1 = actix_rt::spawn(f.clone()());
let f2 = actix_rt::spawn(f());
(f1.await.unwrap(), f2.await.unwrap())
}
#[actix_rt::main]
async fn main() {
let (tx, rx) = mpsc::channel();
let (r1, r2) = oracle({
let tx = tx.clone();
|| async move {
tx.send(()).unwrap();
4 * 4
}
})
.await;
assert_eq!(r1, r2);
tx.send(()).unwrap();
rx.recv_timeout(Duration::from_millis(100)).unwrap();
rx.recv_timeout(Duration::from_millis(100)).unwrap();
}

View File

@@ -14,7 +14,7 @@ use super::{Service, ServiceFactory};
/// Service for the `and_then` combinator, chaining a computation onto the end of another service
/// which completes successfully.
///
/// This is created by the `Pipeline::and_then` method.
/// Created by the `.and_then()` combinator.
pub struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
impl<A, B, Req> AndThenService<A, B, Req> {
@@ -116,7 +116,7 @@ where
}
}
/// `.and_then()` service factory combinator
/// Service factory created by the `.and_then()` combinator.
pub struct AndThenServiceFactory<A, B, Req>
where
A: ServiceFactory<Req>,

View File

@@ -63,7 +63,7 @@ where
}
}
/// Convert `Fn(Config, &Server) -> Future<Service>` fn to NewService\
/// Convert `Fn(Config, &Server) -> Future<Service>` fn to ServiceFactory.
struct ApplyConfigService<S1, Req, F, Cfg, Fut, S2, Err>
where
S1: Service<Req>,

View File

@@ -44,7 +44,7 @@ pub trait ServiceExt<Req>: Service<Req> {
/// Call another service after call to this one has resolved successfully.
///
/// This function can be used to chain two services together and ensure that the second service
/// isn't called until call to the fist service have finished. Result of the call to the first
/// isn't called until call to the fist service has resolved. Result of the call to the first
/// service is used as an input parameter for the second service's call.
///
/// Note that this function consumes the receiving service and returns a wrapped version of it.

View File

@@ -3,6 +3,7 @@ use core::{future::Future, marker::PhantomData};
use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory};
/// Create `ServiceFactory` for function that can act as a `Service`
// TODO: remove unnecessary Cfg type param
pub fn fn_service<F, Fut, Req, Res, Err, Cfg>(
f: F,
) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
@@ -48,13 +49,14 @@ where
/// Ok(())
/// }
/// ```
// TODO: remove unnecessary Cfg type param
pub fn fn_factory<F, Cfg, Srv, Req, Fut, Err>(
f: F,
) -> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
where
Srv: Service<Req>,
F: Fn() -> Fut,
Fut: Future<Output = Result<Srv, Err>>,
Srv: Service<Req>,
{
FnServiceNoConfig::new(f)
}
@@ -160,7 +162,7 @@ where
Fut: Future<Output = Result<Res, Err>>,
{
f: F,
_t: PhantomData<(Req, Cfg)>,
_t: PhantomData<fn(Cfg, Req)>,
}
impl<F, Fut, Req, Res, Err, Cfg> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
@@ -237,7 +239,7 @@ where
Srv: Service<Req>,
{
f: F,
_t: PhantomData<(Fut, Cfg, Req, Srv, Err)>,
_t: PhantomData<fn(Cfg, Req)>,
}
impl<F, Fut, Cfg, Srv, Req, Err> FnServiceConfig<F, Fut, Cfg, Srv, Req, Err>
@@ -293,7 +295,7 @@ where
Fut: Future<Output = Result<Srv, Err>>,
{
f: F,
_t: PhantomData<(Cfg, Req)>,
_t: PhantomData<fn(Cfg, Req)>,
}
impl<F, Cfg, Srv, Req, Fut, Err> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
@@ -353,10 +355,11 @@ where
mod tests {
use core::task::Poll;
use alloc::rc::Rc;
use futures_util::future::lazy;
use super::*;
use crate::{ok, Service, ServiceFactory};
use crate::{boxed, ok, Service, ServiceExt, ServiceFactory, ServiceFactoryExt};
#[actix_rt::test]
async fn test_fn_service() {
@@ -391,4 +394,142 @@ mod tests {
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv", 1));
}
// these three properties of a service factory are usually important
fn is_static<T: 'static>(_t: &T) {}
fn impls_clone<T: Clone>(_t: &T) {}
fn impls_send<T: Send>(_t: &T) {}
#[actix_rt::test]
async fn test_fn_factory_impl_send() {
let svc_fac = fn_factory_with_config(|cfg: usize| {
ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg))))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
// Cfg type is explicitly !Send
let svc_fac = fn_factory_with_config(|cfg: Rc<usize>| {
let cfg = Rc::clone(&cfg);
ok::<_, ()>(fn_service(move |_: ()| ok::<_, ()>(("srv", *cfg))))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
ok::<_, ()>(fn_service(move |()| ok::<_, ()>("srv")))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
// Req type is explicitly !Send
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
ok::<_, ()>(fn_service(move |_: Rc<()>| ok::<_, ()>("srv")))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
// Service type is explicitly !Send
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
ok::<_, ()>(boxed::rc_service(fn_service(move |_: ()| {
ok::<_, ()>("srv")
})))
});
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
}
#[actix_rt::test]
async fn test_service_combinators_impls() {
#[derive(Clone)]
struct Ident;
impl<T: 'static> Service<T> for Ident {
type Response = T;
type Error = ();
type Future = Ready<Result<Self::Response, Self::Error>>;
crate::always_ready!();
fn call(&self, req: T) -> Self::Future {
ok(req)
}
}
let svc = Ident;
is_static(&svc);
impls_clone(&svc);
impls_send(&svc);
let svc = ServiceExt::map(Ident, core::convert::identity);
impls_send(&svc);
svc.call(()).await.unwrap();
let svc = ServiceExt::map_err(Ident, core::convert::identity);
impls_send(&svc);
svc.call(()).await.unwrap();
let svc = ServiceExt::and_then(Ident, Ident);
// impls_send(&svc); // fails to compile :(
svc.call(()).await.unwrap();
// let svc = ServiceExt::and_then_send(Ident, Ident);
// impls_send(&svc);
// svc.call(()).await.unwrap();
}
#[actix_rt::test]
async fn test_factory_combinators_impls() {
#[derive(Clone)]
struct Ident;
impl<T: 'static> ServiceFactory<T> for Ident {
type Response = T;
type Error = ();
type Config = ();
// explicitly !Send result service
type Service = boxed::RcService<T, Self::Response, Self::Error>;
type InitError = ();
type Future = Ready<Result<Self::Service, Self::Error>>;
fn new_service(&self, _cfg: Self::Config) -> Self::Future {
ok(boxed::rc_service(fn_service(ok)))
}
}
let svc_fac = Ident;
is_static(&svc_fac);
impls_clone(&svc_fac);
impls_send(&svc_fac);
let svc_fac = ServiceFactoryExt::map(Ident, core::convert::identity);
impls_send(&svc_fac);
let svc = svc_fac.new_service(()).await.unwrap();
svc.call(()).await.unwrap();
let svc_fac = ServiceFactoryExt::map_err(Ident, core::convert::identity);
impls_send(&svc_fac);
let svc = svc_fac.new_service(()).await.unwrap();
svc.call(()).await.unwrap();
let svc_fac = ServiceFactoryExt::map_init_err(Ident, core::convert::identity);
impls_send(&svc_fac);
let svc = svc_fac.new_service(()).await.unwrap();
svc.call(()).await.unwrap();
let svc_fac = ServiceFactoryExt::and_then(Ident, Ident);
// impls_send(&svc_fac); // fails to compile :(
let svc = svc_fac.new_service(()).await.unwrap();
svc.call(()).await.unwrap();
// let svc_fac = ServiceFactoryExt::and_then_send(Ident, Ident);
// impls_send(&svc_fac);
// let svc = svc_fac.new_service(()).await.unwrap();
// svc.call(()).await.unwrap();
}
}

View File

@@ -103,7 +103,7 @@ where
}
}
/// `MapNewService` new service combinator
/// `MapServiceFactory` new service combinator.
pub struct MapServiceFactory<A, F, Req, Res> {
a: A,
f: F,

View File

@@ -238,8 +238,7 @@ where
}
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
/// Map this service's output to a different type, returning a new service.
pub fn map<F, R>(self, f: F) -> PipelineFactory<MapServiceFactory<SF, F, Req, R>, Req>
where
Self: Sized,
@@ -251,7 +250,7 @@ where
}
}
/// Map this service's error to a different error, returning a new service.
/// Map this service's error to a different type, returning a new service.
pub fn map_err<F, E>(
self,
f: F,
@@ -266,7 +265,7 @@ where
}
}
/// Map this factory's init error to a different error, returning a new service.
/// Map this factory's init error to a different type, returning a new service.
pub fn map_init_err<F, E>(self, f: F) -> PipelineFactory<MapInitErr<SF, F, Req, E>, Req>
where
Self: Sized,

View File

@@ -3,14 +3,29 @@
## Unreleased - 2021-xx-xx
## 3.0.0-beta.7 - 2021-10-20
* Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401]
* Alias `connect::ssl` to `connect::tls`. [#401]
[#401]: https://github.com/actix/actix-net/pull/401
## 3.0.0-beta.6 - 2021-10-19
* Update `tokio-rustls` to `0.23` which uses `rustls` `0.20`. [#396]
* Removed a re-export of `Session` from `rustls` as it no longer exist. [#396]
* Minimum supported Rust version (MSRV) is now 1.52.
[#396]: https://github.com/actix/actix-net/pull/396
## 3.0.0-beta.5 - 2021-03-29
* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef`
* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef`
generation failed instead of panic. [#296]
* Remove `connect::ssl::openssl::OpensslConnectServiceFactory`. [#297]
* Remove `connect::ssl::openssl::OpensslConnectService`. [#297]
* Add `connect::ssl::native_tls` module for native tls support. [#295]
* Rename `accept::{nativetls => native_tls}`. [#295]
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
`connect::ConnectService` instead and call `Connection<T, TcpStream>::into_parts`. [#299]
[#295]: https://github.com/actix/actix-net/pull/295

View File

@@ -1,12 +1,10 @@
[package]
name = "actix-tls"
version = "3.0.0-beta.5"
version = "3.0.0-beta.7"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "TLS acceptor and connector services for Actix ecosystem"
keywords = ["network", "tls", "ssl", "async", "transport"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tls"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -40,7 +38,7 @@ native-tls = ["tokio-native-tls"]
uri = ["http"]
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-codec = "0.4.0"
actix-rt = { version = "2.2.0", default-features = false }
actix-service = "2.0.0"
actix-utils = "3.0.0"
@@ -56,19 +54,20 @@ tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
tokio-openssl = { version = "0.6", optional = true }
# rustls
tokio-rustls = { version = "0.22", optional = true }
webpki-roots = { version = "0.21", optional = true }
tokio-rustls = { version = "0.23", optional = true }
webpki-roots = { version = "0.22", optional = true }
# native-tls
tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies]
actix-rt = "2.2.0"
actix-server = "2.0.0-beta.5"
actix-server = "2.0.0-beta.6"
bytes = "1"
env_logger = "0.8"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
log = "0.4"
rustls-pemfile = "0.2.1"
trust-dns-resolver = "0.20.0"
[[example]]

View File

@@ -31,29 +31,36 @@ use std::{
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::ServiceFactoryExt as _;
use actix_service::{fn_factory, fn_service, ServiceExt as _, ServiceFactory};
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok;
use log::info;
use rustls::{
internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig,
};
use rustls::{server::ServerConfig, Certificate, PrivateKey};
use rustls_pemfile::{certs, rsa_private_keys};
const CERT_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/cert.pem"];
const KEY_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/key.pem"];
#[actix_rt::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "info");
env_logger::init();
let mut tls_config = ServerConfig::new(NoClientAuth::new());
// Load TLS key and cert files
let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap());
let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap());
let cert_file = &mut BufReader::new(File::open(CERT_PATH).unwrap());
let key_file = &mut BufReader::new(File::open(KEY_PATH).unwrap());
let cert_chain = certs(cert_file).unwrap();
let cert_chain = certs(cert_file)
.unwrap()
.into_iter()
.map(Certificate)
.collect();
let mut keys = rsa_private_keys(key_file).unwrap();
tls_config
.set_single_cert(cert_chain, keys.remove(0))
let tls_config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(cert_chain, PrivateKey(keys.remove(0)))
.unwrap();
let tls_acceptor = RustlsAcceptor::new(tls_config);
@@ -64,18 +71,34 @@ async fn main() -> io::Result<()> {
info!("starting server on port: {}", &addr.0);
Server::build()
.bind("tls-example", addr, move || {
.bind("tls-example", addr, {
let count = Arc::clone(&count);
// Set up TLS service factory
tls_acceptor
.clone()
.map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream: TlsStream<TcpStream>| {
let num = count.fetch_add(1, Ordering::Relaxed);
info!("[{}] Got TLS connection: {:?}", num, &*stream);
ok(())
})
// note: moving rustls acceptor into fn_factory scope
fn_factory(move || {
// manually call new_service so that and_then can be used from ServiceExt
// type annotation for inner stream type is required
let svc = <RustlsAcceptor as ServiceFactory<TcpStream>>::new_service(
&tls_acceptor,
(),
);
let count = Arc::clone(&count);
async move {
let svc = svc
.await?
.map_err(|err| println!("Rustls error: {:?}", err))
.and_then(fn_service(move |stream: TlsStream<TcpStream>| {
let num = count.fetch_add(1, Ordering::Relaxed) + 1;
info!("[{}] Got TLS connection: {:?}", num, &*stream);
ok(())
}));
Ok::<_, ()>(svc)
}
})
})?
.workers(1)
.run()

View File

@@ -14,7 +14,7 @@ use actix_utils::counter::{Counter, CounterGuard};
use futures_core::future::LocalBoxFuture;
use tokio_rustls::{Accept, TlsAcceptor};
pub use tokio_rustls::rustls::{ServerConfig, Session};
pub use tokio_rustls::rustls::ServerConfig;
use super::MAX_CONN_COUNTER;

View File

@@ -21,7 +21,9 @@ mod connector;
mod error;
mod resolve;
mod service;
pub mod ssl;
pub mod tls;
#[doc(hidden)]
pub use tls as ssl;
#[cfg(feature = "uri")]
mod uri;

View File

@@ -1,4 +1,4 @@
//! SSL Services
//! TLS Services
#[cfg(feature = "openssl")]
pub mod openssl;

View File

@@ -1,4 +1,5 @@
use std::{
convert::TryFrom,
future::Future,
io,
pin::Pin,
@@ -6,7 +7,6 @@ use std::{
task::{Context, Poll},
};
pub use tokio_rustls::rustls::Session;
pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
pub use webpki_roots::TLS_SERVER_ROOTS;
@@ -14,11 +14,26 @@ use actix_rt::net::ActixStream;
use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use log::trace;
use tokio_rustls::webpki::DNSNameRef;
use tokio_rustls::rustls::{client::ServerName, OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{Connect, TlsConnector};
use crate::connect::{Address, Connection};
/// Returns standard root certificates from `webpki-roots` crate as a rustls certificate store.
pub fn webpki_roots_cert_store() -> RootCertStore {
let mut root_certs = RootCertStore::empty();
for cert in TLS_SERVER_ROOTS.0 {
let cert = OwnedTrustAnchor::from_subject_spki_name_constraints(
cert.subject,
cert.spki,
cert.name_constraints,
);
let certs = vec![cert].into_iter();
root_certs.add_server_trust_anchors(certs);
}
root_certs
}
/// Rustls connector factory
pub struct RustlsConnector {
connector: Arc<ClientConfig>,
@@ -89,7 +104,7 @@ where
trace!("SSL Handshake start for: {:?}", connection.host());
let (stream, connection) = connection.replace_io(());
match DNSNameRef::try_from_ascii_str(connection.host()) {
match ServerName::try_from(connection.host()) {
Ok(host) => RustlsConnectorServiceFuture::Future {
connect: TlsConnector::from(self.connector.clone()).connect(host, stream),
connection: Some(connection),

View File

@@ -17,7 +17,7 @@ use actix_tls::connect::{self as actix_connect, Connect};
#[cfg(feature = "openssl")]
#[actix_rt::test]
async fn test_string() {
let srv = TestServer::with(|| {
let srv = TestServer::with({
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@@ -34,7 +34,7 @@ async fn test_string() {
#[cfg(feature = "rustls")]
#[actix_rt::test]
async fn test_rustls_string() {
let srv = TestServer::with(|| {
let srv = TestServer::with({
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@@ -50,13 +50,11 @@ async fn test_rustls_string() {
#[actix_rt::test]
async fn test_static_str() {
let srv = TestServer::with(|| {
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
})
});
let srv = TestServer::with(fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
}));
let conn = actix_connect::default_connector();
@@ -75,13 +73,11 @@ async fn test_static_str() {
#[actix_rt::test]
async fn test_new_service() {
let srv = TestServer::with(|| {
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
})
});
let srv = TestServer::with(fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
}));
let factory = actix_connect::default_connector_factory();
@@ -98,7 +94,7 @@ async fn test_new_service() {
async fn test_openssl_uri() {
use std::convert::TryFrom;
let srv = TestServer::with(|| {
let srv = TestServer::with({
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@@ -117,7 +113,7 @@ async fn test_openssl_uri() {
async fn test_rustls_uri() {
use std::convert::TryFrom;
let srv = TestServer::with(|| {
let srv = TestServer::with({
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
@@ -133,7 +129,7 @@ async fn test_rustls_uri() {
#[actix_rt::test]
async fn test_local_addr() {
let srv = TestServer::with(|| {
let srv = TestServer::with({
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;

View File

@@ -38,8 +38,9 @@ async fn custom_resolver() {
async fn custom_resolver_connect() {
use trust_dns_resolver::TokioAsyncResolver;
let srv =
TestServer::with(|| fn_service(|_io: TcpStream| async { Ok::<_, io::Error>(()) }));
let srv = TestServer::with(fn_service(|_io: TcpStream| async {
Ok::<_, io::Error>(())
}));
struct MyResolver {
trust_dns: TokioAsyncResolver,

View File

@@ -24,4 +24,5 @@ serde = { version = "1.0", optional = true }
[dev-dependencies]
serde_json = "1.0"
ahash = { version = "0.7", default-features = false }
# TODO: remove when ahash MSRV is restored
ahash = { version = "=0.7.4", default-features = false }

1
clippy.toml Normal file
View File

@@ -0,0 +1 @@
msrv = "1.48"

View File

@@ -18,4 +18,4 @@ futures-util = { version = "0.3.7", default-features = false }
local-waker = "0.1"
[dev-dependencies]
tokio = { version = "1", features = ["rt", "macros"] }
tokio = { version = "1.5.1", features = ["rt", "macros"] }