mirror of
https://github.com/fafhrd91/actix-net
synced 2025-08-16 07:59:00 +02:00
Compare commits
21 Commits
server-v2.
...
rt-fix
Author | SHA1 | Date | |
---|---|---|---|
|
7c249e8834 | ||
|
789e6a8a46 | ||
|
6e590fd042 | ||
|
fa8ded3a34 | ||
|
841c611233 | ||
|
81a2b6a425 | ||
|
a6e79453d0 | ||
|
17f711a9d6 | ||
|
c3be839a69 | ||
|
fd59f21281 | ||
|
1c5a0a7c11 | ||
|
8d74cf387d | ||
|
7e483cc356 | ||
|
75d7ae3139 | ||
|
2cfe1d88ad | ||
|
cb07ead392 | ||
|
32543809f9 | ||
|
eb4d29e15e | ||
|
7ee42b50b4 | ||
|
0da848e4ae | ||
|
5f80d85010 |
103
.github/workflows/ci.yml
vendored
Normal file
103
.github/workflows/ci.yml
vendored
Normal file
@@ -0,0 +1,103 @@
|
|||||||
|
name: CI
|
||||||
|
|
||||||
|
on:
|
||||||
|
pull_request:
|
||||||
|
types: [opened, synchronize, reopened]
|
||||||
|
push:
|
||||||
|
branches: [master]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build_and_test:
|
||||||
|
strategy:
|
||||||
|
fail-fast: false
|
||||||
|
matrix:
|
||||||
|
target:
|
||||||
|
- { name: Linux, os: ubuntu-latest, triple: x86_64-unknown-linux-gnu }
|
||||||
|
- { name: macOS, os: macos-latest, triple: x86_64-apple-darwin }
|
||||||
|
- { name: Windows, os: windows-latest, triple: x86_64-pc-windows-msvc }
|
||||||
|
- { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu }
|
||||||
|
- { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc }
|
||||||
|
version:
|
||||||
|
- 1.46.0 # MSRV
|
||||||
|
- stable
|
||||||
|
- nightly
|
||||||
|
|
||||||
|
name: ${{ matrix.target.name }} / ${{ matrix.version }}
|
||||||
|
runs-on: ${{ matrix.target.os }}
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Setup Routing
|
||||||
|
if: matrix.target.os == 'macos-latest'
|
||||||
|
run: sudo ifconfig lo0 alias 127.0.0.3
|
||||||
|
|
||||||
|
- uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Install ${{ matrix.version }}
|
||||||
|
uses: actions-rs/toolchain@v1
|
||||||
|
with:
|
||||||
|
toolchain: ${{ matrix.version }}-${{ matrix.target.triple }}
|
||||||
|
profile: minimal
|
||||||
|
override: true
|
||||||
|
|
||||||
|
- name: Install MSYS2
|
||||||
|
if: matrix.target.triple == 'x86_64-pc-windows-gnu'
|
||||||
|
uses: msys2/setup-msys2@v2
|
||||||
|
- name: Install MinGW Packages
|
||||||
|
if: matrix.target.triple == 'x86_64-pc-windows-gnu'
|
||||||
|
run: |
|
||||||
|
msys2 -c 'pacman -Sy --noconfirm pacman'
|
||||||
|
msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
|
||||||
|
|
||||||
|
- name: Generate Cargo.lock
|
||||||
|
uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: generate-lockfile
|
||||||
|
- name: Cache Dependencies
|
||||||
|
uses: Swatinem/rust-cache@v1.2.0
|
||||||
|
|
||||||
|
- name: Install cargo-hack
|
||||||
|
uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: install
|
||||||
|
args: cargo-hack
|
||||||
|
|
||||||
|
- name: check minimal
|
||||||
|
uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: hack
|
||||||
|
args: --clean-per-run check --workspace --no-default-features --tests
|
||||||
|
|
||||||
|
- name: check full
|
||||||
|
uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: check
|
||||||
|
args: --workspace --bins --examples --tests
|
||||||
|
|
||||||
|
- name: tests
|
||||||
|
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
|
||||||
|
uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: test
|
||||||
|
args: --workspace --all-features --no-fail-fast -- --nocapture
|
||||||
|
|
||||||
|
- name: Generate coverage file
|
||||||
|
if: >
|
||||||
|
matrix.target.os == 'ubuntu-latest'
|
||||||
|
&& matrix.version == 'stable'
|
||||||
|
&& github.ref == 'refs/heads/master'
|
||||||
|
run: |
|
||||||
|
cargo install cargo-tarpaulin
|
||||||
|
cargo tarpaulin --out Xml --verbose
|
||||||
|
- name: Upload to Codecov
|
||||||
|
if: >
|
||||||
|
matrix.target.os == 'ubuntu-latest'
|
||||||
|
&& matrix.version == 'stable'
|
||||||
|
&& github.ref == 'refs/heads/master'
|
||||||
|
uses: codecov/codecov-action@v1
|
||||||
|
with:
|
||||||
|
file: cobertura.xml
|
||||||
|
|
||||||
|
- name: Clear the cargo caches
|
||||||
|
run: |
|
||||||
|
cargo install cargo-cache --no-default-features --features ci-autoclean
|
||||||
|
cargo-cache
|
28
.github/workflows/clippy-fmt.yml
vendored
28
.github/workflows/clippy-fmt.yml
vendored
@@ -1,34 +1,42 @@
|
|||||||
|
name: Lint
|
||||||
|
|
||||||
on:
|
on:
|
||||||
pull_request:
|
pull_request:
|
||||||
types: [opened, synchronize, reopened]
|
types: [opened, synchronize, reopened]
|
||||||
|
|
||||||
name: Clippy and rustfmt Check
|
|
||||||
jobs:
|
jobs:
|
||||||
clippy_check:
|
fmt:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
|
||||||
- uses: actions-rs/toolchain@v1
|
- name: Install Rust
|
||||||
|
uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
toolchain: stable
|
toolchain: stable
|
||||||
components: rustfmt
|
|
||||||
profile: minimal
|
profile: minimal
|
||||||
|
components: rustfmt
|
||||||
override: true
|
override: true
|
||||||
- name: Check with rustfmt
|
- name: Rustfmt Check
|
||||||
uses: actions-rs/cargo@v1
|
uses: actions-rs/cargo@v1
|
||||||
with:
|
with:
|
||||||
command: fmt
|
command: fmt
|
||||||
args: --all -- --check
|
args: --all -- --check
|
||||||
|
|
||||||
- uses: actions-rs/toolchain@v1
|
clippy:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Install Rust
|
||||||
|
uses: actions-rs/toolchain@v1
|
||||||
with:
|
with:
|
||||||
toolchain: nightly
|
toolchain: stable
|
||||||
components: clippy
|
|
||||||
profile: minimal
|
profile: minimal
|
||||||
|
components: clippy
|
||||||
override: true
|
override: true
|
||||||
- name: Check with Clippy
|
- name: Clippy Check
|
||||||
uses: actions-rs/clippy-check@v1
|
uses: actions-rs/clippy-check@v1
|
||||||
with:
|
with:
|
||||||
token: ${{ secrets.GITHUB_TOKEN }}
|
token: ${{ secrets.GITHUB_TOKEN }}
|
||||||
args: --workspace --tests
|
args: --workspace --tests --all-features
|
||||||
|
82
.github/workflows/linux.yml
vendored
82
.github/workflows/linux.yml
vendored
@@ -1,82 +0,0 @@
|
|||||||
name: CI (Linux)
|
|
||||||
|
|
||||||
on:
|
|
||||||
pull_request:
|
|
||||||
types: [opened, synchronize, reopened]
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- master
|
|
||||||
- '1.0'
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
build_and_test:
|
|
||||||
strategy:
|
|
||||||
fail-fast: false
|
|
||||||
matrix:
|
|
||||||
version:
|
|
||||||
- 1.46.0
|
|
||||||
- stable
|
|
||||||
- nightly
|
|
||||||
|
|
||||||
name: ${{ matrix.version }} - x86_64-unknown-linux-gnu
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
|
|
||||||
- name: Install ${{ matrix.version }}
|
|
||||||
uses: actions-rs/toolchain@v1
|
|
||||||
with:
|
|
||||||
toolchain: ${{ matrix.version }}-x86_64-unknown-linux-gnu
|
|
||||||
profile: minimal
|
|
||||||
override: true
|
|
||||||
|
|
||||||
- name: Generate Cargo.lock
|
|
||||||
uses: actions-rs/cargo@v1
|
|
||||||
with:
|
|
||||||
command: generate-lockfile
|
|
||||||
- name: Cache cargo dirs
|
|
||||||
uses: actions/cache@v2
|
|
||||||
with:
|
|
||||||
path:
|
|
||||||
~/.cargo/registry
|
|
||||||
~/.cargo/git
|
|
||||||
~/.cargo/bin
|
|
||||||
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
|
||||||
- name: Cache cargo build
|
|
||||||
uses: actions/cache@v2
|
|
||||||
with:
|
|
||||||
path: target
|
|
||||||
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
|
||||||
|
|
||||||
- name: check build
|
|
||||||
uses: actions-rs/cargo@v1
|
|
||||||
with:
|
|
||||||
command: check
|
|
||||||
args: --workspace --bins --examples --tests
|
|
||||||
|
|
||||||
- name: tests
|
|
||||||
uses: actions-rs/cargo@v1
|
|
||||||
timeout-minutes: 40
|
|
||||||
with:
|
|
||||||
command: test
|
|
||||||
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
|
|
||||||
|
|
||||||
- name: Generate coverage file
|
|
||||||
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
|
|
||||||
run: |
|
|
||||||
cargo install cargo-tarpaulin
|
|
||||||
cargo tarpaulin --out Xml --workspace
|
|
||||||
|
|
||||||
- name: Upload to Codecov
|
|
||||||
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
|
|
||||||
uses: codecov/codecov-action@v1
|
|
||||||
with:
|
|
||||||
file: cobertura.xml
|
|
||||||
|
|
||||||
- name: Clear the cargo caches
|
|
||||||
run: |
|
|
||||||
rustup update stable
|
|
||||||
rustup override set stable
|
|
||||||
cargo install cargo-cache --no-default-features --features ci-autoclean
|
|
||||||
cargo-cache
|
|
43
.github/workflows/macos.yml
vendored
43
.github/workflows/macos.yml
vendored
@@ -1,43 +0,0 @@
|
|||||||
name: CI (macOS)
|
|
||||||
|
|
||||||
on:
|
|
||||||
pull_request:
|
|
||||||
types: [opened, synchronize, reopened]
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- master
|
|
||||||
- '1.0'
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
build_and_test:
|
|
||||||
strategy:
|
|
||||||
fail-fast: false
|
|
||||||
matrix:
|
|
||||||
version:
|
|
||||||
- stable
|
|
||||||
- nightly
|
|
||||||
|
|
||||||
name: ${{ matrix.version }} - x86_64-apple-darwin
|
|
||||||
runs-on: macos-latest
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
|
|
||||||
- name: Install ${{ matrix.version }}
|
|
||||||
uses: actions-rs/toolchain@v1
|
|
||||||
with:
|
|
||||||
toolchain: ${{ matrix.version }}-x86_64-apple-darwin
|
|
||||||
profile: minimal
|
|
||||||
override: true
|
|
||||||
|
|
||||||
- name: check build
|
|
||||||
uses: actions-rs/cargo@v1
|
|
||||||
with:
|
|
||||||
command: check
|
|
||||||
args: --workspace --bins --examples --tests
|
|
||||||
|
|
||||||
- name: tests
|
|
||||||
uses: actions-rs/cargo@v1
|
|
||||||
with:
|
|
||||||
command: test
|
|
||||||
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
|
|
35
.github/workflows/upload-doc.yml
vendored
Normal file
35
.github/workflows/upload-doc.yml
vendored
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
name: Upload documentation
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [master]
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Install Rust
|
||||||
|
uses: actions-rs/toolchain@v1
|
||||||
|
with:
|
||||||
|
toolchain: nightly-x86_64-unknown-linux-gnu
|
||||||
|
profile: minimal
|
||||||
|
override: true
|
||||||
|
|
||||||
|
- name: Build Docs
|
||||||
|
uses: actions-rs/cargo@v1
|
||||||
|
with:
|
||||||
|
command: doc
|
||||||
|
args: --workspace --all-features --no-deps
|
||||||
|
|
||||||
|
- name: Tweak HTML
|
||||||
|
run: echo '<meta http-equiv="refresh" content="0;url=actix_server/index.html">' > target/doc/index.html
|
||||||
|
|
||||||
|
- name: Deploy to GitHub Pages
|
||||||
|
uses: JamesIves/github-pages-deploy-action@3.7.1
|
||||||
|
with:
|
||||||
|
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||||
|
BRANCH: gh-pages
|
||||||
|
FOLDER: target/doc
|
45
.github/workflows/windows-mingw.yml
vendored
45
.github/workflows/windows-mingw.yml
vendored
@@ -1,45 +0,0 @@
|
|||||||
name: CI (Windows-mingw)
|
|
||||||
|
|
||||||
on:
|
|
||||||
pull_request:
|
|
||||||
types: [opened, synchronize, reopened]
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- master
|
|
||||||
- '1.0'
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
build_and_test:
|
|
||||||
strategy:
|
|
||||||
fail-fast: false
|
|
||||||
matrix:
|
|
||||||
version:
|
|
||||||
- stable
|
|
||||||
- nightly
|
|
||||||
|
|
||||||
name: ${{ matrix.version }} - x86_64-pc-windows-gnu
|
|
||||||
runs-on: windows-latest
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
|
|
||||||
- name: Install ${{ matrix.version }}
|
|
||||||
uses: actions-rs/toolchain@v1
|
|
||||||
with:
|
|
||||||
toolchain: ${{ matrix.version }}-x86_64-pc-windows-gnu
|
|
||||||
profile: minimal
|
|
||||||
override: true
|
|
||||||
|
|
||||||
- name: Install MSYS2
|
|
||||||
uses: msys2/setup-msys2@v2
|
|
||||||
|
|
||||||
- name: Install packages
|
|
||||||
run: |
|
|
||||||
msys2 -c 'pacman -Sy --noconfirm pacman'
|
|
||||||
msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
|
|
||||||
|
|
||||||
- name: check build
|
|
||||||
uses: actions-rs/cargo@v1
|
|
||||||
with:
|
|
||||||
command: check
|
|
||||||
args: --workspace --bins --examples --tests
|
|
69
.github/workflows/windows.yml
vendored
69
.github/workflows/windows.yml
vendored
@@ -1,69 +0,0 @@
|
|||||||
name: CI (Windows)
|
|
||||||
|
|
||||||
on:
|
|
||||||
pull_request:
|
|
||||||
types: [opened, synchronize, reopened]
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- master
|
|
||||||
- '1.0'
|
|
||||||
|
|
||||||
env:
|
|
||||||
VCPKGRS_DYNAMIC: 1
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
build_and_test:
|
|
||||||
strategy:
|
|
||||||
fail-fast: false
|
|
||||||
matrix:
|
|
||||||
version:
|
|
||||||
- stable
|
|
||||||
- nightly
|
|
||||||
target:
|
|
||||||
- x86_64-pc-windows-msvc
|
|
||||||
- i686-pc-windows-msvc
|
|
||||||
|
|
||||||
name: ${{ matrix.version }} - ${{ matrix.target }}
|
|
||||||
runs-on: windows-latest
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v2
|
|
||||||
|
|
||||||
- name: Install ${{ matrix.version }}
|
|
||||||
uses: actions-rs/toolchain@v1
|
|
||||||
with:
|
|
||||||
toolchain: ${{ matrix.version }}-${{ matrix.target }}
|
|
||||||
profile: minimal
|
|
||||||
override: true
|
|
||||||
|
|
||||||
- name: Install OpenSSL (x64)
|
|
||||||
if: matrix.target == 'x86_64-pc-windows-msvc'
|
|
||||||
run: |
|
|
||||||
vcpkg integrate install
|
|
||||||
vcpkg install openssl:x64-windows
|
|
||||||
Get-ChildItem C:\vcpkg\installed\x64-windows\bin
|
|
||||||
Get-ChildItem C:\vcpkg\installed\x64-windows\lib
|
|
||||||
Copy-Item C:\vcpkg\installed\x64-windows\bin\libcrypto-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libcrypto.dll
|
|
||||||
Copy-Item C:\vcpkg\installed\x64-windows\bin\libssl-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libssl.dll
|
|
||||||
|
|
||||||
- name: Install OpenSSL (x86)
|
|
||||||
if: matrix.target == 'i686-pc-windows-msvc'
|
|
||||||
run: |
|
|
||||||
vcpkg integrate install
|
|
||||||
vcpkg install openssl:x86-windows
|
|
||||||
Get-ChildItem C:\vcpkg\installed\x86-windows\bin
|
|
||||||
Get-ChildItem C:\vcpkg\installed\x86-windows\lib
|
|
||||||
Copy-Item C:\vcpkg\installed\x86-windows\bin\libcrypto-1_1.dll C:\vcpkg\installed\x86-windows\bin\libcrypto.dll
|
|
||||||
Copy-Item C:\vcpkg\installed\x86-windows\bin\libssl-1_1.dll C:\vcpkg\installed\x86-windows\bin\libssl.dll
|
|
||||||
|
|
||||||
- name: check build
|
|
||||||
uses: actions-rs/cargo@v1
|
|
||||||
with:
|
|
||||||
command: check
|
|
||||||
args: --workspace --bins --examples --tests
|
|
||||||
|
|
||||||
- name: tests
|
|
||||||
uses: actions-rs/cargo@v1
|
|
||||||
with:
|
|
||||||
command: test
|
|
||||||
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
|
|
@@ -1,6 +1,9 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
|
||||||
|
|
||||||
|
## 0.2.7 - 2021-02-06
|
||||||
* Add `Router::recognize_checked` [#247]
|
* Add `Router::recognize_checked` [#247]
|
||||||
|
|
||||||
[#247]: https://github.com/actix/actix-net/pull/247
|
[#247]: https://github.com/actix/actix-net/pull/247
|
||||||
|
@@ -1,9 +1,9 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-router"
|
name = "actix-router"
|
||||||
version = "0.2.6"
|
version = "0.2.7"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||||
description = "Resource path matching library"
|
description = "Resource path matching library"
|
||||||
keywords = ["actix"]
|
keywords = ["actix", "router", "routing"]
|
||||||
homepage = "https://actix.rs"
|
homepage = "https://actix.rs"
|
||||||
repository = "https://github.com/actix/actix-net.git"
|
repository = "https://github.com/actix/actix-net.git"
|
||||||
documentation = "https://docs.rs/actix-router"
|
documentation = "https://docs.rs/actix-router"
|
||||||
|
@@ -670,8 +670,6 @@ pub(crate) fn insert_slash(path: &str) -> String {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use http::Uri;
|
|
||||||
use std::convert::TryFrom;
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_parse_static() {
|
fn test_parse_static() {
|
||||||
@@ -833,8 +831,11 @@ mod tests {
|
|||||||
assert!(re.is_match("/user/2345/sdg"));
|
assert!(re.is_match("/user/2345/sdg"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "http")]
|
||||||
#[test]
|
#[test]
|
||||||
fn test_parse_urlencoded_param() {
|
fn test_parse_urlencoded_param() {
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
let re = ResourceDef::new("/user/{id}/test");
|
let re = ResourceDef::new("/user/{id}/test");
|
||||||
|
|
||||||
let mut path = Path::new("/user/2345/test");
|
let mut path = Path::new("/user/2345/test");
|
||||||
@@ -845,7 +846,7 @@ mod tests {
|
|||||||
assert!(re.match_path(&mut path));
|
assert!(re.match_path(&mut path));
|
||||||
assert_eq!(path.get("id").unwrap(), "qwe%25");
|
assert_eq!(path.get("id").unwrap(), "qwe%25");
|
||||||
|
|
||||||
let uri = Uri::try_from("/user/qwe%25/test").unwrap();
|
let uri = http::Uri::try_from("/user/qwe%25/test").unwrap();
|
||||||
let mut path = Path::new(uri);
|
let mut path = Path::new(uri);
|
||||||
assert!(re.match_path(&mut path));
|
assert!(re.match_path(&mut path));
|
||||||
assert_eq!(path.get("id").unwrap(), "qwe%25");
|
assert_eq!(path.get("id").unwrap(), "qwe%25");
|
||||||
|
@@ -1,6 +1,19 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
* Add `ActixStream` extension trait to include readiness methods. [#276]
|
||||||
|
* Re-export `tokio::net::TcpSocket` in `net` module [#282]
|
||||||
|
|
||||||
|
[#276]: https://github.com/actix/actix-net/pull/276
|
||||||
|
[#282]: https://github.com/actix/actix-net/pull/282
|
||||||
|
|
||||||
|
|
||||||
|
## 2.0.2 - 2021-02-06
|
||||||
|
* Add `Arbiter::handle` to get a handle of an owned Arbiter. [#274]
|
||||||
|
* Add `System::try_current` for situations where actix may or may not be running a System. [#275]
|
||||||
|
|
||||||
|
[#274]: https://github.com/actix/actix-net/pull/274
|
||||||
|
[#275]: https://github.com/actix/actix-net/pull/275
|
||||||
|
|
||||||
|
|
||||||
## 2.0.1 - 2021-02-06
|
## 2.0.1 - 2021-02-06
|
||||||
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-rt"
|
name = "actix-rt"
|
||||||
version = "2.0.1"
|
version = "2.0.2"
|
||||||
authors = [
|
authors = [
|
||||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||||
"Rob Ede <robjtede@icloud.com>",
|
"Rob Ede <robjtede@icloud.com>",
|
||||||
@@ -26,6 +26,7 @@ macros = ["actix-macros"]
|
|||||||
actix-macros = { version = "0.2.0", optional = true }
|
actix-macros = { version = "0.2.0", optional = true }
|
||||||
|
|
||||||
futures-core = { version = "0.3", default-features = false }
|
futures-core = { version = "0.3", default-features = false }
|
||||||
|
futures-intrusive = "0.4"
|
||||||
tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
|
tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
@@ -9,6 +9,9 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
|
use futures_intrusive::channel::shared::{
|
||||||
|
oneshot_broadcast_channel, OneshotBroadcastReceiver, OneshotBroadcastSender,
|
||||||
|
};
|
||||||
use tokio::{sync::mpsc, task::LocalSet};
|
use tokio::{sync::mpsc, task::LocalSet};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -40,11 +43,26 @@ impl fmt::Debug for ArbiterCommand {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct ArbiterHandle {
|
pub struct ArbiterHandle {
|
||||||
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||||
|
/// Is `None` for system arbiter.
|
||||||
|
stopped_rx: Option<OneshotBroadcastReceiver<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ArbiterHandle {
|
impl ArbiterHandle {
|
||||||
pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
|
pub(crate) fn new(
|
||||||
Self { tx }
|
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||||
|
stopped_rx: OneshotBroadcastReceiver<()>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
tx,
|
||||||
|
stopped_rx: Some(stopped_rx),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn for_system(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
|
||||||
|
Self {
|
||||||
|
tx,
|
||||||
|
stopped_rx: None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a future to the [Arbiter]'s thread and spawn it.
|
/// Send a future to the [Arbiter]'s thread and spawn it.
|
||||||
@@ -81,6 +99,25 @@ impl ArbiterHandle {
|
|||||||
pub fn stop(&self) -> bool {
|
pub fn stop(&self) -> bool {
|
||||||
self.tx.send(ArbiterCommand::Stop).is_ok()
|
self.tx.send(ArbiterCommand::Stop).is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Will wait for associated [Arbiter] to complete all commands up until it is stopped.
|
||||||
|
///
|
||||||
|
/// For [Arbiter]s that have already stopped, the future will resolve immediately.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if called on the system Arbiter. In this situation the Arbiter's lifetime is
|
||||||
|
/// implicitly bound by the main thread's lifetime.
|
||||||
|
pub async fn join(self) {
|
||||||
|
match self.stopped_rx {
|
||||||
|
Some(rx) => {
|
||||||
|
rx.receive().await;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// TODO: decide if this is correct
|
||||||
|
panic!("cannot wait on the system Arbiter's completion")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
|
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
|
||||||
@@ -89,8 +126,10 @@ impl ArbiterHandle {
|
|||||||
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
|
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Arbiter {
|
pub struct Arbiter {
|
||||||
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
id: usize,
|
||||||
thread_handle: thread::JoinHandle<()>,
|
stopped_tx: OneshotBroadcastSender<()>,
|
||||||
|
cmd_tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||||
|
thread_handle: Option<thread::JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Arbiter {
|
impl Arbiter {
|
||||||
@@ -99,7 +138,7 @@ impl Arbiter {
|
|||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if a [System] is not registered on the current thread.
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
#[allow(clippy::new_without_default)]
|
#[allow(clippy::new_without_default)]
|
||||||
pub fn new() -> Arbiter {
|
pub fn new() -> ArbiterHandle {
|
||||||
Self::with_tokio_rt(|| {
|
Self::with_tokio_rt(|| {
|
||||||
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
|
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
|
||||||
})
|
})
|
||||||
@@ -109,76 +148,116 @@ impl Arbiter {
|
|||||||
///
|
///
|
||||||
/// [tokio-runtime]: tokio::runtime::Runtime
|
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
|
pub fn with_tokio_rt<F>(runtime_factory: F) -> ArbiterHandle
|
||||||
where
|
where
|
||||||
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
|
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
|
||||||
{
|
{
|
||||||
|
eprintln!("get sys current");
|
||||||
|
|
||||||
let sys = System::current();
|
let sys = System::current();
|
||||||
|
eprintln!("get sys id");
|
||||||
let system_id = sys.id();
|
let system_id = sys.id();
|
||||||
|
eprintln!("calc arb id");
|
||||||
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
|
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
|
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
|
// let ready_barrier = Arc::new(Barrier::new(2));
|
||||||
|
|
||||||
|
let (stopped_tx, stopped_rx) = oneshot_broadcast_channel::<()>();
|
||||||
|
|
||||||
|
eprintln!("make arb handle");
|
||||||
|
let hnd = ArbiterHandle::new(cmd_tx.clone(), stopped_rx);
|
||||||
|
|
||||||
|
eprintln!("make thread");
|
||||||
let thread_handle = thread::Builder::new()
|
let thread_handle = thread::Builder::new()
|
||||||
.name(name.clone())
|
.name(name.clone())
|
||||||
.spawn({
|
.spawn({
|
||||||
let tx = tx.clone();
|
let hnd = hnd.clone();
|
||||||
move || {
|
// let ready_barrier = Arc::clone(&ready_barrier);
|
||||||
let rt = Runtime::from(runtime_factory());
|
|
||||||
let hnd = ArbiterHandle::new(tx);
|
|
||||||
|
|
||||||
|
move || {
|
||||||
|
eprintln!("thread: make rt");
|
||||||
|
let rt = Runtime::from(runtime_factory());
|
||||||
|
|
||||||
|
eprintln!("thread: set sys");
|
||||||
System::set_current(sys);
|
System::set_current(sys);
|
||||||
|
|
||||||
|
// // wait until register message is sent
|
||||||
|
// eprintln!("thread: wait for arb registered");
|
||||||
|
// ready_barrier.wait();
|
||||||
|
|
||||||
|
eprintln!("thread: set arb handle");
|
||||||
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||||
|
|
||||||
// register arbiter
|
|
||||||
let _ = System::current()
|
|
||||||
.tx()
|
|
||||||
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
|
|
||||||
|
|
||||||
ready_tx.send(()).unwrap();
|
|
||||||
|
|
||||||
// run arbiter event processing loop
|
// run arbiter event processing loop
|
||||||
rt.block_on(ArbiterRunner { rx });
|
eprintln!("thread: block on arbiter loop");
|
||||||
|
rt.block_on(ArbiterLoop { cmd_rx });
|
||||||
|
|
||||||
// deregister arbiter
|
// deregister arbiter
|
||||||
let _ = System::current()
|
eprintln!("thread: send deregister arbiter message");
|
||||||
|
System::current()
|
||||||
.tx()
|
.tx()
|
||||||
.send(SystemCommand::DeregisterArbiter(arb_id));
|
.send(SystemCommand::DeregisterArbiter(arb_id))
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
|
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
|
||||||
});
|
});
|
||||||
|
|
||||||
ready_rx.recv().unwrap();
|
let arb = Arbiter {
|
||||||
|
id: arb_id,
|
||||||
|
cmd_tx,
|
||||||
|
stopped_tx,
|
||||||
|
thread_handle: Some(thread_handle),
|
||||||
|
};
|
||||||
|
|
||||||
Arbiter { tx, thread_handle }
|
// register arbiter
|
||||||
|
eprintln!("send register arbiter message");
|
||||||
|
System::current()
|
||||||
|
.tx()
|
||||||
|
.send(SystemCommand::RegisterArbiter(arb))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// eprintln!("inform arbiter that it is registered");
|
||||||
|
// ready_barrier.wait();
|
||||||
|
|
||||||
|
eprintln!("arbiter::new done");
|
||||||
|
hnd
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sets up an Arbiter runner in a new System using the provided runtime local task set.
|
/// Sets up an Arbiter runner in a new System using the provided runtime local task set.
|
||||||
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle {
|
pub(crate) fn for_system(local: &LocalSet) -> ArbiterHandle {
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let hnd = ArbiterHandle::new(tx);
|
let hnd = ArbiterHandle::for_system(cmd_tx);
|
||||||
|
|
||||||
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||||
|
|
||||||
local.spawn_local(ArbiterRunner { rx });
|
local.spawn_local(ArbiterLoop { cmd_rx });
|
||||||
|
|
||||||
hnd
|
hnd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return `Arbiter`'s numeric ID.
|
||||||
|
pub(crate) fn id(&self) -> usize {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
|
|
||||||
|
// /// Return a handle to the this Arbiter's message sender.
|
||||||
|
// pub fn handle(&self) -> ArbiterHandle {
|
||||||
|
// ArbiterHandle::new(self.cmd_tx.clone())
|
||||||
|
// }
|
||||||
|
|
||||||
/// Return a handle to the current thread's Arbiter's message sender.
|
/// Return a handle to the current thread's Arbiter's message sender.
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if no Arbiter is running on the current thread.
|
/// Panics if no Arbiter is running on the current thread.
|
||||||
pub fn current() -> ArbiterHandle {
|
pub fn current() -> ArbiterHandle {
|
||||||
HANDLE.with(|cell| match *cell.borrow() {
|
HANDLE.with(|cell| match *cell.borrow() {
|
||||||
Some(ref addr) => addr.clone(),
|
Some(ref hnd) => hnd.clone(),
|
||||||
None => panic!("Arbiter is not running."),
|
None => panic!("Arbiter is not running."),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -186,57 +265,68 @@ impl Arbiter {
|
|||||||
/// Stop Arbiter from continuing it's event loop.
|
/// Stop Arbiter from continuing it's event loop.
|
||||||
///
|
///
|
||||||
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
|
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
|
||||||
pub fn stop(&self) -> bool {
|
pub(crate) fn stop(&self) -> bool {
|
||||||
self.tx.send(ArbiterCommand::Stop).is_ok()
|
self.cmd_tx.send(ArbiterCommand::Stop).is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a future to the Arbiter's thread and spawn it.
|
// /// Send a future to the Arbiter's thread and spawn it.
|
||||||
///
|
// ///
|
||||||
/// If you require a result, include a response channel in the future.
|
// /// If you require a result, include a response channel in the future.
|
||||||
///
|
// ///
|
||||||
/// Returns true if future was sent successfully and false if the Arbiter has died.
|
// /// Returns true if future was sent successfully and false if the Arbiter has died.
|
||||||
pub fn spawn<Fut>(&self, future: Fut) -> bool
|
// pub fn spawn<Fut>(&self, future: Fut) -> bool
|
||||||
where
|
// where
|
||||||
Fut: Future<Output = ()> + Send + 'static,
|
// Fut: Future<Output = ()> + Send + 'static,
|
||||||
{
|
// {
|
||||||
self.tx
|
// self.cmd_tx
|
||||||
.send(ArbiterCommand::Execute(Box::pin(future)))
|
// .send(ArbiterCommand::Execute(Box::pin(future)))
|
||||||
.is_ok()
|
// .is_ok()
|
||||||
}
|
// }
|
||||||
|
|
||||||
/// Send a function to the Arbiter's thread and execute it.
|
// /// Send a function to the Arbiter's thread and execute it.
|
||||||
///
|
// ///
|
||||||
/// Any result from the function is discarded. If you require a result, include a response
|
// /// Any result from the function is discarded. If you require a result, include a response
|
||||||
/// channel in the function.
|
// /// channel in the function.
|
||||||
///
|
// ///
|
||||||
/// Returns true if function was sent successfully and false if the Arbiter has died.
|
// /// Returns true if function was sent successfully and false if the Arbiter has died.
|
||||||
pub fn spawn_fn<F>(&self, f: F) -> bool
|
// pub fn spawn_fn<F>(&self, f: F) -> bool
|
||||||
where
|
// where
|
||||||
F: FnOnce() + Send + 'static,
|
// F: FnOnce() + Send + 'static,
|
||||||
{
|
// {
|
||||||
self.spawn(async { f() })
|
// self.spawn(async { f() })
|
||||||
}
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
/// Wait for Arbiter's event loop to complete.
|
impl Drop for Arbiter {
|
||||||
///
|
fn drop(&mut self) {
|
||||||
/// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
|
eprintln!("Arb::drop: joining arbiter thread");
|
||||||
pub fn join(self) -> thread::Result<()> {
|
match self.thread_handle.take().unwrap().join() {
|
||||||
self.thread_handle.join()
|
Ok(()) => {}
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("arbiter {} thread panicked: {:?}", self.id(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
eprintln!("Arb::drop: sending stopped tx");
|
||||||
|
|
||||||
|
// could fail if all handles are dropped already so ignore result
|
||||||
|
let _ = self.stopped_tx.send(());
|
||||||
|
|
||||||
|
eprintln!("Arb::drop: done");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A persistent future that processes [Arbiter] commands.
|
/// A persistent future that processes [Arbiter] commands.
|
||||||
struct ArbiterRunner {
|
struct ArbiterLoop {
|
||||||
rx: mpsc::UnboundedReceiver<ArbiterCommand>,
|
cmd_rx: mpsc::UnboundedReceiver<ArbiterCommand>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Future for ArbiterRunner {
|
impl Future for ArbiterLoop {
|
||||||
type Output = ();
|
type Output = ();
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
// process all items currently buffered in channel
|
// process all items currently buffered in channel
|
||||||
loop {
|
loop {
|
||||||
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
|
match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
|
||||||
// channel closed; no more messages can be received
|
// channel closed; no more messages can be received
|
||||||
None => return Poll::Ready(()),
|
None => return Poll::Ready(()),
|
||||||
|
|
||||||
|
@@ -16,12 +16,12 @@
|
|||||||
//!
|
//!
|
||||||
//! # Examples
|
//! # Examples
|
||||||
//! ```
|
//! ```
|
||||||
//! use std::sync::mpsc;
|
//! use std::sync::mpsc::channel as std_channel;
|
||||||
//! use actix_rt::{Arbiter, System};
|
//! use actix_rt::{Arbiter, System};
|
||||||
//!
|
//!
|
||||||
//! let _ = System::new();
|
//! let sys = System::new();
|
||||||
//!
|
//!
|
||||||
//! let (tx, rx) = mpsc::channel::<u32>();
|
//! let (tx, rx) = std_channel::<u32>();
|
||||||
//!
|
//!
|
||||||
//! let arbiter = Arbiter::new();
|
//! let arbiter = Arbiter::new();
|
||||||
//! arbiter.spawn_fn(move || tx.send(42).unwrap());
|
//! arbiter.spawn_fn(move || tx.send(42).unwrap());
|
||||||
@@ -30,7 +30,10 @@
|
|||||||
//! assert_eq!(num, 42);
|
//! assert_eq!(num, 42);
|
||||||
//!
|
//!
|
||||||
//! arbiter.stop();
|
//! arbiter.stop();
|
||||||
//! arbiter.join().unwrap();
|
//! sys.block_on(arbiter.join());
|
||||||
|
//!
|
||||||
|
//! System::current().stop();
|
||||||
|
//! sys.run().unwrap();
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
#![deny(rust_2018_idioms, nonstandard_style)]
|
#![deny(rust_2018_idioms, nonstandard_style)]
|
||||||
@@ -70,13 +73,50 @@ pub mod signal {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub mod net {
|
pub mod net {
|
||||||
//! TCP/UDP/Unix bindings (Tokio re-exports).
|
//! TCP/UDP/Unix bindings (mostly Tokio re-exports).
|
||||||
|
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
pub use tokio::net::UdpSocket;
|
pub use tokio::net::UdpSocket;
|
||||||
pub use tokio::net::{TcpListener, TcpStream};
|
pub use tokio::net::{TcpListener, TcpSocket, TcpStream};
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
|
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
|
||||||
|
|
||||||
|
/// Extension trait over async read+write types that can also signal readiness.
|
||||||
|
pub trait ActixStream: AsyncRead + AsyncWrite + Unpin + 'static {
|
||||||
|
/// Poll stream and check read readiness of Self.
|
||||||
|
///
|
||||||
|
/// See [tokio::net::TcpStream::poll_read_ready] for detail on intended use.
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>>;
|
||||||
|
|
||||||
|
/// Poll stream and check write readiness of Self.
|
||||||
|
///
|
||||||
|
/// See [tokio::net::TcpStream::poll_write_ready] for detail on intended use.
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ActixStream for TcpStream {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
TcpStream::poll_read_ready(self, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
TcpStream::poll_write_ready(self, cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
impl ActixStream for UnixStream {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
UnixStream::poll_read_ready(self, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||||
|
UnixStream::poll_write_ready(self, cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub mod time {
|
pub mod time {
|
||||||
|
@@ -54,14 +54,9 @@ impl System {
|
|||||||
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let rt = Runtime::from(runtime_factory());
|
let rt = Runtime::from(runtime_factory());
|
||||||
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
|
let sys_arbiter = Arbiter::for_system(rt.local_set());
|
||||||
let system = System::construct(sys_tx, sys_arbiter.clone());
|
let system = System::construct(sys_tx, sys_arbiter.clone());
|
||||||
|
|
||||||
system
|
|
||||||
.tx()
|
|
||||||
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// init background system arbiter
|
// init background system arbiter
|
||||||
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
||||||
rt.spawn(sys_ctrl);
|
rt.spawn(sys_ctrl);
|
||||||
@@ -100,6 +95,15 @@ impl System {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Try to get current running system.
|
||||||
|
///
|
||||||
|
/// Returns `None` if no System has been started.
|
||||||
|
///
|
||||||
|
/// Contrary to `current`, this never panics.
|
||||||
|
pub fn try_current() -> Option<System> {
|
||||||
|
CURRENT.with(|cell| cell.borrow().clone())
|
||||||
|
}
|
||||||
|
|
||||||
/// Get handle to a the System's initial [Arbiter].
|
/// Get handle to a the System's initial [Arbiter].
|
||||||
pub fn arbiter(&self) -> &ArbiterHandle {
|
pub fn arbiter(&self) -> &ArbiterHandle {
|
||||||
&self.arbiter_handle
|
&self.arbiter_handle
|
||||||
@@ -141,7 +145,10 @@ impl System {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
///
|
||||||
|
/// Dropping the `SystemRunner` (eg. `let _ = System::new();`) will result in no further events
|
||||||
|
/// being processed. It is required you bind the runner and call `run` or call `block_on`.
|
||||||
|
#[must_use = "A SystemRunner does nothing unless `run` or `block_on` is called."]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SystemRunner {
|
pub struct SystemRunner {
|
||||||
rt: Runtime,
|
rt: Runtime,
|
||||||
@@ -181,7 +188,7 @@ impl SystemRunner {
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) enum SystemCommand {
|
pub(crate) enum SystemCommand {
|
||||||
Exit(i32),
|
Exit(i32),
|
||||||
RegisterArbiter(usize, ArbiterHandle),
|
RegisterArbiter(Arbiter),
|
||||||
DeregisterArbiter(usize),
|
DeregisterArbiter(usize),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,7 +198,7 @@ pub(crate) enum SystemCommand {
|
|||||||
pub(crate) struct SystemController {
|
pub(crate) struct SystemController {
|
||||||
stop_tx: Option<oneshot::Sender<i32>>,
|
stop_tx: Option<oneshot::Sender<i32>>,
|
||||||
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
|
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
|
||||||
arbiters: HashMap<usize, ArbiterHandle>,
|
arbiters: HashMap<usize, Arbiter>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SystemController {
|
impl SystemController {
|
||||||
@@ -212,35 +219,42 @@ impl Future for SystemController {
|
|||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
// process all items currently buffered in channel
|
// process all items currently buffered in channel
|
||||||
loop {
|
let code = loop {
|
||||||
match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
|
match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
|
||||||
// channel closed; no more messages can be received
|
// channel closed; no more messages can be received
|
||||||
None => return Poll::Ready(()),
|
None => break 0,
|
||||||
|
|
||||||
// process system command
|
// process system command
|
||||||
Some(cmd) => match cmd {
|
Some(cmd) => match cmd {
|
||||||
SystemCommand::Exit(code) => {
|
SystemCommand::Exit(code) => {
|
||||||
// stop all arbiters
|
// stop all arbiters
|
||||||
for arb in self.arbiters.values() {
|
for arb in self.arbiters.values() {
|
||||||
|
eprintln!("SystemController: stopping arbiter {}", arb.id());
|
||||||
arb.stop();
|
arb.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop event loop
|
eprintln!("SystemController: dropping arbiters");
|
||||||
// will only fire once
|
// destroy all arbiters
|
||||||
if let Some(stop_tx) = self.stop_tx.take() {
|
// drop waits for threads to complete
|
||||||
let _ = stop_tx.send(code);
|
self.arbiters.clear();
|
||||||
}
|
|
||||||
|
break code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SystemCommand::RegisterArbiter(id, arb) => {
|
SystemCommand::RegisterArbiter(arb) => {
|
||||||
self.arbiters.insert(id, arb);
|
self.arbiters.insert(arb.id(), arb);
|
||||||
}
|
}
|
||||||
|
|
||||||
SystemCommand::DeregisterArbiter(id) => {
|
SystemCommand::DeregisterArbiter(id) => {
|
||||||
self.arbiters.remove(&id);
|
// implicit arbiter drop
|
||||||
|
let _ = self.arbiters.remove(&id);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
|
self.stop_tx.take().unwrap().send(code).unwrap();
|
||||||
|
|
||||||
|
Poll::Ready(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
44
actix-rt/tests/multi_arbiter_check.rs
Normal file
44
actix-rt/tests/multi_arbiter_check.rs
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
//! Derived from this comment:
|
||||||
|
//! https://github.com/actix/actix/issues/464#issuecomment-779427825
|
||||||
|
|
||||||
|
use std::{thread, time::Duration};
|
||||||
|
|
||||||
|
use actix_rt::{Arbiter, System};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn actix_sample() {
|
||||||
|
let sys = System::new();
|
||||||
|
let arb = Arbiter::new();
|
||||||
|
|
||||||
|
let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
|
||||||
|
|
||||||
|
// create "actor"
|
||||||
|
arb.spawn_fn(move || {
|
||||||
|
let a = A;
|
||||||
|
|
||||||
|
actix_rt::spawn(async move {
|
||||||
|
while let Some(_) = rx.recv().await {
|
||||||
|
println!("{:?}", a);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
System::current().stop();
|
||||||
|
|
||||||
|
// all arbiters must be dropped when sys.run returns
|
||||||
|
sys.run().unwrap();
|
||||||
|
|
||||||
|
thread::sleep(Duration::from_millis(100));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct A;
|
||||||
|
|
||||||
|
impl Drop for A {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
println!("start drop");
|
||||||
|
thread::sleep(Duration::from_millis(200));
|
||||||
|
println!("finish drop");
|
||||||
|
}
|
||||||
|
}
|
120
actix-rt/tests/test_arbiter.rs
Normal file
120
actix-rt/tests/test_arbiter.rs
Normal file
@@ -0,0 +1,120 @@
|
|||||||
|
use std::{
|
||||||
|
sync::mpsc::channel as std_channel,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
use actix_rt::{time, Arbiter, System};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[should_panic]
|
||||||
|
fn no_system_arbiter_new_panic() {
|
||||||
|
Arbiter::new();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn join_arbiter_wait_fut() {
|
||||||
|
let time = Duration::from_secs(1);
|
||||||
|
let instant = Instant::now();
|
||||||
|
|
||||||
|
System::new().block_on(async move {
|
||||||
|
let arbiter = Arbiter::new();
|
||||||
|
|
||||||
|
arbiter.spawn(async move {
|
||||||
|
time::sleep(time).await;
|
||||||
|
Arbiter::current().stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
arbiter.join().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
instant.elapsed() >= time,
|
||||||
|
"Join on another arbiter should complete only when it calls stop"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn join_arbiter_wait_fn() {
|
||||||
|
let time = Duration::from_secs(1);
|
||||||
|
let instant = Instant::now();
|
||||||
|
|
||||||
|
System::new().block_on(async move {
|
||||||
|
let arbiter = Arbiter::new();
|
||||||
|
|
||||||
|
arbiter.spawn_fn(move || {
|
||||||
|
actix_rt::spawn(async move {
|
||||||
|
time::sleep(time).await;
|
||||||
|
Arbiter::current().stop();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
arbiter.join().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
instant.elapsed() >= time,
|
||||||
|
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn join_arbiter_early_stop_call() {
|
||||||
|
let time = Duration::from_secs(1);
|
||||||
|
let instant = Instant::now();
|
||||||
|
|
||||||
|
System::new().block_on(async move {
|
||||||
|
let arbiter = Arbiter::new();
|
||||||
|
|
||||||
|
arbiter.spawn(Box::pin(async move {
|
||||||
|
time::sleep(time).await;
|
||||||
|
Arbiter::current().stop();
|
||||||
|
}));
|
||||||
|
|
||||||
|
arbiter.stop();
|
||||||
|
arbiter.join().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
instant.elapsed() < time,
|
||||||
|
"Premature stop of Arbiter should conclude regardless of it's current state."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn arbiter_spawn_fn_runs() {
|
||||||
|
let sys = System::new();
|
||||||
|
|
||||||
|
let (tx, rx) = std_channel::<u32>();
|
||||||
|
|
||||||
|
let arbiter = Arbiter::new();
|
||||||
|
arbiter.spawn_fn(move || {
|
||||||
|
tx.send(42).unwrap();
|
||||||
|
System::current().stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
let num = rx.recv().unwrap();
|
||||||
|
assert_eq!(num, 42);
|
||||||
|
|
||||||
|
sys.run().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn arbiter_inner_panic() {
|
||||||
|
let sys = System::new();
|
||||||
|
|
||||||
|
let (tx, rx) = std_channel::<u32>();
|
||||||
|
|
||||||
|
let arbiter = Arbiter::new();
|
||||||
|
|
||||||
|
// spawned panics should not cause arbiter to crash
|
||||||
|
arbiter.spawn(async { panic!("inner panic; will be caught") });
|
||||||
|
arbiter.spawn_fn(|| panic!("inner panic; will be caught"));
|
||||||
|
|
||||||
|
arbiter.spawn(async move { tx.send(42).unwrap() });
|
||||||
|
|
||||||
|
let num = rx.recv().unwrap();
|
||||||
|
assert_eq!(num, 42);
|
||||||
|
|
||||||
|
System::current().stop();
|
||||||
|
sys.run().unwrap();
|
||||||
|
}
|
82
actix-rt/tests/test_runtime.rs
Normal file
82
actix-rt/tests/test_runtime.rs
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
use std::{
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
mpsc::channel as std_channel,
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use actix_rt::{Arbiter, System};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn wait_for_spawns() {
|
||||||
|
let rt = actix_rt::Runtime::new().unwrap();
|
||||||
|
|
||||||
|
let handle = rt.spawn(async {
|
||||||
|
println!("running on the runtime");
|
||||||
|
// assertion panic is caught at task boundary
|
||||||
|
assert_eq!(1, 2);
|
||||||
|
});
|
||||||
|
|
||||||
|
assert!(rt.block_on(handle).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_system_with_tokio() {
|
||||||
|
let (tx, rx) = std_channel();
|
||||||
|
|
||||||
|
let res = System::with_tokio_rt(move || {
|
||||||
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
|
.enable_io()
|
||||||
|
.enable_time()
|
||||||
|
.thread_keep_alive(Duration::from_millis(1000))
|
||||||
|
.worker_threads(2)
|
||||||
|
.max_blocking_threads(2)
|
||||||
|
.on_thread_start(|| {})
|
||||||
|
.on_thread_stop(|| {})
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
.block_on(async {
|
||||||
|
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||||
|
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
tx.send(42).unwrap();
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
123usize
|
||||||
|
});
|
||||||
|
|
||||||
|
assert_eq!(res, 123);
|
||||||
|
assert_eq!(rx.recv().unwrap(), 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_arbiter_with_tokio() {
|
||||||
|
let sys = System::new();
|
||||||
|
|
||||||
|
let arb = Arbiter::with_tokio_rt(|| {
|
||||||
|
tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
let counter = Arc::new(AtomicBool::new(true));
|
||||||
|
|
||||||
|
let counter1 = counter.clone();
|
||||||
|
let did_spawn = arb.spawn(async move {
|
||||||
|
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||||
|
counter1.store(false, Ordering::SeqCst);
|
||||||
|
Arbiter::current().stop();
|
||||||
|
System::current().stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
sys.run().unwrap();
|
||||||
|
|
||||||
|
assert!(did_spawn);
|
||||||
|
assert_eq!(false, counter.load(Ordering::SeqCst));
|
||||||
|
}
|
130
actix-rt/tests/test_system.rs
Normal file
130
actix-rt/tests/test_system.rs
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
use std::{
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
thread,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
|
use actix_rt::{time, Arbiter, System};
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[should_panic]
|
||||||
|
fn no_system_current_panic() {
|
||||||
|
System::current();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn try_current_no_system() {
|
||||||
|
assert!(System::try_current().is_none())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn try_current_with_system() {
|
||||||
|
System::new().block_on(async { assert!(System::try_current().is_some()) });
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn non_static_block_on() {
|
||||||
|
let string = String::from("test_str");
|
||||||
|
let string = string.as_str();
|
||||||
|
|
||||||
|
let sys = System::new();
|
||||||
|
|
||||||
|
sys.block_on(async {
|
||||||
|
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||||
|
assert_eq!("test_str", string);
|
||||||
|
});
|
||||||
|
|
||||||
|
let rt = actix_rt::Runtime::new().unwrap();
|
||||||
|
|
||||||
|
rt.block_on(async {
|
||||||
|
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||||
|
assert_eq!("test_str", string);
|
||||||
|
});
|
||||||
|
|
||||||
|
System::current().stop();
|
||||||
|
sys.run().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn await_for_timer() {
|
||||||
|
let time = Duration::from_secs(1);
|
||||||
|
let instant = Instant::now();
|
||||||
|
|
||||||
|
System::new().block_on(async move {
|
||||||
|
time::sleep(time).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
instant.elapsed() >= time,
|
||||||
|
"Calling `block_on` should poll awaited future to completion."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn system_arbiter_spawn() {
|
||||||
|
let runner = System::new();
|
||||||
|
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let sys = System::current();
|
||||||
|
|
||||||
|
thread::spawn(|| {
|
||||||
|
// this thread will have no arbiter in it's thread local so call will panic
|
||||||
|
Arbiter::current();
|
||||||
|
})
|
||||||
|
.join()
|
||||||
|
.unwrap_err();
|
||||||
|
|
||||||
|
let thread = thread::spawn(|| {
|
||||||
|
// this thread will have no arbiter in it's thread local so use the system handle instead
|
||||||
|
System::set_current(sys);
|
||||||
|
let sys = System::current();
|
||||||
|
|
||||||
|
let arb = sys.arbiter();
|
||||||
|
arb.spawn(async move {
|
||||||
|
tx.send(42u32).unwrap();
|
||||||
|
System::current().stop();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
assert_eq!(runner.block_on(rx).unwrap(), 42);
|
||||||
|
thread.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Atom(Arc<AtomicBool>);
|
||||||
|
|
||||||
|
impl Drop for Atom {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.0.store(true, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn system_stop_arbiter_join_barrier() {
|
||||||
|
let sys = System::new();
|
||||||
|
let arb = Arbiter::new();
|
||||||
|
|
||||||
|
let atom = Atom(Arc::new(AtomicBool::new(false)));
|
||||||
|
|
||||||
|
// arbiter should be alive to receive spawn msg
|
||||||
|
assert!(Arbiter::current().spawn_fn(|| {}));
|
||||||
|
assert!(arb.spawn_fn(move || {
|
||||||
|
// thread should get dropped during sleep
|
||||||
|
thread::sleep(Duration::from_secs(2));
|
||||||
|
|
||||||
|
// pointless load to move atom into scope
|
||||||
|
atom.0.load(Ordering::SeqCst);
|
||||||
|
|
||||||
|
panic!("spawned fn (thread) should be dropped during sleep");
|
||||||
|
}));
|
||||||
|
|
||||||
|
System::current().stop();
|
||||||
|
sys.run().unwrap();
|
||||||
|
|
||||||
|
// arbiter should be dead and return false
|
||||||
|
assert!(!Arbiter::current().spawn_fn(|| {}));
|
||||||
|
assert!(!arb.spawn_fn(|| {}));
|
||||||
|
}
|
@@ -1,268 +0,0 @@
|
|||||||
use std::{
|
|
||||||
sync::{
|
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
mpsc::channel,
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
thread,
|
|
||||||
time::{Duration, Instant},
|
|
||||||
};
|
|
||||||
|
|
||||||
use actix_rt::{Arbiter, System};
|
|
||||||
use tokio::sync::oneshot;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn await_for_timer() {
|
|
||||||
let time = Duration::from_secs(1);
|
|
||||||
let instant = Instant::now();
|
|
||||||
System::new().block_on(async move {
|
|
||||||
tokio::time::sleep(time).await;
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
instant.elapsed() >= time,
|
|
||||||
"Block on should poll awaited future to completion"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn join_another_arbiter() {
|
|
||||||
let time = Duration::from_secs(1);
|
|
||||||
let instant = Instant::now();
|
|
||||||
System::new().block_on(async move {
|
|
||||||
let arbiter = Arbiter::new();
|
|
||||||
arbiter.spawn(Box::pin(async move {
|
|
||||||
tokio::time::sleep(time).await;
|
|
||||||
Arbiter::current().stop();
|
|
||||||
}));
|
|
||||||
arbiter.join().unwrap();
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
instant.elapsed() >= time,
|
|
||||||
"Join on another arbiter should complete only when it calls stop"
|
|
||||||
);
|
|
||||||
|
|
||||||
let instant = Instant::now();
|
|
||||||
System::new().block_on(async move {
|
|
||||||
let arbiter = Arbiter::new();
|
|
||||||
arbiter.spawn_fn(move || {
|
|
||||||
actix_rt::spawn(async move {
|
|
||||||
tokio::time::sleep(time).await;
|
|
||||||
Arbiter::current().stop();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
arbiter.join().unwrap();
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
instant.elapsed() >= time,
|
|
||||||
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
|
|
||||||
);
|
|
||||||
|
|
||||||
let instant = Instant::now();
|
|
||||||
System::new().block_on(async move {
|
|
||||||
let arbiter = Arbiter::new();
|
|
||||||
arbiter.spawn(Box::pin(async move {
|
|
||||||
tokio::time::sleep(time).await;
|
|
||||||
Arbiter::current().stop();
|
|
||||||
}));
|
|
||||||
arbiter.stop();
|
|
||||||
arbiter.join().unwrap();
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
instant.elapsed() < time,
|
|
||||||
"Premature stop of arbiter should conclude regardless of it's current state"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn non_static_block_on() {
|
|
||||||
let string = String::from("test_str");
|
|
||||||
let string = string.as_str();
|
|
||||||
|
|
||||||
let sys = System::new();
|
|
||||||
|
|
||||||
sys.block_on(async {
|
|
||||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
|
||||||
assert_eq!("test_str", string);
|
|
||||||
});
|
|
||||||
|
|
||||||
let rt = actix_rt::Runtime::new().unwrap();
|
|
||||||
|
|
||||||
rt.block_on(async {
|
|
||||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
|
||||||
assert_eq!("test_str", string);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn wait_for_spawns() {
|
|
||||||
let rt = actix_rt::Runtime::new().unwrap();
|
|
||||||
|
|
||||||
let handle = rt.spawn(async {
|
|
||||||
println!("running on the runtime");
|
|
||||||
// assertion panic is caught at task boundary
|
|
||||||
assert_eq!(1, 2);
|
|
||||||
});
|
|
||||||
|
|
||||||
assert!(rt.block_on(handle).is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn arbiter_spawn_fn_runs() {
|
|
||||||
let _ = System::new();
|
|
||||||
|
|
||||||
let (tx, rx) = channel::<u32>();
|
|
||||||
|
|
||||||
let arbiter = Arbiter::new();
|
|
||||||
arbiter.spawn_fn(move || tx.send(42).unwrap());
|
|
||||||
|
|
||||||
let num = rx.recv().unwrap();
|
|
||||||
assert_eq!(num, 42);
|
|
||||||
|
|
||||||
arbiter.stop();
|
|
||||||
arbiter.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn arbiter_drop_no_panic_fn() {
|
|
||||||
let _ = System::new();
|
|
||||||
|
|
||||||
let arbiter = Arbiter::new();
|
|
||||||
arbiter.spawn_fn(|| panic!("test"));
|
|
||||||
|
|
||||||
arbiter.stop();
|
|
||||||
arbiter.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn arbiter_drop_no_panic_fut() {
|
|
||||||
let _ = System::new();
|
|
||||||
|
|
||||||
let arbiter = Arbiter::new();
|
|
||||||
arbiter.spawn(async { panic!("test") });
|
|
||||||
|
|
||||||
arbiter.stop();
|
|
||||||
arbiter.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[should_panic]
|
|
||||||
fn no_system_current_panic() {
|
|
||||||
System::current();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[should_panic]
|
|
||||||
fn no_system_arbiter_new_panic() {
|
|
||||||
Arbiter::new();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn system_arbiter_spawn() {
|
|
||||||
let runner = System::new();
|
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
let sys = System::current();
|
|
||||||
|
|
||||||
thread::spawn(|| {
|
|
||||||
// this thread will have no arbiter in it's thread local so call will panic
|
|
||||||
Arbiter::current();
|
|
||||||
})
|
|
||||||
.join()
|
|
||||||
.unwrap_err();
|
|
||||||
|
|
||||||
let thread = thread::spawn(|| {
|
|
||||||
// this thread will have no arbiter in it's thread local so use the system handle instead
|
|
||||||
System::set_current(sys);
|
|
||||||
let sys = System::current();
|
|
||||||
|
|
||||||
let arb = sys.arbiter();
|
|
||||||
arb.spawn(async move {
|
|
||||||
tx.send(42u32).unwrap();
|
|
||||||
System::current().stop();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
assert_eq!(runner.block_on(rx).unwrap(), 42);
|
|
||||||
thread.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn system_stop_stops_arbiters() {
|
|
||||||
let sys = System::new();
|
|
||||||
let arb = Arbiter::new();
|
|
||||||
|
|
||||||
// arbiter should be alive to receive spawn msg
|
|
||||||
assert!(Arbiter::current().spawn_fn(|| {}));
|
|
||||||
assert!(arb.spawn_fn(|| {}));
|
|
||||||
|
|
||||||
System::current().stop();
|
|
||||||
sys.run().unwrap();
|
|
||||||
|
|
||||||
// account for slightly slow thread de-spawns (only observed on windows)
|
|
||||||
thread::sleep(Duration::from_millis(100));
|
|
||||||
|
|
||||||
// arbiter should be dead and return false
|
|
||||||
assert!(!Arbiter::current().spawn_fn(|| {}));
|
|
||||||
assert!(!arb.spawn_fn(|| {}));
|
|
||||||
|
|
||||||
arb.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn new_system_with_tokio() {
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
|
|
||||||
let res = System::with_tokio_rt(move || {
|
|
||||||
tokio::runtime::Builder::new_multi_thread()
|
|
||||||
.enable_io()
|
|
||||||
.enable_time()
|
|
||||||
.thread_keep_alive(Duration::from_millis(1000))
|
|
||||||
.worker_threads(2)
|
|
||||||
.max_blocking_threads(2)
|
|
||||||
.on_thread_start(|| {})
|
|
||||||
.on_thread_stop(|| {})
|
|
||||||
.build()
|
|
||||||
.unwrap()
|
|
||||||
})
|
|
||||||
.block_on(async {
|
|
||||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
|
||||||
|
|
||||||
tokio::task::spawn(async move {
|
|
||||||
tx.send(42).unwrap();
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
123usize
|
|
||||||
});
|
|
||||||
|
|
||||||
assert_eq!(res, 123);
|
|
||||||
assert_eq!(rx.recv().unwrap(), 42);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn new_arbiter_with_tokio() {
|
|
||||||
let _ = System::new();
|
|
||||||
|
|
||||||
let arb = Arbiter::with_tokio_rt(|| {
|
|
||||||
tokio::runtime::Builder::new_current_thread()
|
|
||||||
.enable_all()
|
|
||||||
.build()
|
|
||||||
.unwrap()
|
|
||||||
});
|
|
||||||
|
|
||||||
let counter = Arc::new(AtomicBool::new(true));
|
|
||||||
|
|
||||||
let counter1 = counter.clone();
|
|
||||||
let did_spawn = arb.spawn(async move {
|
|
||||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
|
||||||
counter1.store(false, Ordering::SeqCst);
|
|
||||||
Arbiter::current().stop();
|
|
||||||
});
|
|
||||||
|
|
||||||
assert!(did_spawn);
|
|
||||||
|
|
||||||
arb.join().unwrap();
|
|
||||||
|
|
||||||
assert_eq!(false, counter.load(Ordering::SeqCst));
|
|
||||||
}
|
|
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actix-server"
|
name = "actix-server"
|
||||||
version = "2.0.0-beta.2"
|
version = "2.0.0-beta.3"
|
||||||
authors = [
|
authors = [
|
||||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||||
"fakeshadow <24548779@qq.com>",
|
"fakeshadow <24548779@qq.com>",
|
||||||
@@ -12,7 +12,6 @@ repository = "https://github.com/actix/actix-net.git"
|
|||||||
documentation = "https://docs.rs/actix-server"
|
documentation = "https://docs.rs/actix-server"
|
||||||
categories = ["network-programming", "asynchronous"]
|
categories = ["network-programming", "asynchronous"]
|
||||||
license = "MIT OR Apache-2.0"
|
license = "MIT OR Apache-2.0"
|
||||||
exclude = [".gitignore", ".cargo/config"]
|
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
@@ -33,7 +32,7 @@ log = "0.4"
|
|||||||
mio = { version = "0.7.6", features = ["os-poll", "net"] }
|
mio = { version = "0.7.6", features = ["os-poll", "net"] }
|
||||||
num_cpus = "1.13"
|
num_cpus = "1.13"
|
||||||
slab = "0.4"
|
slab = "0.4"
|
||||||
tokio = { version = "1", features = ["sync"] }
|
tokio = { version = "1.2", features = ["sync"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.0.0"
|
actix-rt = "2.0.0"
|
||||||
|
@@ -8,7 +8,7 @@ use mio::{Registry, Token as MioToken, Waker};
|
|||||||
|
|
||||||
use crate::worker::WorkerHandle;
|
use crate::worker::WorkerHandle;
|
||||||
|
|
||||||
/// waker token for `mio::Poll` instance
|
/// Waker token for `mio::Poll` instance.
|
||||||
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
|
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
|
||||||
|
|
||||||
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
|
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
|
||||||
@@ -30,7 +30,7 @@ impl Deref for WakerQueue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl WakerQueue {
|
impl WakerQueue {
|
||||||
/// construct a waker queue with given `Poll`'s `Registry` and capacity.
|
/// Construct a waker queue with given `Poll`'s `Registry` and capacity.
|
||||||
///
|
///
|
||||||
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
|
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
|
||||||
/// event's token for it to properly handle `WakerInterest`.
|
/// event's token for it to properly handle `WakerInterest`.
|
||||||
@@ -41,7 +41,7 @@ impl WakerQueue {
|
|||||||
Ok(Self(Arc::new((waker, queue))))
|
Ok(Self(Arc::new((waker, queue))))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// push a new interest to the queue and wake up the accept poll afterwards.
|
/// Push a new interest to the queue and wake up the accept poll afterwards.
|
||||||
pub(crate) fn wake(&self, interest: WakerInterest) {
|
pub(crate) fn wake(&self, interest: WakerInterest) {
|
||||||
let (waker, queue) = self.deref();
|
let (waker, queue) = self.deref();
|
||||||
|
|
||||||
@@ -55,20 +55,20 @@ impl WakerQueue {
|
|||||||
.unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
|
.unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// get a MutexGuard of the waker queue.
|
/// Get a MutexGuard of the waker queue.
|
||||||
pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
|
pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
|
||||||
self.deref().1.lock().expect("Failed to lock WakerQueue")
|
self.deref().1.lock().expect("Failed to lock WakerQueue")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// reset the waker queue so it does not grow infinitely.
|
/// Reset the waker queue so it does not grow infinitely.
|
||||||
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
|
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
|
||||||
std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
|
std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
|
/// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
|
||||||
///
|
///
|
||||||
/// *. These interests should not be confused with `mio::Interest` and mostly not I/O related
|
/// These interests should not be confused with `mio::Interest` and mostly not I/O related
|
||||||
pub(crate) enum WakerInterest {
|
pub(crate) enum WakerInterest {
|
||||||
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
|
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
|
||||||
/// available and can accept new tasks.
|
/// available and can accept new tasks.
|
||||||
|
@@ -102,8 +102,8 @@ pub trait Service<Req> {
|
|||||||
/// call and the next invocation of `call` results in an error.
|
/// call and the next invocation of `call` results in an error.
|
||||||
///
|
///
|
||||||
/// # Notes
|
/// # Notes
|
||||||
/// 1. `.poll_ready()` might be called on different task from actual service call.
|
/// 1. `poll_ready` might be called on a different task to `call`.
|
||||||
/// 1. In case of chained services, `.poll_ready()` get called for all services at once.
|
/// 1. In cases of chained services, `.poll_ready()` is called for all services at once.
|
||||||
fn poll_ready(&self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
|
fn poll_ready(&self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
|
||||||
|
|
||||||
/// Process the request and return the response asynchronously.
|
/// Process the request and return the response asynchronously.
|
||||||
|
@@ -1,6 +1,11 @@
|
|||||||
# Changes
|
# Changes
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
* Rename `accept::openssl::{SslStream => TlsStream}`.
|
||||||
|
* Add `connect::Connect::set_local_addr` to attach local `Ipaddr`. [#282]
|
||||||
|
* `connector::TcpConnector` service would try to bind to local_addr of `IpAddr` when given [#282]
|
||||||
|
|
||||||
|
[#282]: https://github.com/actix/actix-net/pull/282
|
||||||
|
|
||||||
|
|
||||||
## 3.0.0-beta.3 - 2021-02-06
|
## 3.0.0-beta.3 - 2021-02-06
|
||||||
|
@@ -52,7 +52,7 @@ log = "0.4"
|
|||||||
tokio-util = { version = "0.6.3", default-features = false }
|
tokio-util = { version = "0.6.3", default-features = false }
|
||||||
|
|
||||||
# openssl
|
# openssl
|
||||||
tls-openssl = { package = "openssl", version = "0.10", optional = true }
|
tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
|
||||||
tokio-openssl = { version = "0.6", optional = true }
|
tokio-openssl = { version = "0.6", optional = true }
|
||||||
|
|
||||||
# rustls
|
# rustls
|
||||||
@@ -62,9 +62,15 @@ webpki-roots = { version = "0.21", optional = true }
|
|||||||
# native-tls
|
# native-tls
|
||||||
tokio-native-tls = { version = "0.3", optional = true }
|
tokio-native-tls = { version = "0.3", optional = true }
|
||||||
|
|
||||||
|
[target.'cfg(windows)'.dependencies.tls-openssl]
|
||||||
|
version = "0.10.9"
|
||||||
|
package = "openssl"
|
||||||
|
features = ["vendored"]
|
||||||
|
optional = true
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.0.0"
|
actix-rt = "2.0.0"
|
||||||
actix-server = "2.0.0-beta.2"
|
actix-server = "2.0.0-beta.3"
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
env_logger = "0.8"
|
env_logger = "0.8"
|
||||||
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
|
||||||
|
@@ -29,9 +29,10 @@ use std::{
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use actix_rt::net::TcpStream;
|
||||||
use actix_server::Server;
|
use actix_server::Server;
|
||||||
use actix_service::pipeline_factory;
|
use actix_service::pipeline_factory;
|
||||||
use actix_tls::accept::rustls::Acceptor as RustlsAcceptor;
|
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
|
||||||
use futures_util::future::ok;
|
use futures_util::future::ok;
|
||||||
use log::info;
|
use log::info;
|
||||||
use rustls::{
|
use rustls::{
|
||||||
@@ -74,9 +75,9 @@ async fn main() -> io::Result<()> {
|
|||||||
// Set up TLS service factory
|
// Set up TLS service factory
|
||||||
pipeline_factory(tls_acceptor.clone())
|
pipeline_factory(tls_acceptor.clone())
|
||||||
.map_err(|err| println!("Rustls error: {:?}", err))
|
.map_err(|err| println!("Rustls error: {:?}", err))
|
||||||
.and_then(move |stream| {
|
.and_then(move |stream: TlsStream<TcpStream>| {
|
||||||
let num = count.fetch_add(1, Ordering::Relaxed);
|
let num = count.fetch_add(1, Ordering::Relaxed);
|
||||||
info!("[{}] Got TLS connection: {:?}", num, stream);
|
info!("[{}] Got TLS connection: {:?}", num, &*stream);
|
||||||
ok(())
|
ok(())
|
||||||
})
|
})
|
||||||
})?
|
})?
|
||||||
|
@@ -1,15 +1,94 @@
|
|||||||
use std::task::{Context, Poll};
|
use std::{
|
||||||
|
io::{self, IoSlice},
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
|
pin::Pin,
|
||||||
|
task::{Context, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
use actix_rt::net::ActixStream;
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use actix_utils::counter::Counter;
|
use actix_utils::counter::Counter;
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
|
|
||||||
pub use tokio_native_tls::native_tls::Error;
|
pub use tokio_native_tls::native_tls::Error;
|
||||||
pub use tokio_native_tls::{TlsAcceptor, TlsStream};
|
pub use tokio_native_tls::TlsAcceptor;
|
||||||
|
|
||||||
use super::MAX_CONN_COUNTER;
|
use super::MAX_CONN_COUNTER;
|
||||||
|
|
||||||
|
/// Wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait.
|
||||||
|
pub struct TlsStream<T>(tokio_native_tls::TlsStream<T>);
|
||||||
|
|
||||||
|
impl<T> From<tokio_native_tls::TlsStream<T>> for TlsStream<T> {
|
||||||
|
fn from(stream: tokio_native_tls::TlsStream<T>) -> Self {
|
||||||
|
Self(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> Deref for TlsStream<T> {
|
||||||
|
type Target = tokio_native_tls::TlsStream<T>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> DerefMut for TlsStream<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncRead for TlsStream<T> {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_flush(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
(&**self).is_write_vectored()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> ActixStream for TlsStream<T> {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_read_ready((&**self).get_ref().get_ref().get_ref(), cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_write_ready((&**self).get_ref().get_ref().get_ref(), cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Accept TLS connections via `native-tls` package.
|
/// Accept TLS connections via `native-tls` package.
|
||||||
///
|
///
|
||||||
/// `native-tls` feature enables this `Acceptor` type.
|
/// `native-tls` feature enables this `Acceptor` type.
|
||||||
@@ -34,10 +113,7 @@ impl Clone for Acceptor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ServiceFactory<T> for Acceptor
|
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
|
||||||
{
|
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Config = ();
|
type Config = ();
|
||||||
@@ -71,10 +147,7 @@ impl Clone for NativeTlsAcceptorService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Service<T> for NativeTlsAcceptorService
|
impl<T: ActixStream> Service<T> for NativeTlsAcceptorService {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
|
||||||
{
|
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
|
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
|
||||||
@@ -93,7 +166,7 @@ where
|
|||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let io = this.acceptor.accept(io).await;
|
let io = this.acceptor.accept(io).await;
|
||||||
drop(guard);
|
drop(guard);
|
||||||
io
|
io.map(Into::into)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,10 +1,13 @@
|
|||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
|
io::{self, IoSlice},
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
use actix_rt::net::ActixStream;
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use actix_utils::counter::{Counter, CounterGuard};
|
use actix_utils::counter::{Counter, CounterGuard};
|
||||||
use futures_core::{future::LocalBoxFuture, ready};
|
use futures_core::{future::LocalBoxFuture, ready};
|
||||||
@@ -12,10 +15,82 @@ use futures_core::{future::LocalBoxFuture, ready};
|
|||||||
pub use openssl::ssl::{
|
pub use openssl::ssl::{
|
||||||
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
|
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
|
||||||
};
|
};
|
||||||
pub use tokio_openssl::SslStream;
|
|
||||||
|
|
||||||
use super::MAX_CONN_COUNTER;
|
use super::MAX_CONN_COUNTER;
|
||||||
|
|
||||||
|
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
||||||
|
pub struct TlsStream<T>(tokio_openssl::SslStream<T>);
|
||||||
|
|
||||||
|
impl<T> From<tokio_openssl::SslStream<T>> for TlsStream<T> {
|
||||||
|
fn from(stream: tokio_openssl::SslStream<T>) -> Self {
|
||||||
|
Self(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for TlsStream<T> {
|
||||||
|
type Target = tokio_openssl::SslStream<T>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for TlsStream<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncRead for TlsStream<T> {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_flush(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
(&**self).is_write_vectored()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> ActixStream for TlsStream<T> {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_read_ready((&**self).get_ref(), cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_write_ready((&**self).get_ref(), cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Accept TLS connections via `openssl` package.
|
/// Accept TLS connections via `openssl` package.
|
||||||
///
|
///
|
||||||
/// `openssl` feature enables this `Acceptor` type.
|
/// `openssl` feature enables this `Acceptor` type.
|
||||||
@@ -40,11 +115,8 @@ impl Clone for Acceptor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ServiceFactory<T> for Acceptor
|
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||||
where
|
type Response = TlsStream<T>;
|
||||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
|
||||||
{
|
|
||||||
type Response = SslStream<T>;
|
|
||||||
type Error = SslError;
|
type Error = SslError;
|
||||||
type Config = ();
|
type Config = ();
|
||||||
type Service = AcceptorService;
|
type Service = AcceptorService;
|
||||||
@@ -67,11 +139,8 @@ pub struct AcceptorService {
|
|||||||
conns: Counter,
|
conns: Counter,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Service<T> for AcceptorService
|
impl<T: ActixStream> Service<T> for AcceptorService {
|
||||||
where
|
type Response = TlsStream<T>;
|
||||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
|
||||||
{
|
|
||||||
type Response = SslStream<T>;
|
|
||||||
type Error = SslError;
|
type Error = SslError;
|
||||||
type Future = AcceptorServiceResponse<T>;
|
type Future = AcceptorServiceResponse<T>;
|
||||||
|
|
||||||
@@ -88,24 +157,25 @@ where
|
|||||||
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
|
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
|
||||||
AcceptorServiceResponse {
|
AcceptorServiceResponse {
|
||||||
_guard: self.conns.get(),
|
_guard: self.conns.get(),
|
||||||
stream: Some(SslStream::new(ssl, io).unwrap()),
|
stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct AcceptorServiceResponse<T>
|
pub struct AcceptorServiceResponse<T: ActixStream> {
|
||||||
where
|
stream: Option<tokio_openssl::SslStream<T>>,
|
||||||
T: AsyncRead + AsyncWrite,
|
|
||||||
{
|
|
||||||
stream: Option<SslStream<T>>,
|
|
||||||
_guard: CounterGuard,
|
_guard: CounterGuard,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
|
impl<T: ActixStream> Future for AcceptorServiceResponse<T> {
|
||||||
type Output = Result<SslStream<T>, SslError>;
|
type Output = Result<TlsStream<T>, SslError>;
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?;
|
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?;
|
||||||
Poll::Ready(Ok(self.stream.take().expect("SSL connect has resolved.")))
|
Poll::Ready(Ok(self
|
||||||
|
.stream
|
||||||
|
.take()
|
||||||
|
.expect("SSL connect has resolved.")
|
||||||
|
.into()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,22 +1,96 @@
|
|||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
io,
|
io::{self, IoSlice},
|
||||||
|
ops::{Deref, DerefMut},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
use actix_rt::net::ActixStream;
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use actix_utils::counter::{Counter, CounterGuard};
|
use actix_utils::counter::{Counter, CounterGuard};
|
||||||
use futures_core::future::LocalBoxFuture;
|
use futures_core::future::LocalBoxFuture;
|
||||||
use tokio_rustls::{Accept, TlsAcceptor};
|
use tokio_rustls::{Accept, TlsAcceptor};
|
||||||
|
|
||||||
pub use tokio_rustls::rustls::{ServerConfig, Session};
|
pub use tokio_rustls::rustls::{ServerConfig, Session};
|
||||||
pub use tokio_rustls::server::TlsStream;
|
|
||||||
|
|
||||||
use super::MAX_CONN_COUNTER;
|
use super::MAX_CONN_COUNTER;
|
||||||
|
|
||||||
|
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
||||||
|
pub struct TlsStream<T>(tokio_rustls::server::TlsStream<T>);
|
||||||
|
|
||||||
|
impl<T> From<tokio_rustls::server::TlsStream<T>> for TlsStream<T> {
|
||||||
|
fn from(stream: tokio_rustls::server::TlsStream<T>) -> Self {
|
||||||
|
Self(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for TlsStream<T> {
|
||||||
|
type Target = tokio_rustls::server::TlsStream<T>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for TlsStream<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncRead for TlsStream<T> {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut ReadBuf<'_>,
|
||||||
|
) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
|
||||||
|
fn poll_write(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_flush(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_vectored(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
bufs: &[IoSlice<'_>],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_write_vectored(&self) -> bool {
|
||||||
|
(&**self).is_write_vectored()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ActixStream> ActixStream for TlsStream<T> {
|
||||||
|
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_read_ready((&**self).get_ref().0, cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
T::poll_write_ready((&**self).get_ref().0, cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Accept TLS connections via `rustls` package.
|
/// Accept TLS connections via `rustls` package.
|
||||||
///
|
///
|
||||||
/// `rustls` feature enables this `Acceptor` type.
|
/// `rustls` feature enables this `Acceptor` type.
|
||||||
@@ -43,10 +117,7 @@ impl Clone for Acceptor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> ServiceFactory<T> for Acceptor
|
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
{
|
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
type Config = ();
|
type Config = ();
|
||||||
@@ -72,10 +143,7 @@ pub struct AcceptorService {
|
|||||||
conns: Counter,
|
conns: Counter,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Service<T> for AcceptorService
|
impl<T: ActixStream> Service<T> for AcceptorService {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
{
|
|
||||||
type Response = TlsStream<T>;
|
type Response = TlsStream<T>;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
type Future = AcceptorServiceFut<T>;
|
type Future = AcceptorServiceFut<T>;
|
||||||
@@ -96,22 +164,16 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct AcceptorServiceFut<T>
|
pub struct AcceptorServiceFut<T: ActixStream> {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
{
|
|
||||||
fut: Accept<T>,
|
fut: Accept<T>,
|
||||||
_guard: CounterGuard,
|
_guard: CounterGuard,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Future for AcceptorServiceFut<T>
|
impl<T: ActixStream> Future for AcceptorServiceFut<T> {
|
||||||
where
|
|
||||||
T: AsyncRead + AsyncWrite + Unpin,
|
|
||||||
{
|
|
||||||
type Output = Result<TlsStream<T>, io::Error>;
|
type Output = Result<TlsStream<T>, io::Error>;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
let this = self.get_mut();
|
let this = self.get_mut();
|
||||||
Pin::new(&mut this.fut).poll(cx)
|
Pin::new(&mut this.fut).poll(cx).map_ok(TlsStream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -3,7 +3,7 @@ use std::{
|
|||||||
fmt,
|
fmt,
|
||||||
iter::{self, FromIterator as _},
|
iter::{self, FromIterator as _},
|
||||||
mem,
|
mem,
|
||||||
net::SocketAddr,
|
net::{IpAddr, SocketAddr},
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Parse a host into parts (hostname and port).
|
/// Parse a host into parts (hostname and port).
|
||||||
@@ -67,6 +67,7 @@ pub struct Connect<T> {
|
|||||||
pub(crate) req: T,
|
pub(crate) req: T,
|
||||||
pub(crate) port: u16,
|
pub(crate) port: u16,
|
||||||
pub(crate) addr: ConnectAddrs,
|
pub(crate) addr: ConnectAddrs,
|
||||||
|
pub(crate) local_addr: Option<IpAddr>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Address> Connect<T> {
|
impl<T: Address> Connect<T> {
|
||||||
@@ -78,6 +79,7 @@ impl<T: Address> Connect<T> {
|
|||||||
req,
|
req,
|
||||||
port: port.unwrap_or(0),
|
port: port.unwrap_or(0),
|
||||||
addr: ConnectAddrs::None,
|
addr: ConnectAddrs::None,
|
||||||
|
local_addr: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -88,6 +90,7 @@ impl<T: Address> Connect<T> {
|
|||||||
req,
|
req,
|
||||||
port: 0,
|
port: 0,
|
||||||
addr: ConnectAddrs::One(addr),
|
addr: ConnectAddrs::One(addr),
|
||||||
|
local_addr: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -119,6 +122,12 @@ impl<T: Address> Connect<T> {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set local_addr of connect.
|
||||||
|
pub fn set_local_addr(mut self, addr: impl Into<IpAddr>) -> Self {
|
||||||
|
self.local_addr = Some(addr.into());
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Get hostname.
|
/// Get hostname.
|
||||||
pub fn hostname(&self) -> &str {
|
pub fn hostname(&self) -> &str {
|
||||||
self.req.hostname()
|
self.req.hostname()
|
||||||
@@ -285,7 +294,7 @@ fn parse_host(host: &str) -> (&str, Option<u16>) {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::net::{IpAddr, Ipv4Addr};
|
use std::net::Ipv4Addr;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
@@ -329,4 +338,13 @@ mod tests {
|
|||||||
let mut iter = ConnectAddrsIter::None;
|
let mut iter = ConnectAddrsIter::None;
|
||||||
assert_eq!(iter.next(), None);
|
assert_eq!(iter.next(), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_local_addr() {
|
||||||
|
let conn = Connect::new("hello").set_local_addr([127, 0, 0, 1]);
|
||||||
|
assert_eq!(
|
||||||
|
conn.local_addr.unwrap(),
|
||||||
|
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -2,12 +2,12 @@ use std::{
|
|||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
future::Future,
|
future::Future,
|
||||||
io,
|
io,
|
||||||
net::SocketAddr,
|
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_rt::net::TcpStream;
|
use actix_rt::net::{TcpSocket, TcpStream};
|
||||||
use actix_service::{Service, ServiceFactory};
|
use actix_service::{Service, ServiceFactory};
|
||||||
use futures_core::{future::LocalBoxFuture, ready};
|
use futures_core::{future::LocalBoxFuture, ready};
|
||||||
use log::{error, trace};
|
use log::{error, trace};
|
||||||
@@ -54,9 +54,14 @@ impl<T: Address> Service<Connect<T>> for TcpConnector {
|
|||||||
|
|
||||||
fn call(&self, req: Connect<T>) -> Self::Future {
|
fn call(&self, req: Connect<T>) -> Self::Future {
|
||||||
let port = req.port();
|
let port = req.port();
|
||||||
let Connect { req, addr, .. } = req;
|
let Connect {
|
||||||
|
req,
|
||||||
|
addr,
|
||||||
|
local_addr,
|
||||||
|
..
|
||||||
|
} = req;
|
||||||
|
|
||||||
TcpConnectorResponse::new(req, port, addr)
|
TcpConnectorResponse::new(req, port, local_addr, addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,6 +70,7 @@ pub enum TcpConnectorResponse<T> {
|
|||||||
Response {
|
Response {
|
||||||
req: Option<T>,
|
req: Option<T>,
|
||||||
port: u16,
|
port: u16,
|
||||||
|
local_addr: Option<IpAddr>,
|
||||||
addrs: Option<VecDeque<SocketAddr>>,
|
addrs: Option<VecDeque<SocketAddr>>,
|
||||||
stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>,
|
stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>,
|
||||||
},
|
},
|
||||||
@@ -72,7 +78,12 @@ pub enum TcpConnectorResponse<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Address> TcpConnectorResponse<T> {
|
impl<T: Address> TcpConnectorResponse<T> {
|
||||||
pub(crate) fn new(req: T, port: u16, addr: ConnectAddrs) -> TcpConnectorResponse<T> {
|
pub(crate) fn new(
|
||||||
|
req: T,
|
||||||
|
port: u16,
|
||||||
|
local_addr: Option<IpAddr>,
|
||||||
|
addr: ConnectAddrs,
|
||||||
|
) -> TcpConnectorResponse<T> {
|
||||||
if addr.is_none() {
|
if addr.is_none() {
|
||||||
error!("TCP connector: unresolved connection address");
|
error!("TCP connector: unresolved connection address");
|
||||||
return TcpConnectorResponse::Error(Some(ConnectError::Unresolved));
|
return TcpConnectorResponse::Error(Some(ConnectError::Unresolved));
|
||||||
@@ -90,8 +101,9 @@ impl<T: Address> TcpConnectorResponse<T> {
|
|||||||
ConnectAddrs::One(addr) => TcpConnectorResponse::Response {
|
ConnectAddrs::One(addr) => TcpConnectorResponse::Response {
|
||||||
req: Some(req),
|
req: Some(req),
|
||||||
port,
|
port,
|
||||||
|
local_addr,
|
||||||
addrs: None,
|
addrs: None,
|
||||||
stream: Some(ReusableBoxFuture::new(TcpStream::connect(addr))),
|
stream: Some(ReusableBoxFuture::new(connect(addr, local_addr))),
|
||||||
},
|
},
|
||||||
|
|
||||||
// when resolver returns multiple socket addr for request they would be popped from
|
// when resolver returns multiple socket addr for request they would be popped from
|
||||||
@@ -99,6 +111,7 @@ impl<T: Address> TcpConnectorResponse<T> {
|
|||||||
ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response {
|
ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response {
|
||||||
req: Some(req),
|
req: Some(req),
|
||||||
port,
|
port,
|
||||||
|
local_addr,
|
||||||
addrs: Some(addrs),
|
addrs: Some(addrs),
|
||||||
stream: None,
|
stream: None,
|
||||||
},
|
},
|
||||||
@@ -116,6 +129,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
|
|||||||
TcpConnectorResponse::Response {
|
TcpConnectorResponse::Response {
|
||||||
req,
|
req,
|
||||||
port,
|
port,
|
||||||
|
local_addr,
|
||||||
addrs,
|
addrs,
|
||||||
stream,
|
stream,
|
||||||
} => loop {
|
} => loop {
|
||||||
@@ -148,11 +162,38 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
|
|||||||
// try to connect
|
// try to connect
|
||||||
let addr = addrs.as_mut().unwrap().pop_front().unwrap();
|
let addr = addrs.as_mut().unwrap().pop_front().unwrap();
|
||||||
|
|
||||||
|
let fut = connect(addr, *local_addr);
|
||||||
match stream {
|
match stream {
|
||||||
Some(rbf) => rbf.set(TcpStream::connect(addr)),
|
Some(rbf) => rbf.set(fut),
|
||||||
None => *stream = Some(ReusableBoxFuture::new(TcpStream::connect(addr))),
|
None => *stream = Some(ReusableBoxFuture::new(fut)),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn connect(addr: SocketAddr, local_addr: Option<IpAddr>) -> io::Result<TcpStream> {
|
||||||
|
// use local addr if connect asks for it.
|
||||||
|
match local_addr {
|
||||||
|
Some(ip_addr) => {
|
||||||
|
let socket = match ip_addr {
|
||||||
|
IpAddr::V4(ip_addr) => {
|
||||||
|
let socket = TcpSocket::new_v4()?;
|
||||||
|
let addr = SocketAddr::V4(SocketAddrV4::new(ip_addr, 0));
|
||||||
|
socket.bind(addr)?;
|
||||||
|
socket
|
||||||
|
}
|
||||||
|
IpAddr::V6(ip_addr) => {
|
||||||
|
let socket = TcpSocket::new_v6()?;
|
||||||
|
let addr = SocketAddr::V6(SocketAddrV6::new(ip_addr, 0, 0, 0));
|
||||||
|
socket.bind(addr)?;
|
||||||
|
socket
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
socket.connect(addr).await
|
||||||
|
}
|
||||||
|
|
||||||
|
None => TcpStream::connect(addr).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -1,4 +1,9 @@
|
|||||||
use std::io;
|
#![cfg(feature = "connect")]
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
io,
|
||||||
|
net::{IpAddr, Ipv4Addr},
|
||||||
|
};
|
||||||
|
|
||||||
use actix_codec::{BytesCodec, Framed};
|
use actix_codec::{BytesCodec, Framed};
|
||||||
use actix_rt::net::TcpStream;
|
use actix_rt::net::TcpStream;
|
||||||
@@ -9,7 +14,7 @@ use futures_util::sink::SinkExt;
|
|||||||
|
|
||||||
use actix_tls::connect::{self as actix_connect, Connect};
|
use actix_tls::connect::{self as actix_connect, Connect};
|
||||||
|
|
||||||
#[cfg(all(feature = "connect", feature = "openssl"))]
|
#[cfg(feature = "openssl")]
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_string() {
|
async fn test_string() {
|
||||||
let srv = TestServer::with(|| {
|
let srv = TestServer::with(|| {
|
||||||
@@ -125,3 +130,25 @@ async fn test_rustls_uri() {
|
|||||||
let con = conn.call(addr.into()).await.unwrap();
|
let con = conn.call(addr.into()).await.unwrap();
|
||||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn test_local_addr() {
|
||||||
|
let srv = TestServer::with(|| {
|
||||||
|
fn_service(|io: TcpStream| async {
|
||||||
|
let mut framed = Framed::new(io, BytesCodec);
|
||||||
|
framed.send(Bytes::from_static(b"test")).await?;
|
||||||
|
Ok::<_, io::Error>(())
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let conn = actix_connect::default_connector();
|
||||||
|
let local = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3));
|
||||||
|
|
||||||
|
let (con, _) = conn
|
||||||
|
.call(Connect::with_addr("10", srv.addr()).set_local_addr(local))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_parts();
|
||||||
|
|
||||||
|
assert_eq!(con.local_addr().unwrap().ip(), local)
|
||||||
|
}
|
||||||
|
@@ -1,3 +1,5 @@
|
|||||||
|
#![cfg(feature = "connect")]
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
io,
|
io,
|
||||||
net::{Ipv4Addr, SocketAddr},
|
net::{Ipv4Addr, SocketAddr},
|
||||||
|
@@ -1,49 +1,30 @@
|
|||||||
use core::cell::UnsafeCell;
|
use core::{cell::Cell, fmt, marker::PhantomData, task::Waker};
|
||||||
use core::fmt;
|
|
||||||
use core::marker::PhantomData;
|
|
||||||
use core::task::Waker;
|
|
||||||
|
|
||||||
/// A synchronization primitive for task wakeup.
|
/// A synchronization primitive for task wakeup.
|
||||||
///
|
///
|
||||||
/// Sometimes the task interested in a given event will change over time.
|
/// Sometimes the task interested in a given event will change over time. A `LocalWaker` can
|
||||||
/// An `LocalWaker` can coordinate concurrent notifications with the consumer
|
/// coordinate concurrent notifications with the consumer, potentially "updating" the underlying
|
||||||
/// potentially "updating" the underlying task to wake up. This is useful in
|
/// task to wake up. This is useful in scenarios where a computation completes in another task and
|
||||||
/// scenarios where a computation completes in another task and wants to
|
/// wants to notify the consumer, but the consumer is in the process of being migrated to a new
|
||||||
/// notify the consumer, but the consumer is in the process of being migrated to
|
/// logical task.
|
||||||
/// a new logical task.
|
|
||||||
///
|
///
|
||||||
/// Consumers should call `register` before checking the result of a computation
|
/// Consumers should call [`register`] before checking the result of a computation and producers
|
||||||
/// and producers should call `wake` after producing the computation (this
|
/// should call `wake` after producing the computation (this differs from the usual `thread::park`
|
||||||
/// differs from the usual `thread::park` pattern). It is also permitted for
|
/// pattern). It is also permitted for [`wake`] to be called _before_ [`register`]. This results in
|
||||||
/// `wake` to be called **before** `register`. This results in a no-op.
|
/// a no-op.
|
||||||
///
|
///
|
||||||
/// A single `AtomicWaker` may be reused for any number of calls to `register` or
|
/// A single `LocalWaker` may be reused for any number of calls to [`register`] or [`wake`].
|
||||||
/// `wake`.
|
|
||||||
// TODO: Refactor to Cell when remove deprecated methods (@botika)
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct LocalWaker {
|
pub struct LocalWaker {
|
||||||
pub(crate) waker: UnsafeCell<Option<Waker>>,
|
pub(crate) waker: Cell<Option<Waker>>,
|
||||||
// mark LocalWaker as a !Send type.
|
// mark LocalWaker as a !Send type.
|
||||||
_t: PhantomData<*const ()>,
|
_phantom: PhantomData<*const ()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LocalWaker {
|
impl LocalWaker {
|
||||||
/// Create an `LocalWaker`.
|
/// Creates a new, empty `LocalWaker`.
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
LocalWaker {
|
LocalWaker::default()
|
||||||
waker: UnsafeCell::new(None),
|
|
||||||
_t: PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[deprecated(
|
|
||||||
since = "2.1.0",
|
|
||||||
note = "In favor of `wake`. State of the register doesn't matter at `wake` up"
|
|
||||||
)]
|
|
||||||
/// Check if waker has been registered.
|
|
||||||
#[inline]
|
|
||||||
pub fn is_registered(&self) -> bool {
|
|
||||||
unsafe { (*self.waker.get()).is_some() }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Registers the waker to be notified on calls to `wake`.
|
/// Registers the waker to be notified on calls to `wake`.
|
||||||
@@ -51,11 +32,8 @@ impl LocalWaker {
|
|||||||
/// Returns `true` if waker was registered before.
|
/// Returns `true` if waker was registered before.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn register(&self, waker: &Waker) -> bool {
|
pub fn register(&self, waker: &Waker) -> bool {
|
||||||
unsafe {
|
let last_waker = self.waker.replace(Some(waker.clone()));
|
||||||
let w = self.waker.get();
|
last_waker.is_some()
|
||||||
let last_waker = w.replace(Some(waker.clone()));
|
|
||||||
last_waker.is_some()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Calls `wake` on the last `Waker` passed to `register`.
|
/// Calls `wake` on the last `Waker` passed to `register`.
|
||||||
@@ -73,7 +51,7 @@ impl LocalWaker {
|
|||||||
/// If a waker has not been registered, this returns `None`.
|
/// If a waker has not been registered, this returns `None`.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn take(&self) -> Option<Waker> {
|
pub fn take(&self) -> Option<Waker> {
|
||||||
unsafe { (*self.waker.get()).take() }
|
self.waker.take()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,5 +1,2 @@
|
|||||||
max_width = 96
|
max_width = 96
|
||||||
reorder_imports = true
|
reorder_imports = true
|
||||||
#wrap_comments = true
|
|
||||||
#fn_args_density = "Compressed"
|
|
||||||
#use_small_heuristics = false
|
|
||||||
|
Reference in New Issue
Block a user