1
0
mirror of https://github.com/fafhrd91/actix-net synced 2025-08-13 22:28:22 +02:00

Compare commits

...

31 Commits

Author SHA1 Message Date
Rob Ede
06ddad0051 prepare rt and tls releases (#287) 2021-02-25 11:50:24 +00:00
Rob Ede
789e6a8a46 update ci (#284) 2021-02-24 09:48:41 +00:00
Rob Ede
6e590fd042 Merge pull request #285 from actix/dep/actix-server 2021-02-24 09:09:44 +00:00
fakeshadow
fa8ded3a34 bump tokio version for actix-server 2021-02-24 15:54:28 +08:00
Rob Ede
841c611233 doc nits 2021-02-24 01:39:02 +00:00
Rob Ede
81a2b6a425 add local_addr binding to connector service (#282) 2021-02-23 18:52:28 +00:00
fakeshadow
a6e79453d0 remove default reuse_addr 2021-02-24 02:26:11 +08:00
fakeshadow
17f711a9d6 update changelog 2021-02-24 01:20:01 +08:00
fakeshadow
c3be839a69 add local_addr binding to connector service 2021-02-24 01:13:17 +08:00
Rob Ede
8d74cf387d standardize openssl based stream name 2021-02-20 18:04:05 +00:00
Rob Ede
7e483cc356 tweak task and stream docs 2021-02-20 17:34:04 +00:00
fakeshadow
75d7ae3139 add actix stream trait (#276) 2021-02-20 17:25:22 +00:00
Juan Aguilar
2cfe1d88ad Refactor LocalWaker for use Cell and remove deprecated methods (#278) 2021-02-19 17:12:30 +00:00
Rob Ede
cb07ead392 prepare rt release 2.0.2 2021-02-06 22:52:53 +00:00
Rob Ede
32543809f9 add System::try_current (#275) 2021-02-06 22:45:03 +00:00
Rob Ede
eb4d29e15e add arbiter handle assoc fn (#274)
* add arbiter handle assoc fn
2021-02-06 22:27:56 +00:00
Rob Ede
7ee42b50b4 prepare router 0.2.7 release 2021-02-06 19:50:48 +00:00
Rob Ede
0da848e4ae fix server dev dep 2021-02-06 19:35:29 +00:00
Rob Ede
5f80d85010 fix server version 2021-02-06 19:34:58 +00:00
Rob Ede
16ba77c4c8 prepare next set of betas (#273) 2021-02-06 19:24:52 +00:00
Rob Ede
b4a3f51659 prepare rt release 2.0.1 2021-02-06 15:54:11 +00:00
Riley
9d0901e07f actix-rt: expose JoinError (#271) 2021-02-06 15:50:38 +00:00
fakeshadow
ebb9cd055f use static dispatch on signal handling. reduce allocation (#272) 2021-02-06 03:38:11 +00:00
Rob Ede
a77b70aed2 prepare service 2.0.0-beta.4 release (#269) 2021-02-04 20:44:13 +00:00
Rob Ede
c918da906b use reexported tls crates when possible 2021-02-04 15:23:06 +00:00
Rob Ede
b5399c5631 use reusable box future in tls connector 2021-02-04 15:23:06 +00:00
fakeshadow
7f0eddd794 add blocking thread customize (#265) 2021-02-04 15:01:51 +00:00
shuo
db3385e865 retry on EINTR in accept loop (#264)
Co-authored-by: lishuo <lishuo.03@bytedance.com>
2021-02-04 10:20:37 +00:00
Rob Ede
4a8693d000 readme grammar 2021-02-03 11:18:35 +00:00
Rob Ede
4ec358575e prepare actix-rt v2.0.0 release (#262) 2021-02-03 10:25:31 +00:00
Rob Ede
66bd5bf4a2 prepare macros v0.2.0 release (#261) 2021-02-02 02:07:58 +00:00
57 changed files with 1119 additions and 630 deletions

117
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,117 @@
name: CI
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches: [master]
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
target:
- { name: Linux, os: ubuntu-latest, triple: x86_64-unknown-linux-gnu }
- { name: macOS, os: macos-latest, triple: x86_64-apple-darwin }
- { name: Windows, os: windows-latest, triple: x86_64-pc-windows-msvc }
- { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu }
- { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc }
version:
- 1.46.0 # MSRV
- stable
- nightly
name: ${{ matrix.target.name }} / ${{ matrix.version }}
runs-on: ${{ matrix.target.os }}
steps:
- name: Setup Routing
if: matrix.target.os == 'macos-latest'
run: sudo ifconfig lo0 alias 127.0.0.3
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-${{ matrix.target.triple }}
profile: minimal
override: true
# - name: Install MSYS2
# if: matrix.target.triple == 'x86_64-pc-windows-gnu'
# uses: msys2/setup-msys2@v2
# - name: Install MinGW Packages
# if: matrix.target.triple == 'x86_64-pc-windows-gnu'
# run: |
# msys2 -c 'pacman -Sy --noconfirm pacman'
# msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
# - name: Generate Cargo.lock
# uses: actions-rs/cargo@v1
# with:
# command: generate-lockfile
# - name: Cache Dependencies
# uses: Swatinem/rust-cache@v1.2.0
- name: Install cargo-hack
uses: actions-rs/cargo@v1
with:
command: install
args: cargo-hack
- name: check minimal
uses: actions-rs/cargo@v1
with:
command: hack
args: check --workspace --no-default-features
- name: check minimal + tests
uses: actions-rs/cargo@v1
with:
command: hack
args: check --workspace --no-default-features --tests --examples
- name: check default
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --tests --examples
- name: check full
# TODO: compile OpenSSL and run tests on MinGW
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --all-features --tests --examples
- name: tests
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --all-features --no-fail-fast -- --nocapture
- name: Generate coverage file
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --verbose
- name: Upload to Codecov
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1
with:
file: cobertura.xml
- name: Clear the cargo caches
run: |
cargo install cargo-cache --no-default-features --features ci-autoclean
cargo-cache

View File

@@ -1,34 +1,42 @@
name: Lint
on:
pull_request:
types: [opened, synchronize, reopened]
name: Clippy and rustfmt Check
jobs:
clippy_check:
fmt:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: rustfmt
profile: minimal
components: rustfmt
override: true
- name: Check with rustfmt
- name: Rustfmt Check
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- uses: actions-rs/toolchain@v1
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: clippy
toolchain: stable
profile: minimal
components: clippy
override: true
- name: Check with Clippy
- name: Clippy Check
uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --workspace --tests
args: --workspace --tests --all-features

View File

@@ -1,82 +0,0 @@
name: CI (Linux)
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- 1.46.0
- stable
- nightly
name: ${{ matrix.version }} - x86_64-unknown-linux-gnu
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
with:
command: generate-lockfile
- name: Cache cargo dirs
uses: actions/cache@v2
with:
path:
~/.cargo/registry
~/.cargo/git
~/.cargo/bin
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: Cache cargo build
uses: actions/cache@v2
with:
path: target
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --bins --examples --tests
- name: tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
with:
command: test
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
- name: Generate coverage file
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --workspace
- name: Upload to Codecov
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
uses: codecov/codecov-action@v1
with:
file: cobertura.xml
- name: Clear the cargo caches
run: |
rustup update stable
rustup override set stable
cargo install cargo-cache --no-default-features --features ci-autoclean
cargo-cache

View File

@@ -1,43 +0,0 @@
name: CI (macOS)
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- stable
- nightly
name: ${{ matrix.version }} - x86_64-apple-darwin
runs-on: macos-latest
steps:
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-apple-darwin
profile: minimal
override: true
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --bins --examples --tests
- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture

35
.github/workflows/upload-doc.yml vendored Normal file
View File

@@ -0,0 +1,35 @@
name: Upload documentation
on:
push:
branches: [master]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Build Docs
uses: actions-rs/cargo@v1
with:
command: doc
args: --workspace --all-features --no-deps
- name: Tweak HTML
run: echo '<meta http-equiv="refresh" content="0;url=actix_server/index.html">' > target/doc/index.html
- name: Deploy to GitHub Pages
uses: JamesIves/github-pages-deploy-action@3.7.1
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
BRANCH: gh-pages
FOLDER: target/doc

View File

@@ -1,45 +0,0 @@
name: CI (Windows-mingw)
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- stable
- nightly
name: ${{ matrix.version }} - x86_64-pc-windows-gnu
runs-on: windows-latest
steps:
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-x86_64-pc-windows-gnu
profile: minimal
override: true
- name: Install MSYS2
uses: msys2/setup-msys2@v2
- name: Install packages
run: |
msys2 -c 'pacman -Sy --noconfirm pacman'
msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --bins --examples --tests

View File

@@ -1,69 +0,0 @@
name: CI (Windows)
on:
pull_request:
types: [opened, synchronize, reopened]
push:
branches:
- master
- '1.0'
env:
VCPKGRS_DYNAMIC: 1
jobs:
build_and_test:
strategy:
fail-fast: false
matrix:
version:
- stable
- nightly
target:
- x86_64-pc-windows-msvc
- i686-pc-windows-msvc
name: ${{ matrix.version }} - ${{ matrix.target }}
runs-on: windows-latest
steps:
- uses: actions/checkout@v2
- name: Install ${{ matrix.version }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.version }}-${{ matrix.target }}
profile: minimal
override: true
- name: Install OpenSSL (x64)
if: matrix.target == 'x86_64-pc-windows-msvc'
run: |
vcpkg integrate install
vcpkg install openssl:x64-windows
Get-ChildItem C:\vcpkg\installed\x64-windows\bin
Get-ChildItem C:\vcpkg\installed\x64-windows\lib
Copy-Item C:\vcpkg\installed\x64-windows\bin\libcrypto-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libcrypto.dll
Copy-Item C:\vcpkg\installed\x64-windows\bin\libssl-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libssl.dll
- name: Install OpenSSL (x86)
if: matrix.target == 'i686-pc-windows-msvc'
run: |
vcpkg integrate install
vcpkg install openssl:x86-windows
Get-ChildItem C:\vcpkg\installed\x86-windows\bin
Get-ChildItem C:\vcpkg\installed\x86-windows\lib
Copy-Item C:\vcpkg\installed\x86-windows\bin\libcrypto-1_1.dll C:\vcpkg\installed\x86-windows\bin\libcrypto.dll
Copy-Item C:\vcpkg\installed\x86-windows\bin\libssl-1_1.dll C:\vcpkg\installed\x86-windows\bin\libssl.dll
- name: check build
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --bins --examples --tests
- name: tests
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture

View File

@@ -6,7 +6,7 @@ description = "Codec utilities for working with framed protocols"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-codec/"
documentation = "https://docs.rs/actix-codec"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"

View File

@@ -3,8 +3,16 @@
## Unreleased - 2021-xx-xx
## 0.2.0 - 2021-02-02
* Update to latest `actix_rt::System::new` signature. [#261]
[#261]: https://github.com/actix/actix-net/pull/261
## 0.2.0-beta.1 - 2021-01-09
* Remove `actix-reexport` feature.
* Remove `actix-reexport` feature. [#218]
[#218]: https://github.com/actix/actix-net/pull/218
## 0.1.3 - 2020-12-03

View File

@@ -1,10 +1,10 @@
[package]
name = "actix-macros"
version = "0.2.0-beta.1"
version = "0.2.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix runtime macros"
repository = "https://github.com/actix/actix-net"
documentation = "https://docs.rs/actix-macros/"
description = "Macros for Actix system and runtime"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-macros"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -16,11 +16,8 @@ proc-macro = true
quote = "1.0.3"
syn = { version = "^1", features = ["full"] }
[features]
actix-reexport = []
[dev-dependencies]
actix-rt = "2.0.0-beta.2"
actix-rt = "2.0.0"
futures-util = { version = "0.3", default-features = false }
trybuild = "1"

View File

@@ -1,4 +1,12 @@
//! Macros for use with Tokio
//! Macros for Actix system and runtime.
//!
//! The [`actix-rt`](https://docs.rs/actix-rt) crate must be available for macro output to compile.
//!
//! # Entry-point
//! See docs for the [`#[main]`](macro@main) macro.
//!
//! # Tests
//! See docs for the [`#[test]`](macro@test) macro.
#![deny(rust_2018_idioms, nonstandard_style)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
@@ -7,10 +15,9 @@
use proc_macro::TokenStream;
use quote::quote;
/// Marks async function to be executed by Actix system.
///
/// ## Usage
/// Marks async entry-point function to be executed by Actix system.
///
/// # Examples
/// ```
/// #[actix_rt::main]
/// async fn main() {
@@ -28,9 +35,12 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
let body = &input.block;
if sig.asyncness.is_none() {
return syn::Error::new_spanned(sig.fn_token, "only async fn is supported")
.to_compile_error()
.into();
return syn::Error::new_spanned(
sig.fn_token,
"the async keyword is missing from the function declaration",
)
.to_compile_error()
.into();
}
sig.asyncness = None;
@@ -45,11 +55,10 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
.into()
}
/// Marks async test function to be executed by Actix system.
/// Marks async test function to be executed in an Actix system.
///
/// ## Usage
///
/// ```no_run
/// # Examples
/// ```
/// #[actix_rt::test]
/// async fn my_test() {
/// assert!(true);
@@ -73,7 +82,7 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
if sig.asyncness.is_none() {
return syn::Error::new_spanned(
input.sig.fn_token,
format!("only async fn is supported, {}", input.sig.ident),
"the async keyword is missing from the function declaration",
)
.to_compile_error()
.into();
@@ -81,24 +90,19 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
sig.asyncness = None;
let result = if has_test_attr {
quote! {
#(#attrs)*
#vis #sig {
actix_rt::System::new()
.block_on(async { #body })
}
}
let missing_test_attr = if has_test_attr {
quote!()
} else {
quote! {
#[test]
#(#attrs)*
#vis #sig {
actix_rt::System::new()
.block_on(async { #body })
}
}
quote!(#[test])
};
result.into()
(quote! {
#missing_test_attr
#(#attrs)*
#vis #sig {
actix_rt::System::new()
.block_on(async { #body })
}
})
.into()
}

View File

@@ -3,7 +3,9 @@ fn compile_macros() {
let t = trybuild::TestCases::new();
t.pass("tests/trybuild/main-01-basic.rs");
t.compile_fail("tests/trybuild/main-02-only-async.rs");
t.pass("tests/trybuild/main-03-fn-params.rs");
t.pass("tests/trybuild/test-01-basic.rs");
t.pass("tests/trybuild/test-02-keep-attrs.rs");
t.compile_fail("tests/trybuild/test-03-only-async.rs");
}

View File

@@ -1,4 +1,4 @@
error: only async fn is supported
error: the async keyword is missing from the function declaration
--> $DIR/main-02-only-async.rs:2:1
|
2 | fn main() {

View File

@@ -0,0 +1,6 @@
#[actix_rt::main]
async fn main2(_param: bool) {
futures_util::future::ready(()).await
}
fn main() {}

View File

@@ -0,0 +1,6 @@
#[actix_rt::test]
fn my_test() {
futures_util::future::ready(()).await
}
fn main() {}

View File

@@ -0,0 +1,5 @@
error: the async keyword is missing from the function declaration
--> $DIR/test-03-only-async.rs:2:1
|
2 | fn my_test() {
| ^^

View File

@@ -1,6 +1,9 @@
# Changes
## Unreleased - 2021-xx-xx
## 0.2.7 - 2021-02-06
* Add `Router::recognize_checked` [#247]
[#247]: https://github.com/actix/actix-net/pull/247

View File

@@ -1,12 +1,12 @@
[package]
name = "actix-router"
version = "0.2.6"
version = "0.2.7"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Resource path matching library"
keywords = ["actix"]
keywords = ["actix", "router", "routing"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-router/"
documentation = "https://docs.rs/actix-router"
license = "MIT OR Apache-2.0"
edition = "2018"

View File

@@ -28,6 +28,7 @@ enum PatternElement {
}
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
enum PatternType {
Static(String),
Prefix(String),
@@ -669,8 +670,6 @@ pub(crate) fn insert_slash(path: &str) -> String {
#[cfg(test)]
mod tests {
use super::*;
use http::Uri;
use std::convert::TryFrom;
#[test]
fn test_parse_static() {
@@ -832,8 +831,11 @@ mod tests {
assert!(re.is_match("/user/2345/sdg"));
}
#[cfg(feature = "http")]
#[test]
fn test_parse_urlencoded_param() {
use std::convert::TryFrom;
let re = ResourceDef::new("/user/{id}/test");
let mut path = Path::new("/user/2345/test");
@@ -844,7 +846,7 @@ mod tests {
assert!(re.match_path(&mut path));
assert_eq!(path.get("id").unwrap(), "qwe%25");
let uri = Uri::try_from("/user/qwe%25/test").unwrap();
let uri = http::Uri::try_from("/user/qwe%25/test").unwrap();
let mut path = Path::new(uri);
assert!(re.match_path(&mut path));
assert_eq!(path.get("id").unwrap(), "qwe%25");

View File

@@ -3,6 +3,35 @@
## Unreleased - 2021-xx-xx
## 2.1.0 - 2021-02-24
* Add `ActixStream` extension trait to include readiness methods. [#276]
* Re-export `tokio::net::TcpSocket` in `net` module [#282]
[#276]: https://github.com/actix/actix-net/pull/276
[#282]: https://github.com/actix/actix-net/pull/282
## 2.0.2 - 2021-02-06
* Add `Arbiter::handle` to get a handle of an owned Arbiter. [#274]
* Add `System::try_current` for situations where actix may or may not be running a System. [#275]
[#274]: https://github.com/actix/actix-net/pull/274
[#275]: https://github.com/actix/actix-net/pull/275
## 2.0.1 - 2021-02-06
* Expose `JoinError` from Tokio. [#271]
[#271]: https://github.com/actix/actix-net/pull/271
## 2.0.0 - 2021-02-02
* Remove all Arbiter-local storage methods. [#262]
* Re-export `tokio::pin`. [#262]
[#262]: https://github.com/actix/actix-net/pull/262
## 2.0.0-beta.3 - 2021-01-31
* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253]
* Return `JoinHandle` from `actix_rt::spawn`. [#253]

View File

@@ -1,15 +1,15 @@
[package]
name = "actix-rt"
version = "2.0.0-beta.3"
version = "2.1.0"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Tokio-based single-thread async runtime for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
keywords = ["async", "futures", "io", "runtime"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-rt/"
repository = "https://github.com/actix/actix-net"
documentation = "https://docs.rs/actix-rt"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -23,10 +23,11 @@ default = ["macros"]
macros = ["actix-macros"]
[dependencies]
actix-macros = { version = "0.2.0-beta.1", optional = true }
actix-macros = { version = "0.2.0", optional = true }
futures-core = { version = "0.3", default-features = false }
tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tokio = { version = "1.2", features = ["full"] }
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }

View File

@@ -1,7 +1,14 @@
# actix-rt
> Tokio-based single-thread async runtime for the Actix ecosystem.
> Tokio-based single-threaded async runtime for the Actix ecosystem.
See documentation for detailed explanations these components: [https://docs.rs/actix-rt][docs].
[![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.1.0)](https://docs.rs/actix-rt/2.1.0)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg)
<br />
[![dependency status](https://deps.rs/crate/actix-rt/2.1.0/status.svg)](https://deps.rs/crate/actix-rt/2.1.0)
![Download](https://img.shields.io/crates/d/actix-rt.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb)
[docs]: https://docs.rs/actix-rt
See crate documentation for more: https://docs.rs/actix-rt.

View File

@@ -0,0 +1,28 @@
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use std::convert::Infallible;
use std::net::SocketAddr;
async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::from("Hello World")))
}
fn main() {
actix_rt::System::with_tokio_rt(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
})
.block_on(async {
let make_service =
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
let server =
Server::bind(&SocketAddr::from(([127, 0, 0, 1], 3000))).serve(make_service);
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
})
}

View File

@@ -1,7 +1,5 @@
use std::{
any::{Any, TypeId},
cell::RefCell,
collections::HashMap,
fmt,
future::Future,
pin::Pin,
@@ -14,7 +12,7 @@ use futures_core::ready;
use tokio::{sync::mpsc, task::LocalSet};
use crate::{
runtime::Runtime,
runtime::{default_tokio_runtime, Runtime},
system::{System, SystemCommand},
};
@@ -22,7 +20,6 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
thread_local!(
static HANDLE: RefCell<Option<ArbiterHandle>> = RefCell::new(None);
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
);
pub(crate) enum ArbiterCommand {
@@ -97,16 +94,30 @@ pub struct Arbiter {
}
impl Arbiter {
/// Spawn new Arbiter thread and start its event loop.
/// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed);
let system_id = System::current().id();
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id);
Self::with_tokio_rt(|| {
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
})
}
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
{
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let (tx, rx) = mpsc::unbounded_channel();
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
@@ -116,18 +127,17 @@ impl Arbiter {
.spawn({
let tx = tx.clone();
move || {
let rt = Runtime::new().expect("Cannot create new Arbiter's Runtime.");
let rt = Runtime::from(runtime_factory());
let hnd = ArbiterHandle::new(tx);
System::set_current(sys);
STORAGE.with(|cell| cell.borrow_mut().clear());
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
// register arbiter
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(id, hnd));
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
ready_tx.send(()).unwrap();
@@ -137,7 +147,7 @@ impl Arbiter {
// deregister arbiter
let _ = System::current()
.tx()
.send(SystemCommand::DeregisterArbiter(id));
.send(SystemCommand::DeregisterArbiter(arb_id));
}
})
.unwrap_or_else(|err| {
@@ -156,20 +166,24 @@ impl Arbiter {
let hnd = ArbiterHandle::new(tx);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear());
local.spawn_local(ArbiterRunner { rx });
hnd
}
/// Return a handle to the this Arbiter's message sender.
pub fn handle(&self) -> ArbiterHandle {
ArbiterHandle::new(self.tx.clone())
}
/// Return a handle to the current thread's Arbiter's message sender.
///
/// # Panics
/// Panics if no Arbiter is running on the current thread.
pub fn current() -> ArbiterHandle {
HANDLE.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(),
Some(ref hnd) => hnd.clone(),
None => panic!("Arbiter is not running."),
})
}
@@ -214,58 +228,6 @@ impl Arbiter {
pub fn join(self) -> thread::Result<()> {
self.thread_handle.join()
}
/// Insert item into Arbiter's thread-local storage.
///
/// Overwrites any item of the same type previously inserted.
#[deprecated = "Will be removed in stable v2."]
pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
}
/// Check if Arbiter's thread-local storage contains an item type.
#[deprecated = "Will be removed in stable v2."]
pub fn contains_item<T: 'static>() -> bool {
STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::<T>()))
}
/// Call a function with a shared reference to an item in this Arbiter's thread-local storage.
///
/// # Panics
/// Panics if item is not in Arbiter's thread-local item storage.
#[deprecated = "Will be removed in stable v2."]
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&T) -> R,
{
STORAGE.with(move |cell| {
let st = cell.borrow();
let type_id = TypeId::of::<T>();
let item = st.get(&type_id).and_then(downcast_ref).unwrap();
f(item)
})
}
/// Call a function with a mutable reference to an item in this Arbiter's thread-local storage.
///
/// # Panics
/// Panics if item is not in Arbiter's thread-local item storage.
#[deprecated = "Will be removed in stable v2."]
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&mut T) -> R,
{
STORAGE.with(move |cell| {
let mut st = cell.borrow_mut();
let type_id = TypeId::of::<T>();
let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap();
f(item)
})
}
}
/// A persistent future that processes [Arbiter] commands.
@@ -296,11 +258,3 @@ impl Future for ArbiterRunner {
}
}
}
fn downcast_ref<T: 'static>(boxed: &Box<dyn Any>) -> Option<&T> {
boxed.downcast_ref()
}
fn downcast_mut<T: 'static>(boxed: &mut Box<dyn Any>) -> Option<&mut T> {
boxed.downcast_mut()
}

View File

@@ -1,4 +1,4 @@
//! Tokio-based single-thread async runtime for the Actix ecosystem.
//! Tokio-based single-threaded async runtime for the Actix ecosystem.
//!
//! In most parts of the the Actix ecosystem, it has been chosen to use !Send futures. For this
//! reason, a single-threaded runtime is appropriate since it is guaranteed that futures will not
@@ -12,7 +12,7 @@
//!
//! The disadvantage is that idle threads will not steal work from very busy, stuck or otherwise
//! backlogged threads. Tasks that are disproportionately expensive should be offloaded to the
//! blocking thread-pool using [`task::spawn_blocking`].
//! blocking task thread-pool using [`task::spawn_blocking`].
//!
//! # Examples
//! ```
@@ -56,6 +56,8 @@ pub use self::arbiter::{Arbiter, ArbiterHandle};
pub use self::runtime::Runtime;
pub use self::system::{System, SystemRunner};
pub use tokio::pin;
pub mod signal {
//! Asynchronous signal handling (Tokio re-exports).
@@ -68,13 +70,50 @@ pub mod signal {
}
pub mod net {
//! TCP/UDP/Unix bindings (Tokio re-exports).
//! TCP/UDP/Unix bindings (mostly Tokio re-exports).
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
pub use tokio::net::UdpSocket;
pub use tokio::net::{TcpListener, TcpStream};
pub use tokio::net::{TcpListener, TcpSocket, TcpStream};
#[cfg(unix)]
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
/// Extension trait over async read+write types that can also signal readiness.
pub trait ActixStream: AsyncRead + AsyncWrite + Unpin + 'static {
/// Poll stream and check read readiness of Self.
///
/// See [tokio::net::TcpStream::poll_read_ready] for detail on intended use.
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>>;
/// Poll stream and check write readiness of Self.
///
/// See [tokio::net::TcpStream::poll_write_ready] for detail on intended use.
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>>;
}
impl ActixStream for TcpStream {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
TcpStream::poll_read_ready(self, cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
TcpStream::poll_write_ready(self, cx)
}
}
#[cfg(unix)]
impl ActixStream for UnixStream {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
UnixStream::poll_read_ready(self, cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
UnixStream::poll_write_ready(self, cx)
}
}
}
pub mod time {
@@ -89,7 +128,7 @@ pub mod time {
pub mod task {
//! Task management (Tokio re-exports).
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
pub use tokio::task::{spawn_blocking, yield_now, JoinError, JoinHandle};
}
/// Spawns a future on the current thread.

View File

@@ -2,7 +2,7 @@ use std::{future::Future, io};
use tokio::task::{JoinHandle, LocalSet};
/// A single-threaded runtime based on Tokio's "current thread" runtime.
/// A Tokio-based runtime proxy.
///
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
/// on submitted futures.
@@ -12,14 +12,18 @@ pub struct Runtime {
rt: tokio::runtime::Runtime,
}
pub(crate) fn default_tokio_runtime() -> io::Result<tokio::runtime::Runtime> {
tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
}
impl Runtime {
/// Returns a new runtime initialized with default configuration values.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> io::Result<Runtime> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()?;
pub fn new() -> io::Result<Self> {
let rt = default_tokio_runtime()?;
Ok(Runtime {
rt,
@@ -81,3 +85,12 @@ impl Runtime {
self.local.block_on(&self.rt, f)
}
}
impl From<tokio::runtime::Runtime> for Runtime {
fn from(rt: tokio::runtime::Runtime) -> Self {
Self {
local: LocalSet::new(),
rt,
}
}
}

View File

@@ -11,7 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};
use crate::{arbiter::ArbiterHandle, Arbiter, Runtime};
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@@ -36,10 +36,24 @@ impl System {
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
Self::with_tokio_rt(|| {
default_tokio_runtime()
.expect("Default Actix (Tokio) runtime could not be created.")
})
}
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
let rt = Runtime::from(runtime_factory());
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
let system = System::construct(sys_tx, sys_arbiter.clone());
@@ -86,6 +100,15 @@ impl System {
})
}
/// Try to get current running system.
///
/// Returns `None` if no System has been started.
///
/// Contrary to `current`, this never panics.
pub fn try_current() -> Option<System> {
CURRENT.with(|cell| cell.borrow().clone())
}
/// Get handle to a the System's initial [Arbiter].
pub fn arbiter(&self) -> &ArbiterHandle {
&self.arbiter_handle

View File

@@ -1,5 +1,9 @@
use std::{
sync::mpsc::channel,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
Arc,
},
thread,
time::{Duration, Instant},
};
@@ -118,6 +122,28 @@ fn arbiter_spawn_fn_runs() {
arbiter.join().unwrap();
}
#[test]
fn arbiter_handle_spawn_fn_runs() {
let sys = System::new();
let (tx, rx) = channel::<u32>();
let arbiter = Arbiter::new();
let handle = arbiter.handle();
drop(arbiter);
handle.spawn_fn(move || {
tx.send(42).unwrap();
System::current().stop()
});
let num = rx.recv_timeout(Duration::from_secs(2)).unwrap();
assert_eq!(num, 42);
handle.stop();
sys.run().unwrap();
}
#[test]
fn arbiter_drop_no_panic_fn() {
let _ = System::new();
@@ -140,36 +166,6 @@ fn arbiter_drop_no_panic_fut() {
arbiter.join().unwrap();
}
#[test]
#[allow(deprecated)]
fn arbiter_item_storage() {
let _ = System::new();
let arbiter = Arbiter::new();
assert!(!Arbiter::contains_item::<u32>());
Arbiter::set_item(42u32);
assert!(Arbiter::contains_item::<u32>());
Arbiter::get_item(|&item: &u32| assert_eq!(item, 42));
Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42));
let thread = thread::spawn(move || {
Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread"));
})
.join();
assert!(thread.is_err());
let thread = thread::spawn(move || {
Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread"));
})
.join();
assert!(thread.is_err());
arbiter.stop();
arbiter.join().unwrap();
}
#[test]
#[should_panic]
fn no_system_current_panic() {
@@ -224,9 +220,81 @@ fn system_stop_stops_arbiters() {
System::current().stop();
sys.run().unwrap();
// account for slightly slow thread de-spawns (only observed on windows)
thread::sleep(Duration::from_millis(100));
// arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {}));
assert!(!arb.spawn_fn(|| {}));
arb.join().unwrap();
}
#[test]
fn new_system_with_tokio() {
let (tx, rx) = channel();
let res = System::with_tokio_rt(move || {
tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.thread_keep_alive(Duration::from_millis(1000))
.worker_threads(2)
.max_blocking_threads(2)
.on_thread_start(|| {})
.on_thread_stop(|| {})
.build()
.unwrap()
})
.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
tokio::task::spawn(async move {
tx.send(42).unwrap();
})
.await
.unwrap();
123usize
});
assert_eq!(res, 123);
assert_eq!(rx.recv().unwrap(), 42);
}
#[test]
fn new_arbiter_with_tokio() {
let _ = System::new();
let arb = Arbiter::with_tokio_rt(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
});
let counter = Arc::new(AtomicBool::new(true));
let counter1 = counter.clone();
let did_spawn = arb.spawn(async move {
actix_rt::time::sleep(Duration::from_millis(1)).await;
counter1.store(false, Ordering::SeqCst);
Arbiter::current().stop();
});
assert!(did_spawn);
arb.join().unwrap();
assert_eq!(false, counter.load(Ordering::SeqCst));
}
#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
}
#[test]
fn try_current_with_system() {
System::new().block_on(async { assert!(System::try_current().is_some()) });
}

View File

@@ -1,9 +1,18 @@
# Changes
## Unreleased - 2021-xx-xx
## 2.0.0-beta.3 - 2021-02-06
* Hidden `ServerBuilder::start` method has been removed. Use `ServerBuilder::run`. [#246]
* Add retry for EINTR signal (`io::Interrupted`) in `Accept`'s poll loop. [#264]
* Add `ServerBuilder::worker_max_blocking_threads` to customize blocking thread pool size. [#265]
* Update `actix-rt` to `2.0.0`. [#273]
[#246]: https://github.com/actix/actix-net/pull/246
[#264]: https://github.com/actix/actix-net/pull/264
[#265]: https://github.com/actix/actix-net/pull/265
[#273]: https://github.com/actix/actix-net/pull/273
## 2.0.0-beta.2 - 2021-01-03

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "2.0.0-beta.2"
version = "2.0.0-beta.3"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>",
@@ -9,10 +9,9 @@ description = "General purpose TCP server built for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-server/"
documentation = "https://docs.rs/actix-server"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
exclude = [".gitignore", ".cargo/config"]
edition = "2018"
[lib]
@@ -24,19 +23,19 @@ default = []
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.0.0-beta.2", default-features = false }
actix-service = "2.0.0-beta.3"
actix-utils = "3.0.0-beta.1"
actix-rt = { version = "2.0.0", default-features = false }
actix-service = "2.0.0-beta.4"
actix-utils = "3.0.0-beta.2"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13"
slab = "0.4"
tokio = { version = "1", features = ["sync"] }
tokio = { version = "1.2", features = ["sync"] }
[dev-dependencies]
actix-rt = "2.0.0-beta.2"
actix-rt = "2.0.0"
bytes = "1"
env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }

View File

@@ -161,9 +161,16 @@ impl Accept {
let mut events = mio::Events::with_capacity(128);
loop {
self.poll
.poll(&mut events, None)
.unwrap_or_else(|e| panic!("Poll error: {}", e));
if let Err(e) = self.poll.poll(&mut events, None) {
match e.kind() {
std::io::ErrorKind::Interrupted => {
continue;
}
_ => {
panic!("Poll error: {}", e);
}
}
}
for event in events.iter() {
let token = event.token();

View File

@@ -19,7 +19,7 @@ use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{self, ServerWorker, WorkerAvailability, WorkerHandle};
use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
use crate::{join_all, Token};
/// Server builder
@@ -32,11 +32,11 @@ pub struct ServerBuilder {
sockets: Vec<(Token, String, MioListener)>,
accept: AcceptLoop,
exit: bool,
shutdown_timeout: Duration,
no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>,
server: Server,
notify: Vec<oneshot::Sender<()>>,
worker_config: ServerWorkerConfig,
}
impl Default for ServerBuilder {
@@ -60,11 +60,11 @@ impl ServerBuilder {
accept: AcceptLoop::new(server.clone()),
backlog: 2048,
exit: false,
shutdown_timeout: Duration::from_secs(30),
no_signals: false,
cmd: rx,
notify: Vec::new(),
server,
worker_config: ServerWorkerConfig::default(),
}
}
@@ -78,6 +78,24 @@ impl ServerBuilder {
self
}
/// Set max number of threads for each worker's blocking task thread pool.
///
/// One thread pool is set up **per worker**; not shared across workers.
///
/// # Examples:
/// ```
/// # use actix_server::ServerBuilder;
/// let builder = ServerBuilder::new()
/// .workers(4) // server has 4 worker thread.
/// .worker_max_blocking_threads(4); // every worker has 4 max blocking threads.
/// ```
///
/// See [tokio::runtime::Builder::max_blocking_threads] for behavior reference.
pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
self.worker_config.max_blocking_threads(num);
self
}
/// Set the maximum number of pending connections.
///
/// This refers to the number of clients that can be waiting to be served.
@@ -124,7 +142,8 @@ impl ServerBuilder {
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u64) -> Self {
self.shutdown_timeout = Duration::from_secs(sec);
self.worker_config
.shutdown_timeout(Duration::from_secs(sec));
self
}
@@ -297,7 +316,7 @@ impl ServerBuilder {
let avail = WorkerAvailability::new(waker);
let services = self.services.iter().map(|v| v.clone_factory()).collect();
ServerWorker::start(idx, services, avail, self.shutdown_timeout)
ServerWorker::start(idx, services, avail, self.worker_config)
}
fn handle_cmd(&mut self, item: ServerCommand) {

View File

@@ -2,8 +2,6 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::future::LocalBoxFuture;
use crate::server::Server;
/// Different types of process signals
@@ -23,9 +21,9 @@ pub(crate) enum Signal {
pub(crate) struct Signals {
srv: Server,
#[cfg(not(unix))]
signals: LocalBoxFuture<'static, std::io::Result<()>>,
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
#[cfg(unix)]
signals: Vec<(Signal, LocalBoxFuture<'static, ()>)>,
signals: Vec<(Signal, actix_rt::signal::unix::Signal)>,
}
impl Signals {
@@ -48,23 +46,21 @@ impl Signals {
(unix::SignalKind::quit(), Signal::Quit),
];
let mut signals = Vec::new();
for (kind, sig) in sig_map.iter() {
match unix::signal(*kind) {
Ok(mut stream) => {
let fut = Box::pin(async move {
let _ = stream.recv().await;
}) as _;
signals.push((*sig, fut));
}
Err(e) => log::error!(
"Can not initialize stream handler for {:?} err: {}",
sig,
e
),
}
}
let signals = sig_map
.iter()
.filter_map(|(kind, sig)| {
unix::signal(*kind)
.map(|tokio_sig| (*sig, tokio_sig))
.map_err(|e| {
log::error!(
"Can not initialize stream handler for {:?} err: {}",
sig,
e
)
})
.ok()
})
.collect::<Vec<_>>();
actix_rt::spawn(Signals { srv, signals });
}
@@ -86,7 +82,7 @@ impl Future for Signals {
#[cfg(unix)]
{
for (sig, fut) in self.signals.iter_mut() {
if fut.as_mut().poll(cx).is_ready() {
if Pin::new(fut).poll_recv(cx).is_ready() {
let sig = *sig;
self.srv.signal(sig);
return Poll::Ready(());

View File

@@ -8,7 +8,7 @@ use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerHandle;
/// waker token for `mio::Poll` instance
/// Waker token for `mio::Poll` instance.
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
@@ -30,7 +30,7 @@ impl Deref for WakerQueue {
}
impl WakerQueue {
/// construct a waker queue with given `Poll`'s `Registry` and capacity.
/// Construct a waker queue with given `Poll`'s `Registry` and capacity.
///
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
/// event's token for it to properly handle `WakerInterest`.
@@ -41,7 +41,7 @@ impl WakerQueue {
Ok(Self(Arc::new((waker, queue))))
}
/// push a new interest to the queue and wake up the accept poll afterwards.
/// Push a new interest to the queue and wake up the accept poll afterwards.
pub(crate) fn wake(&self, interest: WakerInterest) {
let (waker, queue) = self.deref();
@@ -55,20 +55,20 @@ impl WakerQueue {
.unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
}
/// get a MutexGuard of the waker queue.
/// Get a MutexGuard of the waker queue.
pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
self.deref().1.lock().expect("Failed to lock WakerQueue")
}
/// reset the waker queue so it does not grow infinitely.
/// Reset the waker queue so it does not grow infinitely.
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
}
}
/// types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
/// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
///
/// *. These interests should not be confused with `mio::Interest` and mostly not I/O related
/// These interests should not be confused with `mio::Interest` and mostly not I/O related
pub(crate) enum WakerInterest {
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
/// available and can accept new tasks.

View File

@@ -133,7 +133,7 @@ pub(crate) struct ServerWorker {
conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState,
shutdown_timeout: Duration,
config: ServerWorkerConfig,
}
struct WorkerService {
@@ -159,26 +159,62 @@ enum WorkerServiceStatus {
Stopped,
}
/// Config for worker behavior passed down from server builder.
#[derive(Copy, Clone)]
pub(crate) struct ServerWorkerConfig {
shutdown_timeout: Duration,
max_blocking_threads: usize,
}
impl Default for ServerWorkerConfig {
fn default() -> Self {
// 512 is the default max blocking thread count of tokio runtime.
let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
Self {
shutdown_timeout: Duration::from_secs(30),
max_blocking_threads,
}
}
}
impl ServerWorkerConfig {
pub(crate) fn max_blocking_threads(&mut self, num: usize) {
self.max_blocking_threads = num;
}
pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
self.shutdown_timeout = dur;
}
}
impl ServerWorker {
pub(crate) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
shutdown_timeout: Duration,
config: ServerWorkerConfig,
) -> WorkerHandle {
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let avail = availability.clone();
// every worker runs in it's own arbiter.
Arbiter::new().spawn(Box::pin(async move {
// use a custom tokio runtime builder to change the settings of runtime.
Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap()
})
.spawn(async move {
availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
rx,
rx2,
availability,
factories,
shutdown_timeout,
config,
services: Vec::new(),
conns: conns.clone(),
state: WorkerState::Unavailable,
@@ -198,6 +234,8 @@ impl ServerWorker {
})
.collect::<Vec<_>>();
// a second spawn to make sure worker future runs as non boxed future.
// As Arbiter::spawn would box the future before send it to arbiter.
spawn(async move {
let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
match res {
@@ -220,7 +258,7 @@ impl ServerWorker {
}
wrk.await
});
}));
});
WorkerHandle::new(idx, tx1, tx2, avail)
}
@@ -324,7 +362,7 @@ impl Future for ServerWorker {
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))),
Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)),
Box::pin(sleep_until(Instant::now() + self.config.shutdown_timeout)),
Some(result),
);
} else {

View File

@@ -1,10 +1,13 @@
# Changes
## Unreleased - 2021-xx-xx
* `Service::poll_ready` and `Service::call` take `&self`. [#247]
* `apply_fn` and `apply_fn_factory` would take `Fn(Req, &Service)` function type [#247]
* `apply_cfg` and `apply_cfg_factory` would take `Fn(Req, &Service)` function type [#247]
* `fn_service` module would take `Fn(Req)` function type. [#247]
## 2.0.0-beta.4 - 2021-02-04
* `Service::poll_ready` and `Service::call` receive `&self`. [#247]
* `apply_fn` and `apply_fn_factory` now receive `Fn(Req, &Service)` function type. [#247]
* `apply_cfg` and `apply_cfg_factory` now receive `Fn(Req, &Service)` function type. [#247]
* `fn_service` and friends now receive `Fn(Req)` function type. [#247]
[#247]: https://github.com/actix/actix-net/pull/247

View File

@@ -1,9 +1,10 @@
[package]
name = "actix-service"
version = "2.0.0-beta.3"
version = "2.0.0-beta.4"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
"fakeshadow <24548779@qq.com>",
]
description = "Service trait and combinators for representing asynchronous request/response operations."
keywords = ["network", "framework", "async", "futures", "service"]
@@ -24,5 +25,5 @@ futures-core = { version = "0.3.7", default-features = false }
pin-project-lite = "0.2"
[dev-dependencies]
actix-rt = "2.0.0-beta.2"
actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false }

View File

@@ -2,6 +2,12 @@
> Service trait and combinators for representing asynchronous request/response operations.
See documentation for detailed explanations these components: [https://docs.rs/actix-service][docs].
[![crates.io](https://img.shields.io/crates/v/actix-service?label=latest)](https://crates.io/crates/actix-service)
[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0-beta.4)](https://docs.rs/actix-service/2.0.0-beta.4)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![License](https://img.shields.io/crates/l/actix-service.svg)
[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0-beta.4/status.svg)](https://deps.rs/crate/actix-service/2.0.0-beta.4)
[![Download](https://img.shields.io/crates/d/actix-service.svg)](https://crates.io/crates/actix-service)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)
[docs]: https://docs.rs/actix-service
See documentation for detailed explanations of these components: https://docs.rs/actix-service.

View File

@@ -102,8 +102,8 @@ pub trait Service<Req> {
/// call and the next invocation of `call` results in an error.
///
/// # Notes
/// 1. `.poll_ready()` might be called on different task from actual service call.
/// 1. In case of chained services, `.poll_ready()` get called for all services at once.
/// 1. `poll_ready` might be called on a different task to `call`.
/// 1. In cases of chained services, `.poll_ready()` is called for all services at once.
fn poll_ready(&self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Process the request and return the response asynchronously.

View File

@@ -1,17 +1,30 @@
# Changes
## Unreleased - 2021-xx-xx
## 3.0.0-beta.4 - 2021-02-24
* Rename `accept::openssl::{SslStream => TlsStream}`.
* Add `connect::Connect::set_local_addr` to attach local `IpAddr`. [#282]
* `connector::TcpConnector` service will try to bind to local_addr of `IpAddr` when given. [#282]
[#282]: https://github.com/actix/actix-net/pull/282
## 3.0.0-beta.3 - 2021-02-06
* Remove `trust-dns-proto` and `trust-dns-resolver`. [#248]
* Use `std::net::ToSocketAddrs` as simple and basic default resolver. [#248]
* Add `Resolve` trait for custom dns resolver. [#248]
* Add `Resolve` trait for custom DNS resolvers. [#248]
* Add `Resolver::new_custom` function to construct custom resolvers. [#248]
* Export `webpki_roots::TLS_SERVER_ROOTS` in `actix_tls::connect` mod and remove
the export from `actix_tls::accept` [#248]
* Remove `ConnectTakeAddrsIter`. `Connect::take_addrs` now returns `ConnectAddrsIter<'static>`
as owned iterator. [#248]
* Rename `Address::{host => hostname}` to more accurately describe which URL segment is returned.
* Update `actix-rt` to `2.0.0`. [#273]
[#248]: https://github.com/actix/actix-net/pull/248
[#273]: https://github.com/actix/actix-net/pull/273
## 3.0.0-beta.2 - 2021-xx-xx

View File

@@ -1,12 +1,12 @@
[package]
name = "actix-tls"
version = "3.0.0-beta.2"
version = "3.0.0-beta.4"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "TLS acceptor and connector services for Actix ecosystem"
keywords = ["network", "tls", "ssl", "async", "transport"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tls/"
documentation = "https://docs.rs/actix-tls"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -31,44 +31,46 @@ connect = []
openssl = ["tls-openssl", "tokio-openssl"]
# use rustls impls
rustls = ["tls-rustls", "webpki", "webpki-roots", "tokio-rustls"]
rustls = ["tokio-rustls", "webpki-roots"]
# use native-tls impls
native-tls = ["tls-native-tls", "tokio-native-tls"]
native-tls = ["tokio-native-tls"]
# support http::Uri as connect address
uri = ["http"]
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.0.0-beta.2", default-features = false }
actix-service = "2.0.0-beta.3"
actix-utils = "3.0.0-beta.1"
actix-rt = { version = "2.1.0", default-features = false }
actix-service = "2.0.0-beta.4"
actix-utils = "3.0.0-beta.2"
derive_more = "0.99.5"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
http = { version = "0.2.3", optional = true }
log = "0.4"
tokio-util = { version = "0.6.3", default-features = false }
# openssl
tls-openssl = { package = "openssl", version = "0.10", optional = true }
tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
tokio-openssl = { version = "0.6", optional = true }
# TODO: Reduce dependencies where tokio wrappers re-export base crate.
# rustls
tls-rustls = { package = "rustls", version = "0.19", optional = true }
tokio-rustls = { version = "0.22", optional = true }
webpki = { version = "0.21", optional = true }
webpki-roots = { version = "0.21", optional = true }
# native-tls
tls-native-tls = { package = "native-tls", version = "0.2", optional = true }
tokio-native-tls = { version = "0.3", optional = true }
[target.'cfg(windows)'.dependencies.tls-openssl]
version = "0.10.9"
package = "openssl"
features = ["vendored"]
optional = true
[dev-dependencies]
actix-rt = "2.0.0-beta.2"
actix-server = "2.0.0-beta.2"
actix-rt = "2.1.0"
actix-server = "2.0.0-beta.3"
bytes = "1"
env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
@@ -76,5 +78,5 @@ log = "0.4"
trust-dns-resolver = "0.20.0"
[[example]]
name = "basic"
name = "tcp-rustls"
required-features = ["accept", "rustls"]

View File

@@ -15,9 +15,9 @@
//! http --verify=false https://127.0.0.1:8443
//! ```
// this rename only exists because of how we have organised the crate's feature flags
// this use only exists because of how we have organised the crate
// it is not necessary for your actual code
extern crate tls_rustls as rustls;
use tokio_rustls::rustls;
use std::{
env,
@@ -29,9 +29,10 @@ use std::{
},
};
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::pipeline_factory;
use actix_tls::accept::rustls::Acceptor as RustlsAcceptor;
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok;
use log::info;
use rustls::{
@@ -74,9 +75,9 @@ async fn main() -> io::Result<()> {
// Set up TLS service factory
pipeline_factory(tls_acceptor.clone())
.map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream| {
.and_then(move |stream: TlsStream<TcpStream>| {
let num = count.fetch_add(1, Ordering::Relaxed);
info!("[{}] Got TLS connection: {:?}", num, stream);
info!("[{}] Got TLS connection: {:?}", num, &*stream);
ok(())
})
})?

View File

@@ -1,15 +1,94 @@
use std::task::{Context, Poll};
use std::{
io::{self, IoSlice},
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_rt::net::ActixStream;
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::Counter;
use futures_core::future::LocalBoxFuture;
pub use native_tls::Error;
pub use tokio_native_tls::{TlsAcceptor, TlsStream};
pub use tokio_native_tls::native_tls::Error;
pub use tokio_native_tls::TlsAcceptor;
use super::MAX_CONN_COUNTER;
/// Wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait.
pub struct TlsStream<T>(tokio_native_tls::TlsStream<T>);
impl<T> From<tokio_native_tls::TlsStream<T>> for TlsStream<T> {
fn from(stream: tokio_native_tls::TlsStream<T>) -> Self {
Self(stream)
}
}
impl<T: ActixStream> Deref for TlsStream<T> {
type Target = tokio_native_tls::TlsStream<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: ActixStream> DerefMut for TlsStream<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: ActixStream> AsyncRead for TlsStream<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
}
}
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
}
fn is_write_vectored(&self) -> bool {
(&**self).is_write_vectored()
}
}
impl<T: ActixStream> ActixStream for TlsStream<T> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
T::poll_read_ready((&**self).get_ref().get_ref().get_ref(), cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
T::poll_write_ready((&**self).get_ref().get_ref().get_ref(), cx)
}
}
/// Accept TLS connections via `native-tls` package.
///
/// `native-tls` feature enables this `Acceptor` type.
@@ -34,10 +113,7 @@ impl Clone for Acceptor {
}
}
impl<T> ServiceFactory<T> for Acceptor
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
type Response = TlsStream<T>;
type Error = Error;
type Config = ();
@@ -71,10 +147,7 @@ impl Clone for NativeTlsAcceptorService {
}
}
impl<T> Service<T> for NativeTlsAcceptorService
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
impl<T: ActixStream> Service<T> for NativeTlsAcceptorService {
type Response = TlsStream<T>;
type Error = Error;
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
@@ -93,7 +166,7 @@ where
Box::pin(async move {
let io = this.acceptor.accept(io).await;
drop(guard);
io
io.map(Into::into)
})
}
}

View File

@@ -1,10 +1,13 @@
use std::{
future::Future,
io::{self, IoSlice},
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_rt::net::ActixStream;
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use futures_core::{future::LocalBoxFuture, ready};
@@ -12,10 +15,82 @@ use futures_core::{future::LocalBoxFuture, ready};
pub use openssl::ssl::{
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
};
pub use tokio_openssl::SslStream;
use super::MAX_CONN_COUNTER;
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
pub struct TlsStream<T>(tokio_openssl::SslStream<T>);
impl<T> From<tokio_openssl::SslStream<T>> for TlsStream<T> {
fn from(stream: tokio_openssl::SslStream<T>) -> Self {
Self(stream)
}
}
impl<T> Deref for TlsStream<T> {
type Target = tokio_openssl::SslStream<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for TlsStream<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: ActixStream> AsyncRead for TlsStream<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
}
}
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
}
fn is_write_vectored(&self) -> bool {
(&**self).is_write_vectored()
}
}
impl<T: ActixStream> ActixStream for TlsStream<T> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
T::poll_read_ready((&**self).get_ref(), cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
T::poll_write_ready((&**self).get_ref(), cx)
}
}
/// Accept TLS connections via `openssl` package.
///
/// `openssl` feature enables this `Acceptor` type.
@@ -40,11 +115,8 @@ impl Clone for Acceptor {
}
}
impl<T> ServiceFactory<T> for Acceptor
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = SslStream<T>;
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
type Response = TlsStream<T>;
type Error = SslError;
type Config = ();
type Service = AcceptorService;
@@ -67,11 +139,8 @@ pub struct AcceptorService {
conns: Counter,
}
impl<T> Service<T> for AcceptorService
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = SslStream<T>;
impl<T: ActixStream> Service<T> for AcceptorService {
type Response = TlsStream<T>;
type Error = SslError;
type Future = AcceptorServiceResponse<T>;
@@ -88,24 +157,25 @@ where
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
AcceptorServiceResponse {
_guard: self.conns.get(),
stream: Some(SslStream::new(ssl, io).unwrap()),
stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()),
}
}
}
pub struct AcceptorServiceResponse<T>
where
T: AsyncRead + AsyncWrite,
{
stream: Option<SslStream<T>>,
pub struct AcceptorServiceResponse<T: ActixStream> {
stream: Option<tokio_openssl::SslStream<T>>,
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
type Output = Result<SslStream<T>, SslError>;
impl<T: ActixStream> Future for AcceptorServiceResponse<T> {
type Output = Result<TlsStream<T>, SslError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?;
Poll::Ready(Ok(self.stream.take().expect("SSL connect has resolved.")))
Poll::Ready(Ok(self
.stream
.take()
.expect("SSL connect has resolved.")
.into()))
}
}

View File

@@ -1,22 +1,96 @@
use std::{
future::Future,
io,
io::{self, IoSlice},
ops::{Deref, DerefMut},
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
use actix_rt::net::ActixStream;
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use futures_core::future::LocalBoxFuture;
use tokio_rustls::{Accept, TlsAcceptor};
pub use rustls::{ServerConfig, Session};
pub use tokio_rustls::server::TlsStream;
pub use tokio_rustls::rustls::{ServerConfig, Session};
use super::MAX_CONN_COUNTER;
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
pub struct TlsStream<T>(tokio_rustls::server::TlsStream<T>);
impl<T> From<tokio_rustls::server::TlsStream<T>> for TlsStream<T> {
fn from(stream: tokio_rustls::server::TlsStream<T>) -> Self {
Self(stream)
}
}
impl<T> Deref for TlsStream<T> {
type Target = tokio_rustls::server::TlsStream<T>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for TlsStream<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: ActixStream> AsyncRead for TlsStream<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
}
}
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
}
fn is_write_vectored(&self) -> bool {
(&**self).is_write_vectored()
}
}
impl<T: ActixStream> ActixStream for TlsStream<T> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
T::poll_read_ready((&**self).get_ref().0, cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
T::poll_write_ready((&**self).get_ref().0, cx)
}
}
/// Accept TLS connections via `rustls` package.
///
/// `rustls` feature enables this `Acceptor` type.
@@ -43,10 +117,7 @@ impl Clone for Acceptor {
}
}
impl<T> ServiceFactory<T> for Acceptor
where
T: AsyncRead + AsyncWrite + Unpin,
{
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
type Response = TlsStream<T>;
type Error = io::Error;
type Config = ();
@@ -72,10 +143,7 @@ pub struct AcceptorService {
conns: Counter,
}
impl<T> Service<T> for AcceptorService
where
T: AsyncRead + AsyncWrite + Unpin,
{
impl<T: ActixStream> Service<T> for AcceptorService {
type Response = TlsStream<T>;
type Error = io::Error;
type Future = AcceptorServiceFut<T>;
@@ -96,22 +164,16 @@ where
}
}
pub struct AcceptorServiceFut<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
pub struct AcceptorServiceFut<T: ActixStream> {
fut: Accept<T>,
_guard: CounterGuard,
}
impl<T> Future for AcceptorServiceFut<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
impl<T: ActixStream> Future for AcceptorServiceFut<T> {
type Output = Result<TlsStream<T>, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
Pin::new(&mut this.fut).poll(cx)
Pin::new(&mut this.fut).poll(cx).map_ok(TlsStream)
}
}

View File

@@ -3,7 +3,7 @@ use std::{
fmt,
iter::{self, FromIterator as _},
mem,
net::SocketAddr,
net::{IpAddr, SocketAddr},
};
/// Parse a host into parts (hostname and port).
@@ -67,6 +67,7 @@ pub struct Connect<T> {
pub(crate) req: T,
pub(crate) port: u16,
pub(crate) addr: ConnectAddrs,
pub(crate) local_addr: Option<IpAddr>,
}
impl<T: Address> Connect<T> {
@@ -78,6 +79,7 @@ impl<T: Address> Connect<T> {
req,
port: port.unwrap_or(0),
addr: ConnectAddrs::None,
local_addr: None,
}
}
@@ -88,6 +90,7 @@ impl<T: Address> Connect<T> {
req,
port: 0,
addr: ConnectAddrs::One(addr),
local_addr: None,
}
}
@@ -119,6 +122,12 @@ impl<T: Address> Connect<T> {
self
}
/// Set local_addr of connect.
pub fn set_local_addr(mut self, addr: impl Into<IpAddr>) -> Self {
self.local_addr = Some(addr.into());
self
}
/// Get hostname.
pub fn hostname(&self) -> &str {
self.req.hostname()
@@ -285,7 +294,7 @@ fn parse_host(host: &str) -> (&str, Option<u16>) {
#[cfg(test)]
mod tests {
use std::net::{IpAddr, Ipv4Addr};
use std::net::Ipv4Addr;
use super::*;
@@ -329,4 +338,13 @@ mod tests {
let mut iter = ConnectAddrsIter::None;
assert_eq!(iter.next(), None);
}
#[test]
fn test_local_addr() {
let conn = Connect::new("hello").set_local_addr([127, 0, 0, 1]);
assert_eq!(
conn.local_addr.unwrap(),
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
)
}
}

View File

@@ -2,15 +2,16 @@ use std::{
collections::VecDeque,
future::Future,
io,
net::SocketAddr,
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
pin::Pin,
task::{Context, Poll},
};
use actix_rt::net::TcpStream;
use actix_rt::net::{TcpSocket, TcpStream};
use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, trace};
use tokio_util::sync::ReusableBoxFuture;
use super::connect::{Address, Connect, ConnectAddrs, Connection};
use super::error::ConnectError;
@@ -53,9 +54,14 @@ impl<T: Address> Service<Connect<T>> for TcpConnector {
fn call(&self, req: Connect<T>) -> Self::Future {
let port = req.port();
let Connect { req, addr, .. } = req;
let Connect {
req,
addr,
local_addr,
..
} = req;
TcpConnectorResponse::new(req, port, addr)
TcpConnectorResponse::new(req, port, local_addr, addr)
}
}
@@ -64,14 +70,20 @@ pub enum TcpConnectorResponse<T> {
Response {
req: Option<T>,
port: u16,
local_addr: Option<IpAddr>,
addrs: Option<VecDeque<SocketAddr>>,
stream: Option<LocalBoxFuture<'static, Result<TcpStream, io::Error>>>,
stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>,
},
Error(Option<ConnectError>),
}
impl<T: Address> TcpConnectorResponse<T> {
pub(crate) fn new(req: T, port: u16, addr: ConnectAddrs) -> TcpConnectorResponse<T> {
pub(crate) fn new(
req: T,
port: u16,
local_addr: Option<IpAddr>,
addr: ConnectAddrs,
) -> TcpConnectorResponse<T> {
if addr.is_none() {
error!("TCP connector: unresolved connection address");
return TcpConnectorResponse::Error(Some(ConnectError::Unresolved));
@@ -89,8 +101,9 @@ impl<T: Address> TcpConnectorResponse<T> {
ConnectAddrs::One(addr) => TcpConnectorResponse::Response {
req: Some(req),
port,
local_addr,
addrs: None,
stream: Some(Box::pin(TcpStream::connect(addr))),
stream: Some(ReusableBoxFuture::new(connect(addr, local_addr))),
},
// when resolver returns multiple socket addr for request they would be popped from
@@ -98,6 +111,7 @@ impl<T: Address> TcpConnectorResponse<T> {
ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response {
req: Some(req),
port,
local_addr,
addrs: Some(addrs),
stream: None,
},
@@ -115,11 +129,12 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
TcpConnectorResponse::Response {
req,
port,
local_addr,
addrs,
stream,
} => loop {
if let Some(new) = stream.as_mut() {
match ready!(new.as_mut().poll(cx)) {
match ready!(new.poll(cx)) {
Ok(sock) => {
let req = req.take().unwrap();
trace!(
@@ -146,8 +161,39 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
// try to connect
let addr = addrs.as_mut().unwrap().pop_front().unwrap();
*stream = Some(Box::pin(TcpStream::connect(addr)));
let fut = connect(addr, *local_addr);
match stream {
Some(rbf) => rbf.set(fut),
None => *stream = Some(ReusableBoxFuture::new(fut)),
}
},
}
}
}
async fn connect(addr: SocketAddr, local_addr: Option<IpAddr>) -> io::Result<TcpStream> {
// use local addr if connect asks for it.
match local_addr {
Some(ip_addr) => {
let socket = match ip_addr {
IpAddr::V4(ip_addr) => {
let socket = TcpSocket::new_v4()?;
let addr = SocketAddr::V4(SocketAddrV4::new(ip_addr, 0));
socket.bind(addr)?;
socket
}
IpAddr::V6(ip_addr) => {
let socket = TcpSocket::new_v6()?;
let addr = SocketAddr::V6(SocketAddrV6::new(ip_addr, 0, 0, 0));
socket.bind(addr)?;
socket
}
};
socket.connect(addr).await
}
None => TcpStream::connect(addr).await,
}
}

View File

@@ -6,7 +6,7 @@ use std::{
task::{Context, Poll},
};
pub use rustls::Session;
pub use tokio_rustls::rustls::Session;
pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
pub use webpki_roots::TLS_SERVER_ROOTS;
@@ -14,8 +14,8 @@ use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use log::trace;
use tokio_rustls::webpki::DNSNameRef;
use tokio_rustls::{Connect, TlsConnector};
use webpki::DNSNameRef;
use crate::connect::{Address, Connection};

View File

@@ -4,12 +4,8 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#[cfg(feature = "native-tls")]
extern crate tls_native_tls as native_tls;
#[cfg(feature = "openssl")]
extern crate tls_openssl as openssl;
#[cfg(feature = "rustls")]
extern crate tls_rustls as rustls;
#[cfg(feature = "accept")]
pub mod accept;

View File

@@ -1,4 +1,9 @@
use std::io;
#![cfg(feature = "connect")]
use std::{
io,
net::{IpAddr, Ipv4Addr},
};
use actix_codec::{BytesCodec, Framed};
use actix_rt::net::TcpStream;
@@ -9,7 +14,7 @@ use futures_util::sink::SinkExt;
use actix_tls::connect::{self as actix_connect, Connect};
#[cfg(all(feature = "connect", feature = "openssl"))]
#[cfg(feature = "openssl")]
#[actix_rt::test]
async fn test_string() {
let srv = TestServer::with(|| {
@@ -125,3 +130,25 @@ async fn test_rustls_uri() {
let con = conn.call(addr.into()).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
#[actix_rt::test]
async fn test_local_addr() {
let srv = TestServer::with(|| {
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
})
});
let conn = actix_connect::default_connector();
let local = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3));
let (con, _) = conn
.call(Connect::with_addr("10", srv.addr()).set_local_addr(local))
.await
.unwrap()
.into_parts();
assert_eq!(con.local_addr().unwrap().ip(), local)
}

View File

@@ -1,3 +1,5 @@
#![cfg(feature = "connect")]
use std::{
io,
net::{Ipv4Addr, SocketAddr},

View File

@@ -6,7 +6,7 @@ description = "Support for tokio tracing with Actix services"
keywords = ["network", "framework", "tracing"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tracing/"
documentation = "https://docs.rs/actix-tracing"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -16,12 +16,12 @@ name = "actix_tracing"
path = "src/lib.rs"
[dependencies]
actix-service = "2.0.0-beta.3"
actix-service = "2.0.0-beta.4"
futures-util = { version = "0.3.4", default-features = false }
tracing = "0.1"
tracing-futures = "0.2"
[dev_dependencies]
actix-rt = "2.0.0-beta.2"
actix-rt = "2.0.0"
slab = "0.4"

View File

@@ -3,6 +3,12 @@
## Unreleased - 2021-xx-xx
## 3.0.0-beta.2 - 2021-02-06
* Update `actix-rt` to `2.0.0`. [#273]
[#273]: https://github.com/actix/actix-net/pull/273
## 3.0.0-beta.1 - 2020-12-28
* Update `bytes` dependency to `1`. [#237]
* Use `pin-project-lite` to replace `pin-project`. [#229]

View File

@@ -1,12 +1,12 @@
[package]
name = "actix-utils"
version = "3.0.0-beta.1"
version = "3.0.0-beta.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Various network related services and utilities for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-utils/"
documentation = "https://docs.rs/actix-utils"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -17,8 +17,8 @@ path = "src/lib.rs"
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.0.0-beta.2", default-features = false }
actix-service = "2.0.0-beta.3"
actix-rt = { version = "2.0.0", default-features = false }
actix-service = "2.0.0-beta.4"
futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false }
@@ -26,4 +26,5 @@ log = "0.4"
pin-project-lite = "0.2.0"
[dev-dependencies]
actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false }

View File

@@ -1,49 +1,30 @@
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::task::Waker;
use core::{cell::Cell, fmt, marker::PhantomData, task::Waker};
/// A synchronization primitive for task wakeup.
///
/// Sometimes the task interested in a given event will change over time.
/// An `LocalWaker` can coordinate concurrent notifications with the consumer
/// potentially "updating" the underlying task to wake up. This is useful in
/// scenarios where a computation completes in another task and wants to
/// notify the consumer, but the consumer is in the process of being migrated to
/// a new logical task.
/// Sometimes the task interested in a given event will change over time. A `LocalWaker` can
/// coordinate concurrent notifications with the consumer, potentially "updating" the underlying
/// task to wake up. This is useful in scenarios where a computation completes in another task and
/// wants to notify the consumer, but the consumer is in the process of being migrated to a new
/// logical task.
///
/// Consumers should call `register` before checking the result of a computation
/// and producers should call `wake` after producing the computation (this
/// differs from the usual `thread::park` pattern). It is also permitted for
/// `wake` to be called **before** `register`. This results in a no-op.
/// Consumers should call [`register`] before checking the result of a computation and producers
/// should call `wake` after producing the computation (this differs from the usual `thread::park`
/// pattern). It is also permitted for [`wake`] to be called _before_ [`register`]. This results in
/// a no-op.
///
/// A single `AtomicWaker` may be reused for any number of calls to `register` or
/// `wake`.
// TODO: Refactor to Cell when remove deprecated methods (@botika)
/// A single `LocalWaker` may be reused for any number of calls to [`register`] or [`wake`].
#[derive(Default)]
pub struct LocalWaker {
pub(crate) waker: UnsafeCell<Option<Waker>>,
pub(crate) waker: Cell<Option<Waker>>,
// mark LocalWaker as a !Send type.
_t: PhantomData<*const ()>,
_phantom: PhantomData<*const ()>,
}
impl LocalWaker {
/// Create an `LocalWaker`.
/// Creates a new, empty `LocalWaker`.
pub fn new() -> Self {
LocalWaker {
waker: UnsafeCell::new(None),
_t: PhantomData,
}
}
#[deprecated(
since = "2.1.0",
note = "In favor of `wake`. State of the register doesn't matter at `wake` up"
)]
/// Check if waker has been registered.
#[inline]
pub fn is_registered(&self) -> bool {
unsafe { (*self.waker.get()).is_some() }
LocalWaker::default()
}
/// Registers the waker to be notified on calls to `wake`.
@@ -51,11 +32,8 @@ impl LocalWaker {
/// Returns `true` if waker was registered before.
#[inline]
pub fn register(&self, waker: &Waker) -> bool {
unsafe {
let w = self.waker.get();
let last_waker = w.replace(Some(waker.clone()));
last_waker.is_some()
}
let last_waker = self.waker.replace(Some(waker.clone()));
last_waker.is_some()
}
/// Calls `wake` on the last `Waker` passed to `register`.
@@ -73,7 +51,7 @@ impl LocalWaker {
/// If a waker has not been registered, this returns `None`.
#[inline]
pub fn take(&self) -> Option<Waker> {
unsafe { (*self.waker.get()).take() }
self.waker.take()
}
}

View File

@@ -10,7 +10,7 @@ keywords = ["string", "bytes", "utf8", "web", "actix"]
categories = ["no-std", "web-programming"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/bytestring/"
documentation = "https://docs.rs/bytestring"
license = "MIT OR Apache-2.0"
edition = "2018"
@@ -24,4 +24,4 @@ serde = { version = "1.0", optional = true }
[dev-dependencies]
serde_json = "1.0"
ahash = { version = "0.6", default-features = false }
ahash = { version = "0.7", default-features = false }