mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-16 19:28:59 +02:00
Compare commits
24 Commits
service-v2
...
fallible-s
Author | SHA1 | Date | |
---|---|---|---|
|
a9251474c1 | ||
|
5097b12b7c | ||
|
3c6f586b89 | ||
|
f7985c585a | ||
|
e49fedbfe7 | ||
|
75a877b631 | ||
|
336e98e950 | ||
|
448626d543 | ||
|
9b9869f1dd | ||
|
4c0eaca581 | ||
|
81421c2ba9 | ||
|
305d0e9d8a | ||
|
1c8fcaebbc | ||
|
a1d15f2e08 | ||
|
70ea5322ab | ||
|
303666278a | ||
|
669e868370 | ||
|
47f278b17a | ||
|
ca77d8d835 | ||
|
00775884f8 | ||
|
4ff8a2cf68 | ||
|
5c555a9408 | ||
|
ca435b2575 | ||
|
9fa8d7fc5a |
@@ -1,23 +1,25 @@
|
||||
[alias]
|
||||
chk = "check --workspace --all-features --tests --examples --bins"
|
||||
lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo"
|
||||
|
||||
ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture"
|
||||
|
||||
# just check the library (without dev deps)
|
||||
ci-check-min = "hack --workspace check --no-default-features"
|
||||
ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check"
|
||||
ci-check-lib = "hack --workspace --feature-powerset --exclude-features=io-uring check"
|
||||
ci-check-lib-linux = "hack --workspace --feature-powerset check"
|
||||
|
||||
# check everything
|
||||
ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples"
|
||||
ci-check = "hack --workspace --feature-powerset --exclude-features=io-uring check --tests --examples"
|
||||
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"
|
||||
|
||||
# tests avoiding io-uring feature
|
||||
ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture"
|
||||
ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
|
||||
ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
|
||||
ci-test-rt = " hack --feature-powerset --exclude-features=io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
|
||||
ci-test-server = "hack --feature-powerset --exclude-features=io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
|
||||
|
||||
# test with io-uring feature
|
||||
ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
|
||||
ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
|
||||
|
||||
# test lower msrv
|
||||
ci-test-lower-msrv = "hack --workspace --exclude=actix-server --exclude=actix-tls --feature-powerset test --lib --tests --no-fail-fast -- --nocapture"
|
||||
|
82
.github/workflows/ci.yml
vendored
82
.github/workflows/ci.yml
vendored
@@ -18,7 +18,7 @@ jobs:
|
||||
- { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu }
|
||||
- { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc }
|
||||
version:
|
||||
- 1.46.0 # MSRV
|
||||
- 1.52.0 # MSRV for -server and -tls
|
||||
- stable
|
||||
- nightly
|
||||
|
||||
@@ -64,8 +64,7 @@ jobs:
|
||||
|
||||
# - name: Generate Cargo.lock
|
||||
# uses: actions-rs/cargo@v1
|
||||
# with:
|
||||
# command: generate-lockfile
|
||||
# with: { command: generate-lockfile }
|
||||
# - name: Cache Dependencies
|
||||
# uses: Swatinem/rust-cache@v1.2.0
|
||||
|
||||
@@ -113,30 +112,69 @@ jobs:
|
||||
- name: tests
|
||||
if: matrix.target.os == 'ubuntu-latest'
|
||||
run: |
|
||||
cargo ci-test
|
||||
cargo ci-test-rt-linux
|
||||
cargo ci-test-server-linux
|
||||
|
||||
- name: Generate coverage file
|
||||
if: >
|
||||
matrix.target.os == 'ubuntu-latest'
|
||||
&& matrix.version == 'stable'
|
||||
&& github.ref == 'refs/heads/master'
|
||||
run: |
|
||||
cargo install cargo-tarpaulin
|
||||
cargo tarpaulin --out Xml --verbose
|
||||
- name: Upload to Codecov
|
||||
if: >
|
||||
matrix.target.os == 'ubuntu-latest'
|
||||
&& matrix.version == 'stable'
|
||||
&& github.ref == 'refs/heads/master'
|
||||
uses: codecov/codecov-action@v1
|
||||
with: { file: cobertura.xml }
|
||||
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-rt-linux && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-server-linux"
|
||||
|
||||
- name: Clear the cargo caches
|
||||
run: |
|
||||
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
|
||||
cargo-cache
|
||||
|
||||
build_and_test_lower_msrv:
|
||||
name: Linux / 1.46 (lower MSRV)
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install 1.46.0 # MSRV for all but -server and -tls
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: 1.46.0-x86_64-unknown-linux-gnu
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Install cargo-hack
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: install
|
||||
args: cargo-hack
|
||||
|
||||
- name: tests
|
||||
run: |
|
||||
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=1.46 cargo ci-test-lower-msrv"
|
||||
|
||||
- name: Clear the cargo caches
|
||||
run: |
|
||||
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
|
||||
cargo-cache
|
||||
|
||||
coverage:
|
||||
name: coverage
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install Rust (nightly)
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable-x86_64-unknown-linux-gnu
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Generate Cargo.lock
|
||||
uses: actions-rs/cargo@v1
|
||||
with: { command: generate-lockfile }
|
||||
- name: Cache Dependencies
|
||||
uses: Swatinem/rust-cache@v1.3.0
|
||||
|
||||
- name: Generate coverage file
|
||||
if: github.ref == 'refs/heads/master'
|
||||
run: |
|
||||
cargo install cargo-tarpaulin
|
||||
cargo tarpaulin --out Xml --verbose
|
||||
- name: Upload to Codecov
|
||||
if: github.ref == 'refs/heads/master'
|
||||
uses: codecov/codecov-action@v1
|
||||
with: { file: cobertura.xml }
|
||||
|
||||
rustdoc:
|
||||
name: rustdoc
|
||||
|
@@ -20,5 +20,5 @@ futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-sink = { version = "0.3.7", default-features = false }
|
||||
log = "0.4"
|
||||
pin-project-lite = "0.2"
|
||||
tokio = "1"
|
||||
tokio = "1.5.1"
|
||||
tokio-util = { version = "0.6", features = ["codec", "io"] }
|
||||
|
@@ -178,7 +178,7 @@ impl<T, U> Framed<T, U> {
|
||||
U: Decoder,
|
||||
{
|
||||
loop {
|
||||
let mut this = self.as_mut().project();
|
||||
let this = self.as_mut().project();
|
||||
// Repeatedly call `decode` or `decode_eof` as long as it is "readable". Readable is
|
||||
// defined as not having returned `None`. If the upstream has returned EOF, and the
|
||||
// decoder is no longer readable, it can be assumed that the decoder will never become
|
||||
@@ -186,7 +186,7 @@ impl<T, U> Framed<T, U> {
|
||||
|
||||
if this.flags.contains(Flags::READABLE) {
|
||||
if this.flags.contains(Flags::EOF) {
|
||||
match this.codec.decode_eof(&mut this.read_buf) {
|
||||
match this.codec.decode_eof(this.read_buf) {
|
||||
Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))),
|
||||
Ok(None) => return Poll::Ready(None),
|
||||
Err(e) => return Poll::Ready(Some(Err(e))),
|
||||
@@ -195,7 +195,7 @@ impl<T, U> Framed<T, U> {
|
||||
|
||||
log::trace!("attempting to decode a frame");
|
||||
|
||||
match this.codec.decode(&mut this.read_buf) {
|
||||
match this.codec.decode(this.read_buf) {
|
||||
Ok(Some(frame)) => {
|
||||
log::trace!("frame decoded from buffer");
|
||||
return Poll::Ready(Some(Ok(frame)));
|
||||
|
@@ -3,6 +3,19 @@
|
||||
## Unreleased - 2021-xx-xx
|
||||
|
||||
|
||||
## 0.2.3 - 2021-10-19
|
||||
* Fix test macro in presence of other imports named "test". [#399]
|
||||
|
||||
[#399]: https://github.com/actix/actix-net/pull/399
|
||||
|
||||
|
||||
## 0.2.2 - 2021-10-14
|
||||
* Improve error recovery potential when macro input is invalid. [#391]
|
||||
* Allow custom `System`s on test macro. [#391]
|
||||
|
||||
[#391]: https://github.com/actix/actix-net/pull/391
|
||||
|
||||
|
||||
## 0.2.1 - 2021-02-02
|
||||
* Add optional argument `system` to `main` macro which can be used to specify the path to `actix_rt::System` (useful for re-exports). [#363]
|
||||
|
||||
|
@@ -1,12 +1,13 @@
|
||||
[package]
|
||||
name = "actix-macros"
|
||||
version = "0.2.1"
|
||||
version = "0.2.3"
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"Ibraheem Ahmed <ibrah1440@gmail.com>",
|
||||
"Rob Ede <robjtede@icloud.com>",
|
||||
]
|
||||
description = "Macros for Actix system and runtime"
|
||||
repository = "https://github.com/actix/actix-net"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
edition = "2018"
|
||||
@@ -22,4 +23,5 @@ syn = { version = "^1", features = ["full"] }
|
||||
actix-rt = "2.0.0"
|
||||
|
||||
futures-util = { version = "0.3.7", default-features = false }
|
||||
rustversion = "1"
|
||||
trybuild = "1"
|
||||
|
@@ -28,7 +28,12 @@ use quote::quote;
|
||||
#[proc_macro_attribute]
|
||||
#[cfg(not(test))] // Work around for rust-lang/rust#62127
|
||||
pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
|
||||
let mut input = syn::parse_macro_input!(item as syn::ItemFn);
|
||||
let mut input = match syn::parse::<syn::ItemFn>(item.clone()) {
|
||||
Ok(input) => input,
|
||||
// on parse err, make IDEs happy; see fn docs
|
||||
Err(err) => return input_and_compile_error(item, err),
|
||||
};
|
||||
|
||||
let args = syn::parse_macro_input!(args as syn::AttributeArgs);
|
||||
|
||||
let attrs = &input.attrs;
|
||||
@@ -101,8 +106,15 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
|
||||
/// }
|
||||
/// ```
|
||||
#[proc_macro_attribute]
|
||||
pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
|
||||
let mut input = syn::parse_macro_input!(item as syn::ItemFn);
|
||||
pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
|
||||
let mut input = match syn::parse::<syn::ItemFn>(item.clone()) {
|
||||
Ok(input) => input,
|
||||
// on parse err, make IDEs happy; see fn docs
|
||||
Err(err) => return input_and_compile_error(item, err),
|
||||
};
|
||||
|
||||
let args = syn::parse_macro_input!(args as syn::AttributeArgs);
|
||||
|
||||
let attrs = &input.attrs;
|
||||
let vis = &input.vis;
|
||||
let sig = &mut input.sig;
|
||||
@@ -127,18 +139,64 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
|
||||
sig.asyncness = None;
|
||||
|
||||
let missing_test_attr = if has_test_attr {
|
||||
quote!()
|
||||
quote! {}
|
||||
} else {
|
||||
quote!(#[test])
|
||||
quote! { #[::core::prelude::v1::test] }
|
||||
};
|
||||
|
||||
let mut system = syn::parse_str::<syn::Path>("::actix_rt::System").unwrap();
|
||||
|
||||
for arg in &args {
|
||||
match arg {
|
||||
syn::NestedMeta::Meta(syn::Meta::NameValue(syn::MetaNameValue {
|
||||
lit: syn::Lit::Str(lit),
|
||||
path,
|
||||
..
|
||||
})) => match path
|
||||
.get_ident()
|
||||
.map(|i| i.to_string().to_lowercase())
|
||||
.as_deref()
|
||||
{
|
||||
Some("system") => match lit.parse() {
|
||||
Ok(path) => system = path,
|
||||
Err(_) => {
|
||||
return syn::Error::new_spanned(lit, "Expected path")
|
||||
.to_compile_error()
|
||||
.into();
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
return syn::Error::new_spanned(arg, "Unknown attribute specified")
|
||||
.to_compile_error()
|
||||
.into();
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
return syn::Error::new_spanned(arg, "Unknown attribute specified")
|
||||
.to_compile_error()
|
||||
.into();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(quote! {
|
||||
#missing_test_attr
|
||||
#(#attrs)*
|
||||
#vis #sig {
|
||||
actix_rt::System::new()
|
||||
.block_on(async { #body })
|
||||
<#system>::new().block_on(async { #body })
|
||||
}
|
||||
})
|
||||
.into()
|
||||
}
|
||||
|
||||
/// Converts the error to a token stream and appends it to the original input.
|
||||
///
|
||||
/// Returning the original input in addition to the error is good for IDEs which can gracefully
|
||||
/// recover and show more precise errors within the macro body.
|
||||
///
|
||||
/// See <https://github.com/rust-analyzer/rust-analyzer/issues/10468> for more info.
|
||||
fn input_and_compile_error(mut item: TokenStream, err: syn::Error) -> TokenStream {
|
||||
let compile_err = TokenStream::from(err.to_compile_error());
|
||||
item.extend(compile_err);
|
||||
item
|
||||
}
|
||||
|
@@ -1,3 +1,4 @@
|
||||
#[rustversion::stable(1.46)] // MSRV
|
||||
#[test]
|
||||
fn compile_macros() {
|
||||
let t = trybuild::TestCases::new();
|
||||
@@ -11,4 +12,7 @@ fn compile_macros() {
|
||||
t.pass("tests/trybuild/test-01-basic.rs");
|
||||
t.pass("tests/trybuild/test-02-keep-attrs.rs");
|
||||
t.compile_fail("tests/trybuild/test-03-only-async.rs");
|
||||
t.pass("tests/trybuild/test-04-system-path.rs");
|
||||
t.compile_fail("tests/trybuild/test-05-system-expect-path.rs");
|
||||
t.compile_fail("tests/trybuild/test-06-unknown-attr.rs");
|
||||
}
|
||||
|
10
actix-macros/tests/trybuild/test-04-system-path.rs
Normal file
10
actix-macros/tests/trybuild/test-04-system-path.rs
Normal file
@@ -0,0 +1,10 @@
|
||||
mod system {
|
||||
pub use actix_rt::System as MySystem;
|
||||
}
|
||||
|
||||
#[actix_rt::test(system = "system::MySystem")]
|
||||
async fn my_test() {
|
||||
futures_util::future::ready(()).await
|
||||
}
|
||||
|
||||
fn main() {}
|
@@ -0,0 +1,4 @@
|
||||
#[actix_rt::test(system = "!@#*&")]
|
||||
async fn my_test() {}
|
||||
|
||||
fn main() {}
|
@@ -0,0 +1,5 @@
|
||||
error: Expected path
|
||||
--> $DIR/test-05-system-expect-path.rs:1:27
|
||||
|
|
||||
1 | #[actix_rt::test(system = "!@#*&")]
|
||||
| ^^^^^^^
|
7
actix-macros/tests/trybuild/test-06-unknown-attr.rs
Normal file
7
actix-macros/tests/trybuild/test-06-unknown-attr.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
#[actix_rt::test(foo = "bar")]
|
||||
async fn my_test_1() {}
|
||||
|
||||
#[actix_rt::test(bar::baz)]
|
||||
async fn my_test_2() {}
|
||||
|
||||
fn main() {}
|
11
actix-macros/tests/trybuild/test-06-unknown-attr.stderr
Normal file
11
actix-macros/tests/trybuild/test-06-unknown-attr.stderr
Normal file
@@ -0,0 +1,11 @@
|
||||
error: Unknown attribute specified
|
||||
--> $DIR/test-06-unknown-attr.rs:1:18
|
||||
|
|
||||
1 | #[actix_rt::test(foo = "bar")]
|
||||
| ^^^^^^^^^^^
|
||||
|
||||
error: Unknown attribute specified
|
||||
--> $DIR/test-06-unknown-attr.rs:4:18
|
||||
|
|
||||
4 | #[actix_rt::test(bar::baz)]
|
||||
| ^^^^^^^^
|
@@ -1,8 +1,11 @@
|
||||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* Add `io-uring` feature for enabling async file I/O on linux. [#374]
|
||||
|
||||
|
||||
## 2.3.0 - 2021-10-11
|
||||
* The `spawn` method can now resolve with non-unit outputs. [#369]
|
||||
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]
|
||||
|
||||
[#369]: https://github.com/actix/actix-net/pull/369
|
||||
[#374]: https://github.com/actix/actix-net/pull/374
|
||||
|
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "actix-rt"
|
||||
version = "2.2.0"
|
||||
version = "2.3.0"
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"Rob Ede <robjtede@icloud.com>",
|
||||
@@ -8,8 +8,7 @@ authors = [
|
||||
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
|
||||
keywords = ["async", "futures", "io", "runtime"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net"
|
||||
documentation = "https://docs.rs/actix-rt"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
edition = "2018"
|
||||
@@ -24,14 +23,14 @@ macros = ["actix-macros"]
|
||||
io-uring = ["tokio-uring"]
|
||||
|
||||
[dependencies]
|
||||
actix-macros = { version = "0.2.0", optional = true }
|
||||
actix-macros = { version = "0.2.3", optional = true }
|
||||
|
||||
futures-core = { version = "0.3", default-features = false }
|
||||
tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
|
||||
tokio = { version = "1.5.1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
|
||||
|
||||
[target.'cfg(target_os = "linux")'.dependencies]
|
||||
tokio-uring = { version = "0.1", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.2", features = ["full"] }
|
||||
tokio = { version = "1.5.1", features = ["full"] }
|
||||
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }
|
||||
|
@@ -3,11 +3,11 @@
|
||||
> Tokio-based single-threaded async runtime for the Actix ecosystem.
|
||||
|
||||
[](https://crates.io/crates/actix-rt)
|
||||
[](https://docs.rs/actix-rt/2.2.0)
|
||||
[](https://docs.rs/actix-rt/2.3.0)
|
||||
[](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
|
||||

|
||||
<br />
|
||||
[](https://deps.rs/crate/actix-rt/2.2.0)
|
||||
[](https://deps.rs/crate/actix-rt/2.3.0)
|
||||

|
||||
[](https://discord.gg/WghFtEH6Hb)
|
||||
|
||||
|
@@ -15,7 +15,7 @@
|
||||
//! blocking task thread-pool using [`task::spawn_blocking`].
|
||||
//!
|
||||
//! # Examples
|
||||
//! ```
|
||||
//! ```no_run
|
||||
//! use std::sync::mpsc;
|
||||
//! use actix_rt::{Arbiter, System};
|
||||
//!
|
||||
|
@@ -11,7 +11,7 @@ use std::{
|
||||
use futures_core::ready;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
|
||||
use crate::{arbiter::ArbiterHandle, Arbiter};
|
||||
|
||||
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
@@ -29,6 +29,7 @@ pub struct System {
|
||||
arbiter_handle: ArbiterHandle,
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
impl System {
|
||||
/// Create a new system.
|
||||
///
|
||||
@@ -37,7 +38,7 @@ impl System {
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new() -> SystemRunner {
|
||||
Self::with_tokio_rt(|| {
|
||||
default_tokio_runtime()
|
||||
crate::runtime::default_tokio_runtime()
|
||||
.expect("Default Actix (Tokio) runtime could not be created.")
|
||||
})
|
||||
}
|
||||
@@ -53,7 +54,7 @@ impl System {
|
||||
let (stop_tx, stop_rx) = oneshot::channel();
|
||||
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let rt = Runtime::from(runtime_factory());
|
||||
let rt = crate::runtime::Runtime::from(runtime_factory());
|
||||
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
|
||||
let system = System::construct(sys_tx, sys_arbiter.clone());
|
||||
|
||||
@@ -72,7 +73,32 @@ impl System {
|
||||
system,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "io-uring")]
|
||||
impl System {
|
||||
/// Create a new system.
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if underlying Tokio runtime can not be created.
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub fn new() -> SystemRunner {
|
||||
SystemRunner
|
||||
}
|
||||
|
||||
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
||||
///
|
||||
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||
#[doc(hidden)]
|
||||
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
|
||||
where
|
||||
F: Fn() -> tokio::runtime::Runtime,
|
||||
{
|
||||
unimplemented!("System::with_tokio_rt is not implemented yet")
|
||||
}
|
||||
}
|
||||
|
||||
impl System {
|
||||
/// Constructs new system and registers it on the current thread.
|
||||
pub(crate) fn construct(
|
||||
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
||||
@@ -149,15 +175,18 @@ impl System {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||
#[derive(Debug)]
|
||||
pub struct SystemRunner {
|
||||
rt: Runtime,
|
||||
rt: crate::runtime::Runtime,
|
||||
stop_rx: oneshot::Receiver<i32>,
|
||||
#[allow(dead_code)]
|
||||
system: System,
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
impl SystemRunner {
|
||||
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
||||
pub fn run(self) -> io::Result<()> {
|
||||
@@ -187,6 +216,45 @@ impl SystemRunner {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "io-uring")]
|
||||
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||
#[derive(Debug)]
|
||||
pub struct SystemRunner;
|
||||
|
||||
#[cfg(feature = "io-uring")]
|
||||
impl SystemRunner {
|
||||
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
||||
pub fn run(self) -> io::Result<()> {
|
||||
unimplemented!("SystemRunner::run is not implemented yet")
|
||||
}
|
||||
|
||||
/// Runs the provided future, blocking the current thread until the future completes.
|
||||
#[inline]
|
||||
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
|
||||
tokio_uring::start(async move {
|
||||
let (stop_tx, stop_rx) = oneshot::channel();
|
||||
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let sys_arbiter = Arbiter::in_new_system();
|
||||
let system = System::construct(sys_tx, sys_arbiter.clone());
|
||||
|
||||
system
|
||||
.tx()
|
||||
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
|
||||
.unwrap();
|
||||
|
||||
// init background system arbiter
|
||||
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
||||
tokio_uring::spawn(sys_ctrl);
|
||||
|
||||
let res = fut.await;
|
||||
drop(stop_rx);
|
||||
res
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum SystemCommand {
|
||||
Exit(i32),
|
||||
|
17
actix-rt/tests/test-macro-import-conflict.rs
Normal file
17
actix-rt/tests/test-macro-import-conflict.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
//! Checks that test macro does not cause problems in the presence of imports named "test" that
|
||||
//! could be either a module with test items or the "test with runtime" macro itself.
|
||||
//!
|
||||
//! Before actix/actix-net#399 was implemented, this macro was running twice. The first run output
|
||||
//! `#[test]` and it got run again and since it was in scope.
|
||||
//!
|
||||
//! Prevented by using the fully-qualified test marker (`#[::core::prelude::v1::test]`).
|
||||
|
||||
#![cfg(feature = "macros")]
|
||||
|
||||
use actix_rt::time as test;
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_naming_conflict() {
|
||||
use test as time;
|
||||
time::sleep(std::time::Duration::from_millis(2)).await;
|
||||
}
|
@@ -1,12 +1,15 @@
|
||||
use std::{
|
||||
future::Future,
|
||||
sync::mpsc::channel,
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use actix_rt::{task::JoinError, Arbiter, System};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
use {
|
||||
std::{sync::mpsc::channel, thread},
|
||||
tokio::sync::oneshot,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn await_for_timer() {
|
||||
@@ -103,6 +106,10 @@ fn wait_for_spawns() {
|
||||
assert!(rt.block_on(handle).is_err());
|
||||
}
|
||||
|
||||
// Temporary disabled tests for io-uring feature.
|
||||
// They should be enabled when possible.
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
#[test]
|
||||
fn arbiter_spawn_fn_runs() {
|
||||
let _ = System::new();
|
||||
@@ -119,6 +126,7 @@ fn arbiter_spawn_fn_runs() {
|
||||
arbiter.join().unwrap();
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
#[test]
|
||||
fn arbiter_handle_spawn_fn_runs() {
|
||||
let sys = System::new();
|
||||
@@ -141,6 +149,7 @@ fn arbiter_handle_spawn_fn_runs() {
|
||||
sys.run().unwrap();
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
#[test]
|
||||
fn arbiter_drop_no_panic_fn() {
|
||||
let _ = System::new();
|
||||
@@ -152,6 +161,7 @@ fn arbiter_drop_no_panic_fn() {
|
||||
arbiter.join().unwrap();
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
#[test]
|
||||
fn arbiter_drop_no_panic_fut() {
|
||||
let _ = System::new();
|
||||
@@ -163,18 +173,7 @@ fn arbiter_drop_no_panic_fut() {
|
||||
arbiter.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn no_system_current_panic() {
|
||||
System::current();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn no_system_arbiter_new_panic() {
|
||||
Arbiter::new();
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
#[test]
|
||||
fn system_arbiter_spawn() {
|
||||
let runner = System::new();
|
||||
@@ -205,6 +204,7 @@ fn system_arbiter_spawn() {
|
||||
thread.join().unwrap();
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "io-uring"))]
|
||||
#[test]
|
||||
fn system_stop_stops_arbiters() {
|
||||
let sys = System::new();
|
||||
@@ -293,6 +293,18 @@ fn new_arbiter_with_tokio() {
|
||||
assert!(!counter.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn no_system_current_panic() {
|
||||
System::current();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn no_system_arbiter_new_panic() {
|
||||
Arbiter::new();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_current_no_system() {
|
||||
assert!(System::try_current().is_none())
|
||||
@@ -330,28 +342,27 @@ fn spawn_local() {
|
||||
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||
#[test]
|
||||
fn tokio_uring_arbiter() {
|
||||
let system = System::new();
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
System::new().block_on(async {
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
|
||||
Arbiter::new().spawn(async move {
|
||||
let handle = actix_rt::spawn(async move {
|
||||
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
|
||||
let buf = b"Hello World!";
|
||||
Arbiter::new().spawn(async move {
|
||||
let handle = actix_rt::spawn(async move {
|
||||
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
|
||||
let buf = b"Hello World!";
|
||||
|
||||
let (res, _) = f.write_at(&buf[..], 0).await;
|
||||
assert!(res.is_ok());
|
||||
let (res, _) = f.write_at(&buf[..], 0).await;
|
||||
assert!(res.is_ok());
|
||||
|
||||
f.sync_all().await.unwrap();
|
||||
f.close().await.unwrap();
|
||||
f.sync_all().await.unwrap();
|
||||
f.close().await.unwrap();
|
||||
|
||||
std::fs::remove_file("test.txt").unwrap();
|
||||
std::fs::remove_file("test.txt").unwrap();
|
||||
});
|
||||
|
||||
handle.await.unwrap();
|
||||
tx.send(true).unwrap();
|
||||
});
|
||||
|
||||
handle.await.unwrap();
|
||||
tx.send(true).unwrap();
|
||||
});
|
||||
|
||||
assert!(rx.recv().unwrap());
|
||||
|
||||
drop(system);
|
||||
assert!(rx.recv().unwrap());
|
||||
})
|
||||
}
|
||||
|
@@ -1,13 +1,22 @@
|
||||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349]
|
||||
* Rename `Server` to `ServerHandle`. [#403]
|
||||
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#403]
|
||||
* Remove wrapper `service::ServiceFactory` trait. [#403]
|
||||
* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#403]
|
||||
* Minimum supported Rust version (MSRV) is now 1.52.
|
||||
|
||||
[#403]: https://github.com/actix/actix-net/pull/403
|
||||
|
||||
|
||||
## 2.0.0-beta.6 - 2021-10-11
|
||||
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]
|
||||
* Server no long listens to `SIGHUP` signal. Previously, the received was not used but did block
|
||||
subsequent exit signals from working. [#389]
|
||||
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to
|
||||
this change. [#349]
|
||||
* Remove `ServerBuilder::configure` [#349]
|
||||
* Add `io-uring` feature for enabling async file I/O on linux. [#374]
|
||||
* Server no long listens to SIGHUP signal.
|
||||
It actually did not take any action when receiving SIGHUP, the only thing SIGHUP did was to stop
|
||||
the Server from receiving any future signal, because the `Signals` future stops on the first
|
||||
signal received [#389]
|
||||
|
||||
[#374]: https://github.com/actix/actix-net/pull/374
|
||||
[#349]: https://github.com/actix/actix-net/pull/349
|
||||
@@ -15,9 +24,9 @@
|
||||
|
||||
|
||||
## 2.0.0-beta.5 - 2021-04-20
|
||||
* Server shutdown would notify all workers to exit regardless if shutdown is graceful.
|
||||
This would make all worker shutdown immediately in force shutdown case. [#333]
|
||||
|
||||
* Server shutdown notifies all workers to exit regardless if shutdown is graceful. This causes all
|
||||
workers to shutdown immediately in force shutdown case. [#333]
|
||||
|
||||
[#333]: https://github.com/actix/actix-net/pull/333
|
||||
|
||||
|
||||
|
@@ -1,13 +1,13 @@
|
||||
[package]
|
||||
name = "actix-server"
|
||||
version = "2.0.0-beta.5"
|
||||
version = "2.0.0-beta.6"
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"fakeshadow <24548779@qq.com>",
|
||||
]
|
||||
description = "General purpose TCP server built for the Actix ecosystem"
|
||||
keywords = ["network", "framework", "async", "futures"]
|
||||
repository = "https://github.com/actix/actix-net"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
edition = "2018"
|
||||
@@ -29,13 +29,13 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc
|
||||
log = "0.4"
|
||||
mio = { version = "0.7.6", features = ["os-poll", "net"] }
|
||||
num_cpus = "1.13"
|
||||
tokio = { version = "1.2", features = ["sync"] }
|
||||
tokio = { version = "1.5.1", features = ["sync"] }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-codec = "0.4.0"
|
||||
actix-rt = "2.0.0"
|
||||
|
||||
bytes = "1"
|
||||
env_logger = "0.8"
|
||||
env_logger = "0.9"
|
||||
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
||||
tokio = { version = "1", features = ["io-util"] }
|
||||
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }
|
||||
|
33
actix-server/examples/startup-fail.rs
Normal file
33
actix-server/examples/startup-fail.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
use std::io;
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_server::Server;
|
||||
use actix_service::{fn_factory, fn_service};
|
||||
use log::info;
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info"))
|
||||
.init();
|
||||
|
||||
let addr = ("127.0.0.1", 8080);
|
||||
info!("starting server on port: {}", &addr.0);
|
||||
|
||||
Server::build()
|
||||
.bind(
|
||||
"startup-fail",
|
||||
addr,
|
||||
fn_factory(|| async move {
|
||||
if 1 > 2 {
|
||||
Ok(fn_service(move |mut _stream: TcpStream| async move {
|
||||
Ok::<u32, u32>(0)
|
||||
}))
|
||||
} else {
|
||||
Err(42)
|
||||
}
|
||||
}),
|
||||
)?
|
||||
.workers(2)
|
||||
.run()
|
||||
.await
|
||||
}
|
@@ -19,9 +19,8 @@ use std::{
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_server::Server;
|
||||
use actix_service::{fn_service, ServiceFactoryExt as _};
|
||||
use actix_service::{fn_factory, fn_service, ServiceExt as _};
|
||||
use bytes::BytesMut;
|
||||
use futures_util::future::ok;
|
||||
use log::{error, info};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
@@ -39,52 +38,65 @@ async fn main() -> io::Result<()> {
|
||||
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs
|
||||
// to return a service *factory*; so it can be created once per worker.
|
||||
Server::build()
|
||||
.bind("echo", addr, move || {
|
||||
let count = Arc::clone(&count);
|
||||
let num2 = Arc::clone(&count);
|
||||
|
||||
fn_service(move |mut stream: TcpStream| {
|
||||
.bind("echo", addr, {
|
||||
fn_factory::<_, (), _, _, _, _>(move || {
|
||||
let count = Arc::clone(&count);
|
||||
|
||||
async move {
|
||||
let num = count.fetch_add(1, Ordering::SeqCst);
|
||||
let num = num + 1;
|
||||
let count = Arc::clone(&count);
|
||||
let count2 = Arc::clone(&count);
|
||||
|
||||
let mut size = 0;
|
||||
let mut buf = BytesMut::new();
|
||||
let svc = fn_service(move |mut stream: TcpStream| {
|
||||
let count = Arc::clone(&count);
|
||||
|
||||
loop {
|
||||
match stream.read_buf(&mut buf).await {
|
||||
// end of stream; bail from loop
|
||||
Ok(0) => break,
|
||||
let num = count.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
|
||||
// more bytes to process
|
||||
Ok(bytes_read) => {
|
||||
info!("[{}] read {} bytes", num, bytes_read);
|
||||
stream.write_all(&buf[size..]).await.unwrap();
|
||||
size += bytes_read;
|
||||
info!(
|
||||
"[{}] accepting connection from: {}",
|
||||
num,
|
||||
stream.peer_addr().unwrap()
|
||||
);
|
||||
|
||||
async move {
|
||||
let mut size = 0;
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
loop {
|
||||
match stream.read_buf(&mut buf).await {
|
||||
// end of stream; bail from loop
|
||||
Ok(0) => break,
|
||||
|
||||
// more bytes to process
|
||||
Ok(bytes_read) => {
|
||||
info!("[{}] read {} bytes", num, bytes_read);
|
||||
stream.write_all(&buf[size..]).await.unwrap();
|
||||
size += bytes_read;
|
||||
}
|
||||
|
||||
// stream error; bail from loop with error
|
||||
Err(err) => {
|
||||
error!("Stream Error: {:?}", err);
|
||||
return Err(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// stream error; bail from loop with error
|
||||
Err(err) => {
|
||||
error!("Stream Error: {:?}", err);
|
||||
return Err(());
|
||||
}
|
||||
// send data down service pipeline
|
||||
Ok((buf.freeze(), size))
|
||||
}
|
||||
}
|
||||
})
|
||||
.map_err(|err| error!("Service Error: {:?}", err))
|
||||
.and_then(move |(_, size)| {
|
||||
let num = count2.load(Ordering::SeqCst);
|
||||
info!("[{}] total bytes read: {}", num, size);
|
||||
async move { Ok(size) }
|
||||
});
|
||||
|
||||
// send data down service pipeline
|
||||
Ok((buf.freeze(), size))
|
||||
Ok::<_, ()>(svc.clone())
|
||||
}
|
||||
})
|
||||
.map_err(|err| error!("Service Error: {:?}", err))
|
||||
.and_then(move |(_, size)| {
|
||||
let num = num2.load(Ordering::SeqCst);
|
||||
info!("[{}] total bytes read: {}", num, size);
|
||||
ok(size)
|
||||
})
|
||||
})?
|
||||
.workers(1)
|
||||
.workers(2)
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
@@ -5,10 +5,10 @@ use actix_rt::{
|
||||
time::{sleep, Instant},
|
||||
System,
|
||||
};
|
||||
use log::{error, info};
|
||||
use log::{debug, error, info};
|
||||
use mio::{Interest, Poll, Token as MioToken};
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::server::ServerHandle;
|
||||
use crate::socket::MioListener;
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
|
||||
use crate::worker::{Conn, WorkerHandleAccept};
|
||||
@@ -30,13 +30,13 @@ struct ServerSocketInfo {
|
||||
///
|
||||
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
|
||||
pub(crate) struct AcceptLoop {
|
||||
srv: Option<Server>,
|
||||
srv: Option<ServerHandle>,
|
||||
poll: Option<Poll>,
|
||||
waker: WakerQueue,
|
||||
}
|
||||
|
||||
impl AcceptLoop {
|
||||
pub fn new(srv: Server) -> Self {
|
||||
pub fn new(srv: ServerHandle) -> Self {
|
||||
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
|
||||
let waker = WakerQueue::new(poll.registry())
|
||||
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
|
||||
@@ -74,21 +74,16 @@ struct Accept {
|
||||
poll: Poll,
|
||||
waker: WakerQueue,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
srv: Server,
|
||||
srv: ServerHandle,
|
||||
next: usize,
|
||||
avail: Availability,
|
||||
paused: bool,
|
||||
}
|
||||
|
||||
/// Array of u128 with every bit as marker for a worker handle's availability.
|
||||
#[derive(Debug, Default)]
|
||||
struct Availability([u128; 4]);
|
||||
|
||||
impl Default for Availability {
|
||||
fn default() -> Self {
|
||||
Self([0; 4])
|
||||
}
|
||||
}
|
||||
|
||||
impl Availability {
|
||||
/// Check if any worker handle is available
|
||||
#[inline(always)]
|
||||
@@ -158,7 +153,7 @@ impl Accept {
|
||||
poll: Poll,
|
||||
waker: WakerQueue,
|
||||
socks: Vec<(usize, MioListener)>,
|
||||
srv: Server,
|
||||
srv: ServerHandle,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
) {
|
||||
// Accept runs in its own thread and would want to spawn additional futures to current
|
||||
@@ -181,7 +176,7 @@ impl Accept {
|
||||
waker: WakerQueue,
|
||||
socks: Vec<(usize, MioListener)>,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
srv: Server,
|
||||
srv: ServerHandle,
|
||||
) -> (Accept, Vec<ServerSocketInfo>) {
|
||||
let sockets = socks
|
||||
.into_iter()
|
||||
@@ -234,7 +229,7 @@ impl Accept {
|
||||
WAKER_TOKEN => {
|
||||
let exit = self.handle_waker(sockets);
|
||||
if exit {
|
||||
info!("Accept is stopped.");
|
||||
info!("Accept thread stopped");
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -370,14 +365,14 @@ impl Accept {
|
||||
|
||||
fn register_logged(&self, info: &mut ServerSocketInfo) {
|
||||
match self.register(info) {
|
||||
Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
|
||||
Ok(_) => debug!("Resume accepting connections on {}", info.lst.local_addr()),
|
||||
Err(e) => error!("Can not register server socket {}", e),
|
||||
}
|
||||
}
|
||||
|
||||
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
|
||||
match self.poll.registry().deregister(&mut info.lst) {
|
||||
Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
|
||||
Ok(_) => debug!("Paused accepting connections on {}", info.lst.local_addr()),
|
||||
Err(e) => {
|
||||
error!("Can not deregister server socket {}", e)
|
||||
}
|
||||
|
@@ -1,4 +1,5 @@
|
||||
use std::{
|
||||
fmt,
|
||||
future::Future,
|
||||
io, mem,
|
||||
pin::Pin,
|
||||
@@ -7,21 +8,25 @@ use std::{
|
||||
};
|
||||
|
||||
use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
|
||||
use actix_service::ServiceFactory;
|
||||
use log::{error, info};
|
||||
use tokio::sync::{
|
||||
mpsc::{unbounded_channel, UnboundedReceiver},
|
||||
oneshot,
|
||||
};
|
||||
|
||||
use crate::accept::AcceptLoop;
|
||||
use crate::join_all;
|
||||
use crate::server::{Server, ServerCommand};
|
||||
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
|
||||
use crate::signals::{Signal, Signals};
|
||||
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
|
||||
use crate::socket::{MioTcpListener, MioTcpSocket};
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue};
|
||||
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
|
||||
use crate::{
|
||||
accept::AcceptLoop,
|
||||
join_all,
|
||||
server::{ServerCommand, ServerHandle},
|
||||
service::{ServerServiceFactory, StreamNewService},
|
||||
signals::{Signal, Signals},
|
||||
socket::{
|
||||
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
|
||||
},
|
||||
waker_queue::{WakerInterest, WakerQueue},
|
||||
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer},
|
||||
};
|
||||
|
||||
/// Server builder
|
||||
pub struct ServerBuilder {
|
||||
@@ -29,13 +34,13 @@ pub struct ServerBuilder {
|
||||
token: usize,
|
||||
backlog: u32,
|
||||
handles: Vec<(usize, WorkerHandleServer)>,
|
||||
services: Vec<Box<dyn InternalServiceFactory>>,
|
||||
services: Vec<Box<dyn ServerServiceFactory>>,
|
||||
sockets: Vec<(usize, String, MioListener)>,
|
||||
accept: AcceptLoop,
|
||||
exit: bool,
|
||||
no_signals: bool,
|
||||
cmd: UnboundedReceiver<ServerCommand>,
|
||||
server: Server,
|
||||
server: ServerHandle,
|
||||
notify: Vec<oneshot::Sender<()>>,
|
||||
worker_config: ServerWorkerConfig,
|
||||
}
|
||||
@@ -50,7 +55,7 @@ impl ServerBuilder {
|
||||
/// Create new Server builder instance
|
||||
pub fn new() -> ServerBuilder {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
let server = Server::new(tx);
|
||||
let server = ServerHandle::new(tx);
|
||||
|
||||
ServerBuilder {
|
||||
threads: num_cpus::get(),
|
||||
@@ -114,15 +119,21 @@ impl ServerBuilder {
|
||||
|
||||
/// Sets the maximum per-worker number of concurrent connections.
|
||||
///
|
||||
/// All socket listeners will stop accepting connections when this limit is
|
||||
/// reached for each worker.
|
||||
/// All socket listeners will stop accepting connections when this limit is reached for
|
||||
/// each worker.
|
||||
///
|
||||
/// By default max connections is set to a 25k per worker.
|
||||
pub fn maxconn(mut self, num: usize) -> Self {
|
||||
/// By default max connections is set to a 25,600 per worker.
|
||||
pub fn max_concurrent_connections(mut self, num: usize) -> Self {
|
||||
self.worker_config.max_concurrent_connections(num);
|
||||
self
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
|
||||
pub fn maxconn(self, num: usize) -> Self {
|
||||
self.max_concurrent_connections(num)
|
||||
}
|
||||
|
||||
/// Stop Actix system.
|
||||
pub fn system_exit(mut self) -> Self {
|
||||
self.exit = true;
|
||||
@@ -147,91 +158,61 @@ impl ServerBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Add new service to the server.
|
||||
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
|
||||
/// Bind server to socket addresses.
|
||||
///
|
||||
/// Binds to all network interface addresses that resolve from the `addr` argument.
|
||||
/// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct
|
||||
/// interfaces at the same time by passing a list of socket addresses.
|
||||
///
|
||||
/// This fails only if all addresses fail to bind.
|
||||
pub fn bind<F, U, InitErr>(
|
||||
mut self,
|
||||
name: impl AsRef<str>,
|
||||
addr: U,
|
||||
factory: F,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<TcpStream>,
|
||||
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
|
||||
InitErr: fmt::Debug + Send + 'static,
|
||||
U: ToSocketAddrs,
|
||||
{
|
||||
let sockets = bind_addr(addr, self.backlog)?;
|
||||
|
||||
for lst in sockets {
|
||||
let token = self.next_token();
|
||||
|
||||
self.services.push(StreamNewService::create(
|
||||
name.as_ref().to_string(),
|
||||
token,
|
||||
factory.clone(),
|
||||
lst.local_addr()?,
|
||||
));
|
||||
|
||||
self.sockets
|
||||
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
|
||||
}
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Add new unix domain service to the server.
|
||||
#[cfg(unix)]
|
||||
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<actix_rt::net::UnixStream>,
|
||||
N: AsRef<str>,
|
||||
U: AsRef<std::path::Path>,
|
||||
{
|
||||
// The path must not exist when we try to bind.
|
||||
// Try to remove it to avoid bind error.
|
||||
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
|
||||
// NotFound is expected and not an issue. Anything else is.
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
let lst = crate::socket::StdUnixListener::bind(addr)?;
|
||||
self.listen_uds(name, lst, factory)
|
||||
}
|
||||
|
||||
/// Add new unix domain service to the server.
|
||||
/// Useful when running as a systemd service and
|
||||
/// a socket FD can be acquired using the systemd crate.
|
||||
#[cfg(unix)]
|
||||
pub fn listen_uds<F, N: AsRef<str>>(
|
||||
/// Bind server to existing TCP listener.
|
||||
///
|
||||
/// Useful when running as a systemd service and a socket FD can be passed to the process.
|
||||
pub fn listen<F, InitErr>(
|
||||
mut self,
|
||||
name: N,
|
||||
lst: crate::socket::StdUnixListener,
|
||||
factory: F,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<actix_rt::net::UnixStream>,
|
||||
{
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
lst.set_nonblocking(true)?;
|
||||
let token = self.next_token();
|
||||
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
|
||||
self.services.push(StreamNewService::create(
|
||||
name.as_ref().to_string(),
|
||||
token,
|
||||
factory,
|
||||
addr,
|
||||
));
|
||||
self.sockets
|
||||
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Add new service to the server.
|
||||
pub fn listen<F, N: AsRef<str>>(
|
||||
mut self,
|
||||
name: N,
|
||||
name: impl AsRef<str>,
|
||||
lst: StdTcpListener,
|
||||
factory: F,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<TcpStream>,
|
||||
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
|
||||
InitErr: fmt::Debug + Send + 'static,
|
||||
{
|
||||
lst.set_nonblocking(true)?;
|
||||
let addr = lst.local_addr()?;
|
||||
|
||||
let addr = lst.local_addr()?;
|
||||
let token = self.next_token();
|
||||
|
||||
self.services.push(StreamNewService::create(
|
||||
name.as_ref().to_string(),
|
||||
token,
|
||||
@@ -246,11 +227,18 @@ impl ServerBuilder {
|
||||
}
|
||||
|
||||
/// Starts processing incoming connections and return server controller.
|
||||
pub fn run(mut self) -> Server {
|
||||
pub fn run(mut self) -> ServerHandle {
|
||||
if self.sockets.is_empty() {
|
||||
panic!("Server should have at least one bound socket");
|
||||
} else {
|
||||
info!("Starting {} workers", self.threads);
|
||||
for (_, name, lst) in &self.sockets {
|
||||
info!(
|
||||
r#"Starting service: "{}", workers: {}, listening on: {}"#,
|
||||
name,
|
||||
self.threads,
|
||||
lst.local_addr()
|
||||
);
|
||||
}
|
||||
|
||||
// start workers
|
||||
let handles = (0..self.threads)
|
||||
@@ -264,9 +252,6 @@ impl ServerBuilder {
|
||||
.collect();
|
||||
|
||||
// start accept thread
|
||||
for sock in &self.sockets {
|
||||
info!("Starting \"{}\" service on {}", sock.1, sock.2);
|
||||
}
|
||||
self.accept.start(
|
||||
mem::take(&mut self.sockets)
|
||||
.into_iter()
|
||||
@@ -280,7 +265,7 @@ impl ServerBuilder {
|
||||
Signals::start(self.server.clone());
|
||||
}
|
||||
|
||||
// start http server actor
|
||||
// start http server
|
||||
let server = self.server.clone();
|
||||
rt::spawn(self);
|
||||
server
|
||||
@@ -312,23 +297,25 @@ impl ServerBuilder {
|
||||
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
|
||||
match sig {
|
||||
Signal::Int => {
|
||||
info!("SIGINT received, exiting");
|
||||
info!("SIGINT received; starting forced shutdown");
|
||||
self.exit = true;
|
||||
self.handle_cmd(ServerCommand::Stop {
|
||||
graceful: false,
|
||||
completion: None,
|
||||
})
|
||||
}
|
||||
|
||||
Signal::Term => {
|
||||
info!("SIGTERM received, stopping");
|
||||
info!("SIGTERM received; starting graceful shutdown");
|
||||
self.exit = true;
|
||||
self.handle_cmd(ServerCommand::Stop {
|
||||
graceful: true,
|
||||
completion: None,
|
||||
})
|
||||
}
|
||||
|
||||
Signal::Quit => {
|
||||
info!("SIGQUIT received, exiting");
|
||||
info!("SIGQUIT received; starting forced shutdown");
|
||||
self.exit = true;
|
||||
self.handle_cmd(ServerCommand::Stop {
|
||||
graceful: false,
|
||||
@@ -359,12 +346,14 @@ impl ServerBuilder {
|
||||
|
||||
rt::spawn(async move {
|
||||
if graceful {
|
||||
// wait for all workers to shut down
|
||||
let _ = join_all(stop).await;
|
||||
}
|
||||
|
||||
if let Some(tx) = completion {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
|
||||
for tx in notify {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
@@ -386,7 +375,7 @@ impl ServerBuilder {
|
||||
}
|
||||
|
||||
if found {
|
||||
error!("Worker has died {:?}, restarting", idx);
|
||||
error!("Worker {} has died; restarting", idx);
|
||||
|
||||
let mut new_idx = self.handles.len();
|
||||
'found: loop {
|
||||
@@ -415,6 +404,74 @@ impl ServerBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Unix Domain Socket (UDS) support.
|
||||
#[cfg(unix)]
|
||||
impl ServerBuilder {
|
||||
/// Add new unix domain service to the server.
|
||||
pub fn bind_uds<F, U, InitErr>(
|
||||
self,
|
||||
name: impl AsRef<str>,
|
||||
addr: U,
|
||||
factory: F,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
|
||||
+ Send
|
||||
+ Clone
|
||||
+ 'static,
|
||||
U: AsRef<std::path::Path>,
|
||||
InitErr: fmt::Debug + Send + 'static,
|
||||
{
|
||||
// The path must not exist when we try to bind.
|
||||
// Try to remove it to avoid bind error.
|
||||
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
|
||||
// NotFound is expected and not an issue. Anything else is.
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
let lst = crate::socket::StdUnixListener::bind(addr)?;
|
||||
self.listen_uds(name, lst, factory)
|
||||
}
|
||||
|
||||
/// Add new unix domain service to the server.
|
||||
///
|
||||
/// Useful when running as a systemd service and a socket FD can be passed to the process.
|
||||
pub fn listen_uds<F, InitErr>(
|
||||
mut self,
|
||||
name: impl AsRef<str>,
|
||||
lst: crate::socket::StdUnixListener,
|
||||
factory: F,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
|
||||
+ Send
|
||||
+ Clone
|
||||
+ 'static,
|
||||
InitErr: fmt::Debug + Send + 'static,
|
||||
{
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
|
||||
lst.set_nonblocking(true)?;
|
||||
|
||||
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
|
||||
let token = self.next_token();
|
||||
|
||||
self.services.push(StreamNewService::create(
|
||||
name.as_ref().to_string(),
|
||||
token,
|
||||
factory,
|
||||
addr,
|
||||
));
|
||||
|
||||
self.sockets
|
||||
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for ServerBuilder {
|
||||
type Output = ();
|
||||
|
||||
@@ -433,29 +490,28 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
|
||||
backlog: u32,
|
||||
) -> io::Result<Vec<MioTcpListener>> {
|
||||
let mut err = None;
|
||||
let mut succ = false;
|
||||
let mut success = false;
|
||||
let mut sockets = Vec::new();
|
||||
|
||||
for addr in addr.to_socket_addrs()? {
|
||||
match create_tcp_listener(addr, backlog) {
|
||||
Ok(lst) => {
|
||||
succ = true;
|
||||
success = true;
|
||||
sockets.push(lst);
|
||||
}
|
||||
Err(e) => err = Some(e),
|
||||
}
|
||||
}
|
||||
|
||||
if !succ {
|
||||
if let Some(e) = err.take() {
|
||||
Err(e)
|
||||
} else {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Can not bind to address.",
|
||||
))
|
||||
}
|
||||
} else {
|
||||
if success {
|
||||
Ok(sockets)
|
||||
} else if let Some(err) = err.take() {
|
||||
Err(err)
|
||||
} else {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"Can not bind to socket address",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -15,8 +15,7 @@ mod waker_queue;
|
||||
mod worker;
|
||||
|
||||
pub use self::builder::ServerBuilder;
|
||||
pub use self::server::Server;
|
||||
pub use self::service::ServiceFactory;
|
||||
pub use self::server::{Server, ServerHandle};
|
||||
pub use self::test_server::TestServer;
|
||||
|
||||
#[doc(hidden)]
|
||||
|
@@ -6,8 +6,18 @@ use std::task::{Context, Poll};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::builder::ServerBuilder;
|
||||
use crate::signals::Signal;
|
||||
use crate::{signals::Signal, ServerBuilder};
|
||||
|
||||
#[derive(Debug)]
|
||||
#[non_exhaustive]
|
||||
pub struct Server;
|
||||
|
||||
impl Server {
|
||||
/// Start server building process.
|
||||
pub fn build() -> ServerBuilder {
|
||||
ServerBuilder::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ServerCommand {
|
||||
@@ -15,8 +25,8 @@ pub(crate) enum ServerCommand {
|
||||
Pause(oneshot::Sender<()>),
|
||||
Resume(oneshot::Sender<()>),
|
||||
Signal(Signal),
|
||||
/// Whether to try and shut down gracefully
|
||||
Stop {
|
||||
/// True if shut down should be graceful.
|
||||
graceful: bool,
|
||||
completion: Option<oneshot::Sender<()>>,
|
||||
},
|
||||
@@ -24,20 +34,22 @@ pub(crate) enum ServerCommand {
|
||||
Notify(oneshot::Sender<()>),
|
||||
}
|
||||
|
||||
/// Server handle.
|
||||
///
|
||||
/// # Shutdown Signals
|
||||
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
|
||||
/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown.
|
||||
///
|
||||
/// A graceful shutdown will wait for all workers to stop first.
|
||||
#[derive(Debug)]
|
||||
pub struct Server(
|
||||
pub struct ServerHandle(
|
||||
UnboundedSender<ServerCommand>,
|
||||
Option<oneshot::Receiver<()>>,
|
||||
);
|
||||
|
||||
impl Server {
|
||||
impl ServerHandle {
|
||||
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
|
||||
Server(tx, None)
|
||||
}
|
||||
|
||||
/// Start server building process
|
||||
pub fn build() -> ServerBuilder {
|
||||
ServerBuilder::default()
|
||||
ServerHandle(tx, None)
|
||||
}
|
||||
|
||||
pub(crate) fn signal(&self, sig: Signal) {
|
||||
@@ -84,13 +96,13 @@ impl Server {
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Server {
|
||||
impl Clone for ServerHandle {
|
||||
fn clone(&self) -> Self {
|
||||
Self(self.0.clone(), None)
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Server {
|
||||
impl Future for ServerHandle {
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
|
@@ -1,26 +1,26 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::net::SocketAddr;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
fmt,
|
||||
marker::PhantomData,
|
||||
net::SocketAddr,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_service::{Service, ServiceFactory as BaseServiceFactory};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use actix_utils::future::{ready, Ready};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use log::error;
|
||||
|
||||
use crate::socket::{FromStream, MioStream};
|
||||
use crate::worker::WorkerCounterGuard;
|
||||
use crate::{
|
||||
socket::{FromStream, MioStream},
|
||||
worker::WorkerCounterGuard,
|
||||
};
|
||||
|
||||
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
|
||||
type Factory: BaseServiceFactory<Stream, Config = ()>;
|
||||
|
||||
fn create(&self) -> Self::Factory;
|
||||
}
|
||||
|
||||
pub(crate) trait InternalServiceFactory: Send {
|
||||
pub(crate) trait ServerServiceFactory: Send {
|
||||
fn name(&self, token: usize) -> &str;
|
||||
|
||||
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
|
||||
fn clone_factory(&self) -> Box<dyn ServerServiceFactory>;
|
||||
|
||||
/// Initialize Mio stream handler service and return it with its service token.
|
||||
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ where
|
||||
{
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = Ready<Result<(), ()>>;
|
||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.service.poll_ready(ctx).map_err(|_| ())
|
||||
@@ -72,25 +72,26 @@ where
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Can not convert to an async tcp stream: {}", e);
|
||||
Err(err) => {
|
||||
error!("Can not convert Mio stream to an async TCP stream: {}", err);
|
||||
Err(())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
|
||||
pub(crate) struct StreamNewService<F, Io, InitErr> {
|
||||
name: String,
|
||||
inner: F,
|
||||
token: usize,
|
||||
addr: SocketAddr,
|
||||
_t: PhantomData<Io>,
|
||||
_t: PhantomData<(Io, InitErr)>,
|
||||
}
|
||||
|
||||
impl<F, Io> StreamNewService<F, Io>
|
||||
impl<F, Io, InitErr> StreamNewService<F, Io, InitErr>
|
||||
where
|
||||
F: ServiceFactory<Io>,
|
||||
F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
|
||||
InitErr: fmt::Debug + Send + 'static,
|
||||
Io: FromStream + Send + 'static,
|
||||
{
|
||||
pub(crate) fn create(
|
||||
@@ -98,7 +99,7 @@ where
|
||||
token: usize,
|
||||
inner: F,
|
||||
addr: SocketAddr,
|
||||
) -> Box<dyn InternalServiceFactory> {
|
||||
) -> Box<dyn ServerServiceFactory> {
|
||||
Box::new(Self {
|
||||
name,
|
||||
token,
|
||||
@@ -109,16 +110,17 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
|
||||
impl<F, Io, InitErr> ServerServiceFactory for StreamNewService<F, Io, InitErr>
|
||||
where
|
||||
F: ServiceFactory<Io>,
|
||||
F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
|
||||
InitErr: fmt::Debug + Send + 'static,
|
||||
Io: FromStream + Send + 'static,
|
||||
{
|
||||
fn name(&self, _: usize) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
|
||||
fn clone_factory(&self) -> Box<dyn ServerServiceFactory> {
|
||||
Box::new(Self {
|
||||
name: self.name.clone(),
|
||||
inner: self.inner.clone(),
|
||||
@@ -130,28 +132,18 @@ where
|
||||
|
||||
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
|
||||
let token = self.token;
|
||||
let fut = self.inner.create().new_service(());
|
||||
let fut = self.inner.new_service(());
|
||||
Box::pin(async move {
|
||||
match fut.await {
|
||||
Ok(inner) => {
|
||||
let service = Box::new(StreamService::new(inner)) as _;
|
||||
Ok(svc) => {
|
||||
let service = Box::new(StreamService::new(svc)) as _;
|
||||
Ok((token, service))
|
||||
}
|
||||
Err(_) => Err(()),
|
||||
Err(err) => {
|
||||
error!("{:?}", err);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, T, I> ServiceFactory<I> for F
|
||||
where
|
||||
F: Fn() -> T + Send + Clone + 'static,
|
||||
T: BaseServiceFactory<I, Config = ()>,
|
||||
I: FromStream,
|
||||
{
|
||||
type Factory = T;
|
||||
|
||||
fn create(&self) -> T {
|
||||
(self)()
|
||||
}
|
||||
}
|
||||
|
@@ -2,30 +2,36 @@ use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::server::ServerHandle;
|
||||
|
||||
/// Different types of process signals
|
||||
/// Types of process signals.
|
||||
#[allow(dead_code)]
|
||||
#[derive(PartialEq, Clone, Copy, Debug)]
|
||||
pub(crate) enum Signal {
|
||||
/// SIGINT
|
||||
/// `SIGINT`
|
||||
Int,
|
||||
/// SIGTERM
|
||||
|
||||
/// `SIGTERM`
|
||||
Term,
|
||||
/// SIGQUIT
|
||||
|
||||
/// `SIGQUIT`
|
||||
Quit,
|
||||
}
|
||||
|
||||
/// Process signal listener.
|
||||
pub(crate) struct Signals {
|
||||
srv: Server,
|
||||
srv: ServerHandle,
|
||||
|
||||
#[cfg(not(unix))]
|
||||
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
|
||||
|
||||
#[cfg(unix)]
|
||||
signals: Vec<(Signal, actix_rt::signal::unix::Signal)>,
|
||||
}
|
||||
|
||||
impl Signals {
|
||||
pub(crate) fn start(srv: Server) {
|
||||
/// Spawns a signal listening future that is able to send commands to the `Server`.
|
||||
pub(crate) fn start(srv: ServerHandle) {
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
actix_rt::spawn(Signals {
|
||||
@@ -33,6 +39,7 @@ impl Signals {
|
||||
signals: Box::pin(actix_rt::signal::ctrl_c()),
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use actix_rt::signal::unix;
|
||||
@@ -76,6 +83,7 @@ impl Future for Signals {
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
for (sig, fut) in self.signals.iter_mut() {
|
||||
|
@@ -1,25 +1,24 @@
|
||||
use std::sync::mpsc;
|
||||
use std::{net, thread};
|
||||
use std::{fmt, net, sync::mpsc, thread};
|
||||
|
||||
use actix_rt::{net::TcpStream, System};
|
||||
use actix_service::ServiceFactory;
|
||||
|
||||
use crate::{Server, ServerBuilder, ServiceFactory};
|
||||
use crate::{Server, ServerBuilder};
|
||||
|
||||
/// The `TestServer` type.
|
||||
/// A testing server.
|
||||
///
|
||||
/// `TestServer` is very simple test server that simplify process of writing
|
||||
/// integration tests for actix-net applications.
|
||||
/// `TestServer` is very simple test server that simplify process of writing integration tests for
|
||||
/// network applications.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use actix_service::fn_service;
|
||||
/// use actix_server::TestServer;
|
||||
/// use actix_service::fn_service;
|
||||
///
|
||||
/// #[actix_rt::main]
|
||||
/// async fn main() {
|
||||
/// let srv = TestServer::with(|| fn_service(
|
||||
/// |sock| async move {
|
||||
/// let srv = TestServer::with(fn_service(|sock|
|
||||
/// async move {
|
||||
/// println!("New connection: {:?}", sock);
|
||||
/// Ok::<_, ()>(())
|
||||
/// }
|
||||
@@ -28,9 +27,10 @@ use crate::{Server, ServerBuilder, ServiceFactory};
|
||||
/// println!("SOCKET: {:?}", srv.connect());
|
||||
/// }
|
||||
/// ```
|
||||
#[non_exhaustive]
|
||||
pub struct TestServer;
|
||||
|
||||
/// Test server runtime
|
||||
/// Test server runtime.
|
||||
pub struct TestServerRuntime {
|
||||
addr: net::SocketAddr,
|
||||
host: String,
|
||||
@@ -39,7 +39,7 @@ pub struct TestServerRuntime {
|
||||
}
|
||||
|
||||
impl TestServer {
|
||||
/// Start new server with server builder
|
||||
/// Start new server using server builder.
|
||||
pub fn start<F>(mut factory: F) -> TestServerRuntime
|
||||
where
|
||||
F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static,
|
||||
@@ -64,8 +64,12 @@ impl TestServer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Start new test server with application factory
|
||||
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
|
||||
/// Start new test server with default settings using application factory.
|
||||
pub fn with<F, InitErr>(factory: F) -> TestServerRuntime
|
||||
where
|
||||
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
|
||||
InitErr: fmt::Debug + Send + 'static,
|
||||
{
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
// run server in separate thread
|
||||
@@ -99,7 +103,7 @@ impl TestServer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get first available unused local address
|
||||
/// Get first available unused local address.
|
||||
pub fn unused_addr() -> net::SocketAddr {
|
||||
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
let socket = mio::net::TcpSocket::new_v4().unwrap();
|
||||
@@ -111,27 +115,27 @@ impl TestServer {
|
||||
}
|
||||
|
||||
impl TestServerRuntime {
|
||||
/// Test server host
|
||||
/// Test server host.
|
||||
pub fn host(&self) -> &str {
|
||||
&self.host
|
||||
}
|
||||
|
||||
/// Test server port
|
||||
/// Test server port.
|
||||
pub fn port(&self) -> u16 {
|
||||
self.port
|
||||
}
|
||||
|
||||
/// Get test server address
|
||||
/// Get test server address.
|
||||
pub fn addr(&self) -> net::SocketAddr {
|
||||
self.addr
|
||||
}
|
||||
|
||||
/// Stop http server
|
||||
/// Stop server.
|
||||
fn stop(&mut self) {
|
||||
self.system.stop();
|
||||
}
|
||||
|
||||
/// Connect to server, return tokio TcpStream
|
||||
/// Connect to server, returning a Tokio `TcpStream`.
|
||||
pub fn connect(&self) -> std::io::Result<TcpStream> {
|
||||
TcpStream::from_std(net::TcpStream::connect(self.addr)?)
|
||||
}
|
||||
|
@@ -24,10 +24,12 @@ use tokio::sync::{
|
||||
};
|
||||
|
||||
use crate::join_all;
|
||||
use crate::service::{BoxedServerService, InternalServiceFactory};
|
||||
use crate::service::{BoxedServerService, ServerServiceFactory};
|
||||
use crate::socket::MioStream;
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue};
|
||||
|
||||
const DEFAULT_SHUTDOWN_DURATION: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Stop worker message. Returns `true` on successful graceful shutdown.
|
||||
/// and `false` if some connections still alive when shutdown execute.
|
||||
pub(crate) struct Stop {
|
||||
@@ -181,6 +183,7 @@ impl WorkerHandleAccept {
|
||||
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct WorkerHandleServer {
|
||||
#[allow(dead_code)]
|
||||
idx: usize,
|
||||
tx: UnboundedSender<Stop>,
|
||||
}
|
||||
@@ -195,7 +198,7 @@ impl WorkerHandleServer {
|
||||
|
||||
/// Service worker.
|
||||
///
|
||||
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
|
||||
/// Worker accepts socket objects via unbounded channel and starts stream processing.
|
||||
pub(crate) struct ServerWorker {
|
||||
// UnboundedReceiver<Conn> should always be the first field.
|
||||
// It must be dropped as soon as ServerWorker dropping.
|
||||
@@ -203,7 +206,7 @@ pub(crate) struct ServerWorker {
|
||||
rx2: UnboundedReceiver<Stop>,
|
||||
counter: WorkerCounter,
|
||||
services: Box<[WorkerService]>,
|
||||
factories: Box<[Box<dyn InternalServiceFactory>]>,
|
||||
factories: Box<[Box<dyn ServerServiceFactory>]>,
|
||||
state: WorkerState,
|
||||
shutdown_timeout: Duration,
|
||||
}
|
||||
@@ -243,10 +246,11 @@ impl Default for ServerWorkerConfig {
|
||||
fn default() -> Self {
|
||||
// 512 is the default max blocking thread count of tokio runtime.
|
||||
let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
|
||||
|
||||
Self {
|
||||
shutdown_timeout: Duration::from_secs(30),
|
||||
shutdown_timeout: DEFAULT_SHUTDOWN_DURATION,
|
||||
max_blocking_threads,
|
||||
max_concurrent_connections: 25600,
|
||||
max_concurrent_connections: 25_600,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -268,7 +272,7 @@ impl ServerWorkerConfig {
|
||||
impl ServerWorker {
|
||||
pub(crate) fn start(
|
||||
idx: usize,
|
||||
factories: Vec<Box<dyn InternalServiceFactory>>,
|
||||
factories: Vec<Box<dyn ServerServiceFactory>>,
|
||||
waker_queue: WakerQueue,
|
||||
config: ServerWorkerConfig,
|
||||
) -> (WorkerHandleAccept, WorkerHandleServer) {
|
||||
@@ -313,6 +317,7 @@ impl ServerWorker {
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>, _>>();
|
||||
|
||||
let services = match res {
|
||||
Ok(res) => res
|
||||
.into_iter()
|
||||
@@ -326,8 +331,9 @@ impl ServerWorker {
|
||||
services
|
||||
})
|
||||
.into_boxed_slice(),
|
||||
Err(e) => {
|
||||
error!("Can not start worker: {:?}", e);
|
||||
|
||||
Err(err) => {
|
||||
error!("Can not start worker: {:?}", err);
|
||||
Arbiter::current().stop();
|
||||
return;
|
||||
}
|
||||
@@ -429,13 +435,15 @@ struct Restart {
|
||||
fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
|
||||
}
|
||||
|
||||
// Shutdown keep states necessary for server shutdown:
|
||||
// Sleep for interval check the shutdown progress.
|
||||
// Instant for the start time of shutdown.
|
||||
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
|
||||
/// State necessary for server shutdown.
|
||||
struct Shutdown {
|
||||
// Interval for checking the shutdown progress.
|
||||
timer: Pin<Box<Sleep>>,
|
||||
|
||||
/// Start time of shutdown.
|
||||
start_from: Instant,
|
||||
|
||||
/// Notify of the shutdown outcome (force/grace) to stop caller.
|
||||
tx: oneshot::Sender<bool>,
|
||||
}
|
||||
|
||||
@@ -463,11 +471,11 @@ impl Future for ServerWorker {
|
||||
{
|
||||
let num = this.counter.total();
|
||||
if num == 0 {
|
||||
info!("Shutting down worker, 0 connections");
|
||||
info!("Shutting down idle worker");
|
||||
let _ = tx.send(true);
|
||||
return Poll::Ready(());
|
||||
} else if graceful {
|
||||
info!("Graceful worker shutdown, {} connections", num);
|
||||
info!("Graceful worker shutdown; finishing {} connections", num);
|
||||
this.shutdown(false);
|
||||
|
||||
this.state = WorkerState::Shutdown(Shutdown {
|
||||
@@ -476,7 +484,7 @@ impl Future for ServerWorker {
|
||||
tx,
|
||||
});
|
||||
} else {
|
||||
info!("Force shutdown worker, {} connections", num);
|
||||
info!("Force shutdown worker, closing {} connections", num);
|
||||
this.shutdown(true);
|
||||
|
||||
let _ = tx.send(false);
|
||||
@@ -521,23 +529,25 @@ impl Future for ServerWorker {
|
||||
self.poll(cx)
|
||||
}
|
||||
WorkerState::Shutdown(ref mut shutdown) => {
|
||||
// Wait for 1 second.
|
||||
// wait for 1 second
|
||||
ready!(shutdown.timer.as_mut().poll(cx));
|
||||
|
||||
if this.counter.total() == 0 {
|
||||
// Graceful shutdown.
|
||||
// graceful shutdown
|
||||
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
|
||||
let _ = shutdown.tx.send(true);
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
} else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
|
||||
// Timeout forceful shutdown.
|
||||
// timeout forceful shutdown
|
||||
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
|
||||
let _ = shutdown.tx.send(false);
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
// Reset timer and wait for 1 second.
|
||||
// reset timer and wait for 1 second
|
||||
let time = Instant::now() + Duration::from_secs(1);
|
||||
shutdown.timer.as_mut().reset(time);
|
||||
shutdown.timer.as_mut().poll(cx)
|
||||
|
@@ -5,8 +5,6 @@ use std::{net, thread, time::Duration};
|
||||
use actix_rt::{net::TcpStream, time::sleep};
|
||||
use actix_server::Server;
|
||||
use actix_service::fn_service;
|
||||
use actix_utils::future::ok;
|
||||
use futures_util::future::lazy;
|
||||
|
||||
fn unused_addr() -> net::SocketAddr {
|
||||
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
@@ -23,25 +21,26 @@ fn test_bind() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let h = thread::spawn(move || {
|
||||
let sys = actix_rt::System::new();
|
||||
let srv = sys.block_on(lazy(|_| {
|
||||
Server::build()
|
||||
actix_rt::System::new().block_on(async {
|
||||
let srv = Server::build()
|
||||
.workers(1)
|
||||
.disable_signals()
|
||||
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
|
||||
.unwrap()
|
||||
.run()
|
||||
}));
|
||||
.bind("test", addr, fn_service(|_| async { Ok::<_, ()>(()) }))?
|
||||
.run();
|
||||
|
||||
let _ = tx.send((srv, actix_rt::System::current()));
|
||||
let _ = sys.run();
|
||||
let _ = tx.send((srv.clone(), actix_rt::System::current()));
|
||||
|
||||
srv.await
|
||||
})
|
||||
});
|
||||
let (_, sys) = rx.recv().unwrap();
|
||||
let (srv, sys) = rx.recv().unwrap();
|
||||
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
assert!(net::TcpStream::connect(addr).is_ok());
|
||||
|
||||
let _ = srv.stop(true);
|
||||
sys.stop();
|
||||
let _ = h.join();
|
||||
h.join().unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -50,25 +49,28 @@ fn test_listen() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let h = thread::spawn(move || {
|
||||
let sys = actix_rt::System::new();
|
||||
let lst = net::TcpListener::bind(addr).unwrap();
|
||||
sys.block_on(async {
|
||||
Server::build()
|
||||
let lst = net::TcpListener::bind(addr)?;
|
||||
actix_rt::System::new().block_on(async {
|
||||
let srv = Server::build()
|
||||
.disable_signals()
|
||||
.workers(1)
|
||||
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
|
||||
.unwrap()
|
||||
.listen("test", lst, fn_service(|_| async { Ok::<_, ()>(()) }))?
|
||||
.run();
|
||||
let _ = tx.send(actix_rt::System::current());
|
||||
});
|
||||
let _ = sys.run();
|
||||
|
||||
let _ = tx.send((srv.clone(), actix_rt::System::current()));
|
||||
|
||||
srv.await
|
||||
})
|
||||
});
|
||||
let sys = rx.recv().unwrap();
|
||||
|
||||
let (srv, sys) = rx.recv().unwrap();
|
||||
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
assert!(net::TcpStream::connect(addr).is_ok());
|
||||
|
||||
let _ = srv.stop(true);
|
||||
sys.stop();
|
||||
let _ = h.join();
|
||||
h.join().unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -84,24 +86,25 @@ fn test_start() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let h = thread::spawn(move || {
|
||||
let sys = actix_rt::System::new();
|
||||
let srv = sys.block_on(lazy(|_| {
|
||||
Server::build()
|
||||
actix_rt::System::new().block_on(async {
|
||||
let srv = Server::build()
|
||||
.backlog(100)
|
||||
.disable_signals()
|
||||
.bind("test", addr, move || {
|
||||
.bind(
|
||||
"test",
|
||||
addr,
|
||||
fn_service(|io: TcpStream| async move {
|
||||
let mut f = Framed::new(io, BytesCodec);
|
||||
f.send(Bytes::from_static(b"test")).await.unwrap();
|
||||
Ok::<_, ()>(())
|
||||
})
|
||||
})
|
||||
.unwrap()
|
||||
.run()
|
||||
}));
|
||||
}),
|
||||
)?
|
||||
.run();
|
||||
|
||||
let _ = tx.send((srv, actix_rt::System::current()));
|
||||
let _ = sys.run();
|
||||
let _ = tx.send((srv.clone(), actix_rt::System::current()));
|
||||
|
||||
srv.await
|
||||
})
|
||||
});
|
||||
|
||||
let (srv, sys) = rx.recv().unwrap();
|
||||
@@ -134,12 +137,11 @@ fn test_start() {
|
||||
|
||||
// stop
|
||||
let _ = srv.stop(false);
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
assert!(net::TcpStream::connect(addr).is_err());
|
||||
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
sys.stop();
|
||||
let _ = h.join();
|
||||
h.join().unwrap().unwrap();
|
||||
|
||||
thread::sleep(Duration::from_secs(1));
|
||||
assert!(net::TcpStream::connect(addr).is_err());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
@@ -166,10 +168,10 @@ async fn test_max_concurrent_connections() {
|
||||
// Set a relative higher backlog.
|
||||
.backlog(12)
|
||||
// max connection for a worker is 3.
|
||||
.maxconn(max_conn)
|
||||
.max_concurrent_connections(max_conn)
|
||||
.workers(1)
|
||||
.disable_signals()
|
||||
.bind("test", addr, move || {
|
||||
.bind("test", addr, {
|
||||
let counter = counter.clone();
|
||||
fn_service(move |_io: TcpStream| {
|
||||
let counter = counter.clone();
|
||||
@@ -209,9 +211,8 @@ async fn test_max_concurrent_connections() {
|
||||
}
|
||||
|
||||
srv.stop(false).await;
|
||||
|
||||
sys.stop();
|
||||
let _ = h.join().unwrap();
|
||||
h.join().unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
@@ -260,22 +261,20 @@ async fn test_service_restart() {
|
||||
let server = Server::build()
|
||||
.backlog(1)
|
||||
.disable_signals()
|
||||
.bind("addr1", addr1, move || {
|
||||
.bind("addr1", addr1, {
|
||||
let num = num.clone();
|
||||
fn_factory(move || {
|
||||
let num = num.clone();
|
||||
async move { Ok::<_, ()>(TestService(num)) }
|
||||
})
|
||||
})
|
||||
.unwrap()
|
||||
.bind("addr2", addr2, move || {
|
||||
})?
|
||||
.bind("addr2", addr2, {
|
||||
let num2 = num2.clone();
|
||||
fn_factory(move || {
|
||||
let num2 = num2.clone();
|
||||
async move { Ok::<_, ()>(TestService(num2)) }
|
||||
})
|
||||
})
|
||||
.unwrap()
|
||||
})?
|
||||
.workers(1)
|
||||
.run();
|
||||
|
||||
@@ -306,9 +305,9 @@ async fn test_service_restart() {
|
||||
assert!(num_clone.load(Ordering::SeqCst) > 5);
|
||||
assert!(num2_clone.load(Ordering::SeqCst) > 5);
|
||||
|
||||
sys.stop();
|
||||
let _ = server.stop(false);
|
||||
let _ = h.join().unwrap();
|
||||
sys.stop();
|
||||
h.join().unwrap().unwrap();
|
||||
}
|
||||
|
||||
#[ignore]
|
||||
@@ -318,6 +317,7 @@ async fn worker_restart() {
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct TestServiceFactory(Arc<AtomicUsize>);
|
||||
|
||||
impl ServiceFactory<TcpStream> for TestServiceFactory {
|
||||
@@ -380,12 +380,12 @@ async fn worker_restart() {
|
||||
actix_rt::System::new().block_on(async {
|
||||
let server = Server::build()
|
||||
.disable_signals()
|
||||
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
|
||||
.unwrap()
|
||||
.bind("addr", addr, TestServiceFactory(counter.clone()))?
|
||||
.workers(2)
|
||||
.run();
|
||||
|
||||
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
||||
|
||||
server.await
|
||||
})
|
||||
});
|
||||
@@ -447,7 +447,7 @@ async fn worker_restart() {
|
||||
assert_eq!("3", id);
|
||||
stream.shutdown().await.unwrap();
|
||||
|
||||
sys.stop();
|
||||
let _ = server.stop(false);
|
||||
let _ = h.join().unwrap();
|
||||
sys.stop();
|
||||
h.join().unwrap().unwrap();
|
||||
}
|
||||
|
@@ -1,6 +1,9 @@
|
||||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* `fn_factory[_with_config]` types now impl `Send` even when config, service, request types do not. [#403]
|
||||
|
||||
[#403]: https://github.com/actix/actix-net/pull/403
|
||||
|
||||
|
||||
## 2.0.1 - 2021-10-11
|
||||
|
33
actix-service/examples/clone.rs
Normal file
33
actix-service/examples/clone.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
use std::{future::Future, sync::mpsc, time::Duration};
|
||||
|
||||
async fn oracle<F, Fut>(f: F) -> (u32, u32)
|
||||
where
|
||||
F: FnOnce() -> Fut + Clone + Send + 'static,
|
||||
Fut: Future<Output = u32> + 'static,
|
||||
{
|
||||
let f1 = actix_rt::spawn(f.clone()());
|
||||
let f2 = actix_rt::spawn(f());
|
||||
|
||||
(f1.await.unwrap(), f2.await.unwrap())
|
||||
}
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
|
||||
let (r1, r2) = oracle({
|
||||
let tx = tx.clone();
|
||||
|
||||
|| async move {
|
||||
tx.send(()).unwrap();
|
||||
4 * 4
|
||||
}
|
||||
})
|
||||
.await;
|
||||
assert_eq!(r1, r2);
|
||||
|
||||
tx.send(()).unwrap();
|
||||
|
||||
rx.recv_timeout(Duration::from_millis(100)).unwrap();
|
||||
rx.recv_timeout(Duration::from_millis(100)).unwrap();
|
||||
}
|
@@ -14,7 +14,7 @@ use super::{Service, ServiceFactory};
|
||||
/// Service for the `and_then` combinator, chaining a computation onto the end of another service
|
||||
/// which completes successfully.
|
||||
///
|
||||
/// This is created by the `Pipeline::and_then` method.
|
||||
/// Created by the `.and_then()` combinator.
|
||||
pub struct AndThenService<A, B, Req>(Rc<(A, B)>, PhantomData<Req>);
|
||||
|
||||
impl<A, B, Req> AndThenService<A, B, Req> {
|
||||
@@ -116,7 +116,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// `.and_then()` service factory combinator
|
||||
/// Service factory created by the `.and_then()` combinator.
|
||||
pub struct AndThenServiceFactory<A, B, Req>
|
||||
where
|
||||
A: ServiceFactory<Req>,
|
||||
|
@@ -63,7 +63,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert `Fn(Config, &Server) -> Future<Service>` fn to NewService\
|
||||
/// Convert `Fn(Config, &Server) -> Future<Service>` fn to ServiceFactory.
|
||||
struct ApplyConfigService<S1, Req, F, Cfg, Fut, S2, Err>
|
||||
where
|
||||
S1: Service<Req>,
|
||||
|
@@ -44,7 +44,7 @@ pub trait ServiceExt<Req>: Service<Req> {
|
||||
/// Call another service after call to this one has resolved successfully.
|
||||
///
|
||||
/// This function can be used to chain two services together and ensure that the second service
|
||||
/// isn't called until call to the fist service have finished. Result of the call to the first
|
||||
/// isn't called until call to the fist service has resolved. Result of the call to the first
|
||||
/// service is used as an input parameter for the second service's call.
|
||||
///
|
||||
/// Note that this function consumes the receiving service and returns a wrapped version of it.
|
||||
|
@@ -3,6 +3,7 @@ use core::{future::Future, marker::PhantomData};
|
||||
use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory};
|
||||
|
||||
/// Create `ServiceFactory` for function that can act as a `Service`
|
||||
// TODO: remove unnecessary Cfg type param
|
||||
pub fn fn_service<F, Fut, Req, Res, Err, Cfg>(
|
||||
f: F,
|
||||
) -> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
||||
@@ -48,13 +49,14 @@ where
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
// TODO: remove unnecessary Cfg type param
|
||||
pub fn fn_factory<F, Cfg, Srv, Req, Fut, Err>(
|
||||
f: F,
|
||||
) -> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
|
||||
where
|
||||
Srv: Service<Req>,
|
||||
F: Fn() -> Fut,
|
||||
Fut: Future<Output = Result<Srv, Err>>,
|
||||
Srv: Service<Req>,
|
||||
{
|
||||
FnServiceNoConfig::new(f)
|
||||
}
|
||||
@@ -160,7 +162,7 @@ where
|
||||
Fut: Future<Output = Result<Res, Err>>,
|
||||
{
|
||||
f: F,
|
||||
_t: PhantomData<(Req, Cfg)>,
|
||||
_t: PhantomData<fn(Cfg, Req)>,
|
||||
}
|
||||
|
||||
impl<F, Fut, Req, Res, Err, Cfg> FnServiceFactory<F, Fut, Req, Res, Err, Cfg>
|
||||
@@ -237,7 +239,7 @@ where
|
||||
Srv: Service<Req>,
|
||||
{
|
||||
f: F,
|
||||
_t: PhantomData<(Fut, Cfg, Req, Srv, Err)>,
|
||||
_t: PhantomData<fn(Cfg, Req)>,
|
||||
}
|
||||
|
||||
impl<F, Fut, Cfg, Srv, Req, Err> FnServiceConfig<F, Fut, Cfg, Srv, Req, Err>
|
||||
@@ -293,7 +295,7 @@ where
|
||||
Fut: Future<Output = Result<Srv, Err>>,
|
||||
{
|
||||
f: F,
|
||||
_t: PhantomData<(Cfg, Req)>,
|
||||
_t: PhantomData<fn(Cfg, Req)>,
|
||||
}
|
||||
|
||||
impl<F, Cfg, Srv, Req, Fut, Err> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
|
||||
@@ -353,10 +355,11 @@ where
|
||||
mod tests {
|
||||
use core::task::Poll;
|
||||
|
||||
use alloc::rc::Rc;
|
||||
use futures_util::future::lazy;
|
||||
|
||||
use super::*;
|
||||
use crate::{ok, Service, ServiceFactory};
|
||||
use crate::{boxed, ok, Service, ServiceExt, ServiceFactory, ServiceFactoryExt};
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_fn_service() {
|
||||
@@ -391,4 +394,142 @@ mod tests {
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(res.unwrap(), ("srv", 1));
|
||||
}
|
||||
|
||||
// these three properties of a service factory are usually important
|
||||
fn is_static<T: 'static>(_t: &T) {}
|
||||
fn impls_clone<T: Clone>(_t: &T) {}
|
||||
fn impls_send<T: Send>(_t: &T) {}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_fn_factory_impl_send() {
|
||||
let svc_fac = fn_factory_with_config(|cfg: usize| {
|
||||
ok::<_, ()>(fn_service(move |()| ok::<_, ()>(("srv", cfg))))
|
||||
});
|
||||
is_static(&svc_fac);
|
||||
impls_clone(&svc_fac);
|
||||
impls_send(&svc_fac);
|
||||
|
||||
// Cfg type is explicitly !Send
|
||||
let svc_fac = fn_factory_with_config(|cfg: Rc<usize>| {
|
||||
let cfg = Rc::clone(&cfg);
|
||||
ok::<_, ()>(fn_service(move |_: ()| ok::<_, ()>(("srv", *cfg))))
|
||||
});
|
||||
is_static(&svc_fac);
|
||||
impls_clone(&svc_fac);
|
||||
impls_send(&svc_fac);
|
||||
|
||||
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
|
||||
ok::<_, ()>(fn_service(move |()| ok::<_, ()>("srv")))
|
||||
});
|
||||
is_static(&svc_fac);
|
||||
impls_clone(&svc_fac);
|
||||
impls_send(&svc_fac);
|
||||
|
||||
// Req type is explicitly !Send
|
||||
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
|
||||
ok::<_, ()>(fn_service(move |_: Rc<()>| ok::<_, ()>("srv")))
|
||||
});
|
||||
is_static(&svc_fac);
|
||||
impls_clone(&svc_fac);
|
||||
impls_send(&svc_fac);
|
||||
|
||||
// Service type is explicitly !Send
|
||||
let svc_fac = fn_factory::<_, (), _, _, _, _>(|| {
|
||||
ok::<_, ()>(boxed::rc_service(fn_service(move |_: ()| {
|
||||
ok::<_, ()>("srv")
|
||||
})))
|
||||
});
|
||||
is_static(&svc_fac);
|
||||
impls_clone(&svc_fac);
|
||||
impls_send(&svc_fac);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_service_combinators_impls() {
|
||||
#[derive(Clone)]
|
||||
struct Ident;
|
||||
|
||||
impl<T: 'static> Service<T> for Ident {
|
||||
type Response = T;
|
||||
type Error = ();
|
||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
crate::always_ready!();
|
||||
|
||||
fn call(&self, req: T) -> Self::Future {
|
||||
ok(req)
|
||||
}
|
||||
}
|
||||
|
||||
let svc = Ident;
|
||||
is_static(&svc);
|
||||
impls_clone(&svc);
|
||||
impls_send(&svc);
|
||||
|
||||
let svc = ServiceExt::map(Ident, core::convert::identity);
|
||||
impls_send(&svc);
|
||||
svc.call(()).await.unwrap();
|
||||
|
||||
let svc = ServiceExt::map_err(Ident, core::convert::identity);
|
||||
impls_send(&svc);
|
||||
svc.call(()).await.unwrap();
|
||||
|
||||
let svc = ServiceExt::and_then(Ident, Ident);
|
||||
// impls_send(&svc); // fails to compile :(
|
||||
svc.call(()).await.unwrap();
|
||||
|
||||
// let svc = ServiceExt::and_then_send(Ident, Ident);
|
||||
// impls_send(&svc);
|
||||
// svc.call(()).await.unwrap();
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_factory_combinators_impls() {
|
||||
#[derive(Clone)]
|
||||
struct Ident;
|
||||
|
||||
impl<T: 'static> ServiceFactory<T> for Ident {
|
||||
type Response = T;
|
||||
type Error = ();
|
||||
type Config = ();
|
||||
// explicitly !Send result service
|
||||
type Service = boxed::RcService<T, Self::Response, Self::Error>;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::Error>>;
|
||||
|
||||
fn new_service(&self, _cfg: Self::Config) -> Self::Future {
|
||||
ok(boxed::rc_service(fn_service(ok)))
|
||||
}
|
||||
}
|
||||
|
||||
let svc_fac = Ident;
|
||||
is_static(&svc_fac);
|
||||
impls_clone(&svc_fac);
|
||||
impls_send(&svc_fac);
|
||||
|
||||
let svc_fac = ServiceFactoryExt::map(Ident, core::convert::identity);
|
||||
impls_send(&svc_fac);
|
||||
let svc = svc_fac.new_service(()).await.unwrap();
|
||||
svc.call(()).await.unwrap();
|
||||
|
||||
let svc_fac = ServiceFactoryExt::map_err(Ident, core::convert::identity);
|
||||
impls_send(&svc_fac);
|
||||
let svc = svc_fac.new_service(()).await.unwrap();
|
||||
svc.call(()).await.unwrap();
|
||||
|
||||
let svc_fac = ServiceFactoryExt::map_init_err(Ident, core::convert::identity);
|
||||
impls_send(&svc_fac);
|
||||
let svc = svc_fac.new_service(()).await.unwrap();
|
||||
svc.call(()).await.unwrap();
|
||||
|
||||
let svc_fac = ServiceFactoryExt::and_then(Ident, Ident);
|
||||
// impls_send(&svc_fac); // fails to compile :(
|
||||
let svc = svc_fac.new_service(()).await.unwrap();
|
||||
svc.call(()).await.unwrap();
|
||||
|
||||
// let svc_fac = ServiceFactoryExt::and_then_send(Ident, Ident);
|
||||
// impls_send(&svc_fac);
|
||||
// let svc = svc_fac.new_service(()).await.unwrap();
|
||||
// svc.call(()).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
@@ -103,7 +103,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// `MapNewService` new service combinator
|
||||
/// `MapServiceFactory` new service combinator.
|
||||
pub struct MapServiceFactory<A, F, Req, Res> {
|
||||
a: A,
|
||||
f: F,
|
||||
|
@@ -238,8 +238,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Map this service's output to a different type, returning a new service
|
||||
/// of the resulting type.
|
||||
/// Map this service's output to a different type, returning a new service.
|
||||
pub fn map<F, R>(self, f: F) -> PipelineFactory<MapServiceFactory<SF, F, Req, R>, Req>
|
||||
where
|
||||
Self: Sized,
|
||||
@@ -251,7 +250,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Map this service's error to a different error, returning a new service.
|
||||
/// Map this service's error to a different type, returning a new service.
|
||||
pub fn map_err<F, E>(
|
||||
self,
|
||||
f: F,
|
||||
@@ -266,7 +265,7 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Map this factory's init error to a different error, returning a new service.
|
||||
/// Map this factory's init error to a different type, returning a new service.
|
||||
pub fn map_init_err<F, E>(self, f: F) -> PipelineFactory<MapInitErr<SF, F, Req, E>, Req>
|
||||
where
|
||||
Self: Sized,
|
||||
|
@@ -3,14 +3,29 @@
|
||||
## Unreleased - 2021-xx-xx
|
||||
|
||||
|
||||
## 3.0.0-beta.7 - 2021-10-20
|
||||
* Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401]
|
||||
* Alias `connect::ssl` to `connect::tls`. [#401]
|
||||
|
||||
[#401]: https://github.com/actix/actix-net/pull/401
|
||||
|
||||
|
||||
## 3.0.0-beta.6 - 2021-10-19
|
||||
* Update `tokio-rustls` to `0.23` which uses `rustls` `0.20`. [#396]
|
||||
* Removed a re-export of `Session` from `rustls` as it no longer exist. [#396]
|
||||
* Minimum supported Rust version (MSRV) is now 1.52.
|
||||
|
||||
[#396]: https://github.com/actix/actix-net/pull/396
|
||||
|
||||
|
||||
## 3.0.0-beta.5 - 2021-03-29
|
||||
* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef`
|
||||
* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef`
|
||||
generation failed instead of panic. [#296]
|
||||
* Remove `connect::ssl::openssl::OpensslConnectServiceFactory`. [#297]
|
||||
* Remove `connect::ssl::openssl::OpensslConnectService`. [#297]
|
||||
* Add `connect::ssl::native_tls` module for native tls support. [#295]
|
||||
* Rename `accept::{nativetls => native_tls}`. [#295]
|
||||
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
|
||||
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
|
||||
`connect::ConnectService` instead and call `Connection<T, TcpStream>::into_parts`. [#299]
|
||||
|
||||
[#295]: https://github.com/actix/actix-net/pull/295
|
||||
|
@@ -1,12 +1,10 @@
|
||||
[package]
|
||||
name = "actix-tls"
|
||||
version = "3.0.0-beta.5"
|
||||
version = "3.0.0-beta.7"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "TLS acceptor and connector services for Actix ecosystem"
|
||||
keywords = ["network", "tls", "ssl", "async", "transport"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
documentation = "https://docs.rs/actix-tls"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
edition = "2018"
|
||||
@@ -40,7 +38,7 @@ native-tls = ["tokio-native-tls"]
|
||||
uri = ["http"]
|
||||
|
||||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-codec = "0.4.0"
|
||||
actix-rt = { version = "2.2.0", default-features = false }
|
||||
actix-service = "2.0.0"
|
||||
actix-utils = "3.0.0"
|
||||
@@ -56,19 +54,20 @@ tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
|
||||
tokio-openssl = { version = "0.6", optional = true }
|
||||
|
||||
# rustls
|
||||
tokio-rustls = { version = "0.22", optional = true }
|
||||
webpki-roots = { version = "0.21", optional = true }
|
||||
tokio-rustls = { version = "0.23", optional = true }
|
||||
webpki-roots = { version = "0.22", optional = true }
|
||||
|
||||
# native-tls
|
||||
tokio-native-tls = { version = "0.3", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.2.0"
|
||||
actix-server = "2.0.0-beta.5"
|
||||
actix-server = "2.0.0-beta.6"
|
||||
bytes = "1"
|
||||
env_logger = "0.8"
|
||||
env_logger = "0.9"
|
||||
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
||||
log = "0.4"
|
||||
rustls-pemfile = "0.2.1"
|
||||
trust-dns-resolver = "0.20.0"
|
||||
|
||||
[[example]]
|
||||
|
@@ -31,29 +31,36 @@ use std::{
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_server::Server;
|
||||
use actix_service::ServiceFactoryExt as _;
|
||||
use actix_service::{fn_factory, fn_service, ServiceExt as _, ServiceFactory};
|
||||
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
|
||||
use futures_util::future::ok;
|
||||
use log::info;
|
||||
use rustls::{
|
||||
internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig,
|
||||
};
|
||||
use rustls::{server::ServerConfig, Certificate, PrivateKey};
|
||||
use rustls_pemfile::{certs, rsa_private_keys};
|
||||
|
||||
const CERT_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/cert.pem"];
|
||||
const KEY_PATH: &str = concat![env!("CARGO_MANIFEST_DIR"), "/examples/key.pem"];
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
env::set_var("RUST_LOG", "info");
|
||||
env_logger::init();
|
||||
|
||||
let mut tls_config = ServerConfig::new(NoClientAuth::new());
|
||||
|
||||
// Load TLS key and cert files
|
||||
let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap());
|
||||
let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap());
|
||||
let cert_file = &mut BufReader::new(File::open(CERT_PATH).unwrap());
|
||||
let key_file = &mut BufReader::new(File::open(KEY_PATH).unwrap());
|
||||
|
||||
let cert_chain = certs(cert_file).unwrap();
|
||||
let cert_chain = certs(cert_file)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(Certificate)
|
||||
.collect();
|
||||
let mut keys = rsa_private_keys(key_file).unwrap();
|
||||
tls_config
|
||||
.set_single_cert(cert_chain, keys.remove(0))
|
||||
|
||||
let tls_config = ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_no_client_auth()
|
||||
.with_single_cert(cert_chain, PrivateKey(keys.remove(0)))
|
||||
.unwrap();
|
||||
|
||||
let tls_acceptor = RustlsAcceptor::new(tls_config);
|
||||
@@ -64,18 +71,34 @@ async fn main() -> io::Result<()> {
|
||||
info!("starting server on port: {}", &addr.0);
|
||||
|
||||
Server::build()
|
||||
.bind("tls-example", addr, move || {
|
||||
.bind("tls-example", addr, {
|
||||
let count = Arc::clone(&count);
|
||||
|
||||
// Set up TLS service factory
|
||||
tls_acceptor
|
||||
.clone()
|
||||
.map_err(|err| println!("Rustls error: {:?}", err))
|
||||
.and_then(move |stream: TlsStream<TcpStream>| {
|
||||
let num = count.fetch_add(1, Ordering::Relaxed);
|
||||
info!("[{}] Got TLS connection: {:?}", num, &*stream);
|
||||
ok(())
|
||||
})
|
||||
// note: moving rustls acceptor into fn_factory scope
|
||||
fn_factory(move || {
|
||||
// manually call new_service so that and_then can be used from ServiceExt
|
||||
// type annotation for inner stream type is required
|
||||
let svc = <RustlsAcceptor as ServiceFactory<TcpStream>>::new_service(
|
||||
&tls_acceptor,
|
||||
(),
|
||||
);
|
||||
|
||||
let count = Arc::clone(&count);
|
||||
|
||||
async move {
|
||||
let svc = svc
|
||||
.await?
|
||||
.map_err(|err| println!("Rustls error: {:?}", err))
|
||||
.and_then(fn_service(move |stream: TlsStream<TcpStream>| {
|
||||
let num = count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
info!("[{}] Got TLS connection: {:?}", num, &*stream);
|
||||
ok(())
|
||||
}));
|
||||
|
||||
Ok::<_, ()>(svc)
|
||||
}
|
||||
})
|
||||
})?
|
||||
.workers(1)
|
||||
.run()
|
||||
|
@@ -14,7 +14,7 @@ use actix_utils::counter::{Counter, CounterGuard};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use tokio_rustls::{Accept, TlsAcceptor};
|
||||
|
||||
pub use tokio_rustls::rustls::{ServerConfig, Session};
|
||||
pub use tokio_rustls::rustls::ServerConfig;
|
||||
|
||||
use super::MAX_CONN_COUNTER;
|
||||
|
||||
|
@@ -21,7 +21,9 @@ mod connector;
|
||||
mod error;
|
||||
mod resolve;
|
||||
mod service;
|
||||
pub mod ssl;
|
||||
pub mod tls;
|
||||
#[doc(hidden)]
|
||||
pub use tls as ssl;
|
||||
#[cfg(feature = "uri")]
|
||||
mod uri;
|
||||
|
||||
|
@@ -1,4 +1,4 @@
|
||||
//! SSL Services
|
||||
//! TLS Services
|
||||
|
||||
#[cfg(feature = "openssl")]
|
||||
pub mod openssl;
|
@@ -1,4 +1,5 @@
|
||||
use std::{
|
||||
convert::TryFrom,
|
||||
future::Future,
|
||||
io,
|
||||
pin::Pin,
|
||||
@@ -6,7 +7,6 @@ use std::{
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub use tokio_rustls::rustls::Session;
|
||||
pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
|
||||
pub use webpki_roots::TLS_SERVER_ROOTS;
|
||||
|
||||
@@ -14,11 +14,26 @@ use actix_rt::net::ActixStream;
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
use log::trace;
|
||||
use tokio_rustls::webpki::DNSNameRef;
|
||||
use tokio_rustls::rustls::{client::ServerName, OwnedTrustAnchor, RootCertStore};
|
||||
use tokio_rustls::{Connect, TlsConnector};
|
||||
|
||||
use crate::connect::{Address, Connection};
|
||||
|
||||
/// Returns standard root certificates from `webpki-roots` crate as a rustls certificate store.
|
||||
pub fn webpki_roots_cert_store() -> RootCertStore {
|
||||
let mut root_certs = RootCertStore::empty();
|
||||
for cert in TLS_SERVER_ROOTS.0 {
|
||||
let cert = OwnedTrustAnchor::from_subject_spki_name_constraints(
|
||||
cert.subject,
|
||||
cert.spki,
|
||||
cert.name_constraints,
|
||||
);
|
||||
let certs = vec![cert].into_iter();
|
||||
root_certs.add_server_trust_anchors(certs);
|
||||
}
|
||||
root_certs
|
||||
}
|
||||
|
||||
/// Rustls connector factory
|
||||
pub struct RustlsConnector {
|
||||
connector: Arc<ClientConfig>,
|
||||
@@ -89,7 +104,7 @@ where
|
||||
trace!("SSL Handshake start for: {:?}", connection.host());
|
||||
let (stream, connection) = connection.replace_io(());
|
||||
|
||||
match DNSNameRef::try_from_ascii_str(connection.host()) {
|
||||
match ServerName::try_from(connection.host()) {
|
||||
Ok(host) => RustlsConnectorServiceFuture::Future {
|
||||
connect: TlsConnector::from(self.connector.clone()).connect(host, stream),
|
||||
connection: Some(connection),
|
@@ -17,7 +17,7 @@ use actix_tls::connect::{self as actix_connect, Connect};
|
||||
#[cfg(feature = "openssl")]
|
||||
#[actix_rt::test]
|
||||
async fn test_string() {
|
||||
let srv = TestServer::with(|| {
|
||||
let srv = TestServer::with({
|
||||
fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
@@ -34,7 +34,7 @@ async fn test_string() {
|
||||
#[cfg(feature = "rustls")]
|
||||
#[actix_rt::test]
|
||||
async fn test_rustls_string() {
|
||||
let srv = TestServer::with(|| {
|
||||
let srv = TestServer::with({
|
||||
fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
@@ -50,13 +50,11 @@ async fn test_rustls_string() {
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_static_str() {
|
||||
let srv = TestServer::with(|| {
|
||||
fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
Ok::<_, io::Error>(())
|
||||
})
|
||||
});
|
||||
let srv = TestServer::with(fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
Ok::<_, io::Error>(())
|
||||
}));
|
||||
|
||||
let conn = actix_connect::default_connector();
|
||||
|
||||
@@ -75,13 +73,11 @@ async fn test_static_str() {
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_new_service() {
|
||||
let srv = TestServer::with(|| {
|
||||
fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
Ok::<_, io::Error>(())
|
||||
})
|
||||
});
|
||||
let srv = TestServer::with(fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
Ok::<_, io::Error>(())
|
||||
}));
|
||||
|
||||
let factory = actix_connect::default_connector_factory();
|
||||
|
||||
@@ -98,7 +94,7 @@ async fn test_new_service() {
|
||||
async fn test_openssl_uri() {
|
||||
use std::convert::TryFrom;
|
||||
|
||||
let srv = TestServer::with(|| {
|
||||
let srv = TestServer::with({
|
||||
fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
@@ -117,7 +113,7 @@ async fn test_openssl_uri() {
|
||||
async fn test_rustls_uri() {
|
||||
use std::convert::TryFrom;
|
||||
|
||||
let srv = TestServer::with(|| {
|
||||
let srv = TestServer::with({
|
||||
fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
@@ -133,7 +129,7 @@ async fn test_rustls_uri() {
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_local_addr() {
|
||||
let srv = TestServer::with(|| {
|
||||
let srv = TestServer::with({
|
||||
fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
|
@@ -38,8 +38,9 @@ async fn custom_resolver() {
|
||||
async fn custom_resolver_connect() {
|
||||
use trust_dns_resolver::TokioAsyncResolver;
|
||||
|
||||
let srv =
|
||||
TestServer::with(|| fn_service(|_io: TcpStream| async { Ok::<_, io::Error>(()) }));
|
||||
let srv = TestServer::with(fn_service(|_io: TcpStream| async {
|
||||
Ok::<_, io::Error>(())
|
||||
}));
|
||||
|
||||
struct MyResolver {
|
||||
trust_dns: TokioAsyncResolver,
|
||||
|
@@ -24,4 +24,5 @@ serde = { version = "1.0", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
serde_json = "1.0"
|
||||
ahash = { version = "0.7", default-features = false }
|
||||
# TODO: remove when ahash MSRV is restored
|
||||
ahash = { version = "=0.7.4", default-features = false }
|
||||
|
1
clippy.toml
Normal file
1
clippy.toml
Normal file
@@ -0,0 +1 @@
|
||||
msrv = "1.48"
|
@@ -18,4 +18,4 @@ futures-util = { version = "0.3.7", default-features = false }
|
||||
local-waker = "0.1"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1", features = ["rt", "macros"] }
|
||||
tokio = { version = "1.5.1", features = ["rt", "macros"] }
|
||||
|
Reference in New Issue
Block a user